diff --git a/epictest/addcat_test.go b/epictest/addcat_test.go index 0fb492e57..a69660cf4 100644 --- a/epictest/addcat_test.go +++ b/epictest/addcat_test.go @@ -104,7 +104,10 @@ func AddCatBytes(data []byte, conf Config) error { sessionGenerator := bitswap.NewSessionGenerator( tn.VirtualNetwork(delay.Fixed(conf.NetworkLatency)), // TODO rename VirtualNetwork - mockrouting.NewServerWithDelay(delay.Fixed(conf.RoutingLatency)), + mockrouting.NewServerWithDelay(mockrouting.DelayConfig{ + Query: delay.Fixed(conf.RoutingLatency), + ValueVisibility: delay.Fixed(conf.RoutingLatency), + }), ) defer sessionGenerator.Close() diff --git a/routing/mock/client.go b/routing/mock/client.go index f4702aae6..444a4b960 100644 --- a/routing/mock/client.go +++ b/routing/mock/client.go @@ -67,6 +67,8 @@ func (c *client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-cha return out } +// Provide returns once the message is on the network. Value is not necessarily +// visible yet. func (c *client) Provide(_ context.Context, key u.Key) error { return c.server.Announce(c.peer, key) } diff --git a/routing/mock/interface.go b/routing/mock/interface.go index e84a9ba5a..639736292 100644 --- a/routing/mock/interface.go +++ b/routing/mock/interface.go @@ -28,13 +28,25 @@ type Client interface { // NewServer returns a mockrouting Server func NewServer() Server { - return NewServerWithDelay(delay.Fixed(0)) + return NewServerWithDelay(DelayConfig{ + ValueVisibility: delay.Fixed(0), + Query: delay.Fixed(0), + }) } // NewServerWithDelay returns a mockrouting Server with a delay! -func NewServerWithDelay(d delay.D) Server { +func NewServerWithDelay(conf DelayConfig) Server { return &s{ - providers: make(map[u.Key]peer.Map), - delay: d, + providers: make(map[u.Key]map[u.Key]providerRecord), + delayConf: conf, } } + +type DelayConfig struct { + // ValueVisibility is the time it takes for a value to be visible in the network + // FIXME there _must_ be a better term for this + ValueVisibility delay.D + + // Query is the time it takes to receive a response from a routing query + Query delay.D +} diff --git a/routing/mock/mockrouting_test.go b/routing/mock/mockrouting_test.go index 3f9bfab6c..6700cd8ed 100644 --- a/routing/mock/mockrouting_test.go +++ b/routing/mock/mockrouting_test.go @@ -3,10 +3,12 @@ package mockrouting import ( "bytes" "testing" + "time" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - "github.com/jbenet/go-ipfs/peer" + peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" + delay "github.com/jbenet/go-ipfs/util/delay" testutil "github.com/jbenet/go-ipfs/util/testutil" ) @@ -129,3 +131,36 @@ func TestCanceledContext(t *testing.T) { t.Fatal("Context cancel had no effect") } } + +func TestValidAfter(t *testing.T) { + + var p = testutil.NewPeerWithID(peer.ID([]byte("the peer id"))) + var key = u.Key("mock key") + var ctx = context.Background() + conf := DelayConfig{ + ValueVisibility: delay.Fixed(1 * time.Hour), + Query: delay.Fixed(0), + } + + rs := NewServerWithDelay(conf) + + rs.Client(p).Provide(ctx, key) + + var providers []peer.Peer + providers, err := rs.Client(p).FindProviders(ctx, key) + if err != nil { + t.Fatal(err) + } + if len(providers) > 0 { + t.Fail() + } + + conf.ValueVisibility.Set(0) + providers, err = rs.Client(p).FindProviders(ctx, key) + if err != nil { + t.Fatal(err) + } + if len(providers) != 1 { + t.Fail() + } +} diff --git a/routing/mock/server.go b/routing/mock/server.go index 3e189d954..e176c7aeb 100644 --- a/routing/mock/server.go +++ b/routing/mock/server.go @@ -3,11 +3,11 @@ package mockrouting import ( "math/rand" "sync" + "time" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" - delay "github.com/jbenet/go-ipfs/util/delay" ) // server is the mockrouting.Client's private interface to the routing server @@ -20,39 +20,47 @@ type server interface { // s is an implementation of the private server interface type s struct { - delay delay.D + delayConf DelayConfig lock sync.RWMutex - providers map[u.Key]peer.Map + providers map[u.Key]map[u.Key]providerRecord +} + +type providerRecord struct { + Peer peer.Peer + Created time.Time } func (rs *s) Announce(p peer.Peer, k u.Key) error { - rs.delay.Wait() // before locking - rs.lock.Lock() defer rs.lock.Unlock() _, ok := rs.providers[k] if !ok { - rs.providers[k] = make(peer.Map) + rs.providers[k] = make(map[u.Key]providerRecord) + } + rs.providers[k][p.Key()] = providerRecord{ + Created: time.Now(), + Peer: p, } - rs.providers[k][p.Key()] = p return nil } func (rs *s) Providers(k u.Key) []peer.Peer { - rs.delay.Wait() // before locking + rs.delayConf.Query.Wait() // before locking rs.lock.RLock() defer rs.lock.RUnlock() var ret []peer.Peer - peerset, ok := rs.providers[k] + records, ok := rs.providers[k] if !ok { return ret } - for _, peer := range peerset { - ret = append(ret, peer) + for _, r := range records { + if time.Now().Sub(r.Created) > rs.delayConf.ValueVisibility.Get() { + ret = append(ret, r.Peer) + } } for i := range ret { diff --git a/util/delay/delay.go b/util/delay/delay.go index e7fb28091..8116b300e 100644 --- a/util/delay/delay.go +++ b/util/delay/delay.go @@ -10,6 +10,7 @@ import ( type D interface { Set(time.Duration) time.Duration Wait() + Get() time.Duration } // Fixed returns a delay with fixed latency @@ -37,3 +38,9 @@ func (d *delay) Wait() { defer d.l.RUnlock() time.Sleep(d.t) } + +func (d *delay) Get() time.Duration { + d.l.Lock() + defer d.l.Unlock() + return d.t +}