From b63cbf5331495e9ab9683d98b7efde34bc7d955a Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 24 Jan 2015 05:36:03 -0800 Subject: [PATCH 1/4] updated peerstream --- Godeps/Godeps.json | 2 +- .../jbenet/go-peerstream/.gitignore | 1 + .../jbenet/go-peerstream/.travis.yml | 3 +- .../github.com/jbenet/go-peerstream/conn.go | 19 ++++++++- .../github.com/jbenet/go-peerstream/swarm.go | 39 +++++++++++++++++++ .../go-peerstream/transport/test/ttest.go | 6 +++ 6 files changed, 65 insertions(+), 5 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-peerstream/.gitignore diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 4cb9a36a9..1e8a98ebc 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -165,7 +165,7 @@ }, { "ImportPath": "github.com/jbenet/go-peerstream", - "Rev": "530b09b2300da11cc19f479289be5d014c146581" + "Rev": "bbe2a6461aa80ee25fd87eccf35bd54bac7f788d" }, { "ImportPath": "github.com/jbenet/go-random", diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.gitignore b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.gitignore new file mode 100644 index 000000000..00a069958 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.gitignore @@ -0,0 +1 @@ +example/closer diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml index 66d247e53..3c1c47ab9 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml @@ -1,11 +1,10 @@ language: go go: - - 1.2 - 1.3 - 1.4 - release - tip script: - - go test -race -cpu=5 -v ./... + - go test -race -cpu=5 ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go index 378218e84..c26136e29 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go @@ -122,7 +122,11 @@ func (c *Conn) Close() error { // close underlying connection c.swarm.removeConn(c) - return c.pstConn.Close() + err := c.pstConn.Close() + c.swarm.notifyAll(func(n Notifiee) { + n.Disconnected(c) + }) + return err } // ConnsWithGroup narrows down a set of connections to those in a given group. @@ -198,6 +202,9 @@ func (s *Swarm) addConn(netConn net.Conn, isServer bool) (*Conn, error) { s.StreamHandler()(stream) // call our handler }) + s.notifyAll(func(n Notifiee) { + n.Connected(c) + }) return c, nil } @@ -228,6 +235,10 @@ func (s *Swarm) setupStream(pstStream pst.Stream, c *Conn) *Stream { c.streams[stream] = struct{}{} s.streamLock.Unlock() c.streamLock.Unlock() + + s.notifyAll(func(n Notifiee) { + n.OpenedStream(stream) + }) return stream } @@ -241,7 +252,11 @@ func (s *Swarm) removeStream(stream *Stream) error { s.streamLock.Unlock() stream.conn.streamLock.Unlock() - return stream.pstStream.Close() + err := stream.pstStream.Close() + s.notifyAll(func(n Notifiee) { + n.ClosedStream(stream) + }) + return err } func (s *Swarm) removeConn(conn *Conn) { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go index e25b44d82..923fc4324 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go @@ -39,6 +39,10 @@ type Swarm struct { streamHandler StreamHandler // receives Streams initiated remotely selectConn SelectConn // default SelectConn function + // notification listeners + notifiees map[Notifiee]struct{} + notifieeLock sync.RWMutex + closed chan struct{} } @@ -48,6 +52,7 @@ func NewSwarm(t pst.Transport) *Swarm { streams: make(map[*Stream]struct{}), conns: make(map[*Conn]struct{}), listeners: make(map[*Listener]struct{}), + notifiees: make(map[Notifiee]struct{}), selectConn: SelectRandomConn, streamHandler: NoOpStreamHandler, connHandler: NoOpConnHandler, @@ -361,3 +366,37 @@ func (s *Swarm) connGarbageCollect() { } } } + +// Notify signs up Notifiee to receive signals when events happen +func (s *Swarm) Notify(n Notifiee) { + s.notifieeLock.Lock() + s.notifiees[n] = struct{}{} + s.notifieeLock.Unlock() +} + +// StopNotify unregisters Notifiee fromr receiving signals +func (s *Swarm) StopNotify(n Notifiee) { + s.notifieeLock.Lock() + delete(s.notifiees, n) + s.notifieeLock.Unlock() +} + +// notifyAll runs the notification function on all Notifiees +func (s *Swarm) notifyAll(notification func(n Notifiee)) { + s.notifieeLock.RLock() + for n := range s.notifiees { + // make sure we dont block + // and they dont block each other. + go notification(n) + } + s.notifieeLock.RUnlock() +} + +// Notifiee is an interface for an object wishing to receive +// notifications from a Swarm +type Notifiee interface { + Connected(*Conn) // called when a connection opened + Disconnected(*Conn) // called when a connection closed + OpenedStream(*Stream) // called when a stream opened + ClosedStream(*Stream) // called when a stream closed +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go index d9aaff969..cd0047c83 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go @@ -350,6 +350,8 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, for _, a := range swarms { for _, b := range swarms { wg.Add(1) + a := a // race + b := b // race go rateLimit(func() { defer wg.Done() openConnsAndRW(a, b) @@ -370,6 +372,10 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, t.Error(err) } + for _, s := range swarms { + s.Close() + } + } func SubtestStress1Swarm1Conn1Stream1Msg(t *testing.T, tr pst.Transport) { From 4ae01e7a5ec3eddf183eb01f01553194b665c298 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 24 Jan 2015 07:14:56 -0800 Subject: [PATCH 2/4] p2p/net/swarm: notifications --- p2p/net/interface.go | 17 +++ p2p/net/mock/mock_conn.go | 9 ++ p2p/net/mock/mock_notif_test.go | 198 ++++++++++++++++++++++++++++++ p2p/net/mock/mock_peernet.go | 40 +++++- p2p/net/mock/mock_stream.go | 5 +- p2p/net/swarm/swarm.go | 63 +++++++++- p2p/net/swarm/swarm_net.go | 10 ++ p2p/net/swarm/swarm_notif_test.go | 186 ++++++++++++++++++++++++++++ 8 files changed, 520 insertions(+), 8 deletions(-) create mode 100644 p2p/net/mock/mock_notif_test.go create mode 100644 p2p/net/swarm/swarm_notif_test.go diff --git a/p2p/net/interface.go b/p2p/net/interface.go index 74add2bb6..577c496cb 100644 --- a/p2p/net/interface.go +++ b/p2p/net/interface.go @@ -111,6 +111,10 @@ type Dialer interface { // ConnsToPeer returns the connections in this Netowrk for given peer. ConnsToPeer(p peer.ID) []Conn + + // Notify/StopNotify register and unregister a notifiee for signals + Notify(Notifiee) + StopNotify(Notifiee) } // Connectedness signals the capacity for a connection with a given node. @@ -131,3 +135,16 @@ const ( // (should signal "made effort, failed") CannotConnect ) + +// Notifiee is an interface for an object wishing to receive +// notifications from a Network. +type Notifiee interface { + Connected(Network, Conn) // called when a connection opened + Disconnected(Network, Conn) // called when a connection closed + OpenedStream(Network, Stream) // called when a stream opened + ClosedStream(Network, Stream) // called when a stream closed + + // TODO + // PeerConnected(Network, peer.ID) // called when a peer connected + // PeerDisconnected(Network, peer.ID) // called when a peer disconnected +} diff --git a/p2p/net/mock/mock_conn.go b/p2p/net/mock/mock_conn.go index 7e58eaae5..23a4ead62 100644 --- a/p2p/net/mock/mock_conn.go +++ b/p2p/net/mock/mock_conn.go @@ -37,6 +37,9 @@ func (c *conn) Close() error { s.Close() } c.net.removeConn(c) + c.net.notifyAll(func(n inet.Notifiee) { + n.Disconnected(c.net, c) + }) return nil } @@ -73,11 +76,17 @@ func (c *conn) allStreams() []inet.Stream { func (c *conn) remoteOpenedStream(s *stream) { c.addStream(s) c.net.handleNewStream(s) + c.net.notifyAll(func(n inet.Notifiee) { + n.OpenedStream(c.net, s) + }) } func (c *conn) openStream() *stream { sl, sr := c.link.newStreamPair() c.addStream(sl) + c.net.notifyAll(func(n inet.Notifiee) { + n.OpenedStream(c.net, sl) + }) c.rconn.remoteOpenedStream(sr) return sl } diff --git a/p2p/net/mock/mock_notif_test.go b/p2p/net/mock/mock_notif_test.go new file mode 100644 index 000000000..1b80ecb96 --- /dev/null +++ b/p2p/net/mock/mock_notif_test.go @@ -0,0 +1,198 @@ +package mocknet + +import ( + "testing" + "time" + + inet "github.com/jbenet/go-ipfs/p2p/net" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +func TestNotifications(t *testing.T) { + t.Parallel() + + mn, err := FullMeshLinked(context.Background(), 5) + if err != nil { + t.Fatal(err) + } + + timeout := 5 * time.Second + + // signup notifs + nets := mn.Nets() + notifiees := make([]*netNotifiee, len(nets)) + for i, pn := range nets { + n := newNetNotifiee() + pn.Notify(n) + notifiees[i] = n + } + + // connect all + for _, n1 := range nets { + for _, n2 := range nets { + if n1 == n2 { + continue + } + if _, err := mn.ConnectNets(n1, n2); err != nil { + t.Fatal(err) + } + } + } + + // test everyone got the correct connection opened calls + for i, s := range nets { + n := notifiees[i] + for _, s2 := range nets { + cos := s.ConnsToPeer(s2.LocalPeer()) + func() { + for i := 0; i < len(cos); i++ { + var c inet.Conn + select { + case c = <-n.connected: + case <-time.After(timeout): + t.Fatal("timeout") + } + for _, c2 := range cos { + if c == c2 { + t.Log("got notif for conn") + return + } + } + t.Error("connection not found") + } + }() + } + } + + complement := func(c inet.Conn) (inet.Network, *netNotifiee, *conn) { + for i, s := range nets { + for _, c2 := range s.Conns() { + if c2.(*conn).rconn == c { + return s, notifiees[i], c2.(*conn) + } + } + } + t.Fatal("complementary conn not found", c) + return nil, nil, nil + } + + testOCStream := func(n *netNotifiee, s inet.Stream) { + var s2 inet.Stream + select { + case s2 = <-n.openedStream: + t.Log("got notif for opened stream") + case <-time.After(timeout): + t.Fatal("timeout") + } + if s != nil && s != s2 { + t.Fatalf("got incorrect stream %p %p", s, s2) + } + + select { + case s2 = <-n.closedStream: + t.Log("got notif for closed stream") + case <-time.After(timeout): + t.Fatal("timeout") + } + if s != nil && s != s2 { + t.Fatalf("got incorrect stream %p %p", s, s2) + } + } + + streams := make(chan inet.Stream) + for _, s := range nets { + s.SetStreamHandler(func(s inet.Stream) { + streams <- s + s.Close() + }) + } + + // there's one stream per conn that we need to drain.... + // unsure where these are coming from + for i, _ := range nets { + n := notifiees[i] + testOCStream(n, nil) + testOCStream(n, nil) + testOCStream(n, nil) + testOCStream(n, nil) + } + + // open a streams in each conn + for i, s := range nets { + conns := s.Conns() + for _, c := range conns { + _, n2, c2 := complement(c) + st1, err := c.NewStream() + if err != nil { + t.Error(err) + } else { + t.Logf("%s %s <--%p--> %s %s", c.LocalPeer(), c.LocalMultiaddr(), st1, c.RemotePeer(), c.RemoteMultiaddr()) + // st1.Write([]byte("hello")) + st1.Close() + st2 := <-streams + t.Logf("%s %s <--%p--> %s %s", c2.LocalPeer(), c2.LocalMultiaddr(), st2, c2.RemotePeer(), c2.RemoteMultiaddr()) + testOCStream(notifiees[i], st1) + testOCStream(n2, st2) + } + } + } + + // close conns + for i, s := range nets { + n := notifiees[i] + for _, c := range s.Conns() { + _, n2, c2 := complement(c) + c.(*conn).Close() + c2.Close() + + var c3, c4 inet.Conn + select { + case c3 = <-n.disconnected: + case <-time.After(timeout): + t.Fatal("timeout") + } + if c != c3 { + t.Fatal("got incorrect conn", c, c3) + } + + select { + case c4 = <-n2.disconnected: + case <-time.After(timeout): + t.Fatal("timeout") + } + if c2 != c4 { + t.Fatal("got incorrect conn", c, c2) + } + } + } +} + +type netNotifiee struct { + connected chan inet.Conn + disconnected chan inet.Conn + openedStream chan inet.Stream + closedStream chan inet.Stream +} + +func newNetNotifiee() *netNotifiee { + return &netNotifiee{ + connected: make(chan inet.Conn), + disconnected: make(chan inet.Conn), + openedStream: make(chan inet.Stream), + closedStream: make(chan inet.Stream), + } +} + +func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { + nn.connected <- v +} +func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { + nn.disconnected <- v +} +func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) { + nn.openedStream <- v +} +func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) { + nn.closedStream <- v +} diff --git a/p2p/net/mock/mock_peernet.go b/p2p/net/mock/mock_peernet.go index 633a32762..b6bbbaa61 100644 --- a/p2p/net/mock/mock_peernet.go +++ b/p2p/net/mock/mock_peernet.go @@ -31,6 +31,9 @@ type peernet struct { streamHandler inet.StreamHandler connHandler inet.ConnHandler + notifmu sync.RWMutex + notifs map[inet.Notifiee]struct{} + cg ctxgroup.ContextGroup sync.RWMutex } @@ -58,6 +61,8 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey, connsByPeer: map[peer.ID]map[*conn]struct{}{}, connsByLink: map[*link]map[*conn]struct{}{}, + + notifs: make(map[inet.Notifiee]struct{}), } n.cg.SetTeardown(n.teardown) @@ -163,6 +168,9 @@ func (pn *peernet) openConn(r peer.ID, l *link) *conn { lc, rc := l.newConnPair(pn) log.Debugf("%s opening connection to %s", pn.LocalPeer(), lc.RemotePeer()) pn.addConn(lc) + pn.notifyAll(func(n inet.Notifiee) { + n.Connected(pn, lc) + }) rc.net.remoteOpenedConn(rc) return lc } @@ -171,6 +179,9 @@ func (pn *peernet) remoteOpenedConn(c *conn) { log.Debugf("%s accepting connection from %s", pn.LocalPeer(), c.RemotePeer()) pn.addConn(c) pn.handleNewConn(c) + pn.notifyAll(func(n inet.Notifiee) { + n.Connected(pn, c) + }) } // addConn constructs and adds a connection @@ -201,13 +212,13 @@ func (pn *peernet) removeConn(c *conn) { cs, found := pn.connsByLink[c.link] if !found || len(cs) < 1 { - panic("attempting to remove a conn that doesnt exist") + panic(fmt.Sprintf("attempting to remove a conn that doesnt exist %p", c.link)) } delete(cs, c) cs, found = pn.connsByPeer[c.remote] if !found { - panic("attempting to remove a conn that doesnt exist") + panic(fmt.Sprintf("attempting to remove a conn that doesnt exist %p", c.remote)) } delete(cs, c) } @@ -360,3 +371,28 @@ func (pn *peernet) SetConnHandler(h inet.ConnHandler) { pn.connHandler = h pn.Unlock() } + +// Notify signs up Notifiee to receive signals when events happen +func (pn *peernet) Notify(f inet.Notifiee) { + pn.notifmu.Lock() + pn.notifs[f] = struct{}{} + pn.notifmu.Unlock() +} + +// StopNotify unregisters Notifiee fromr receiving signals +func (pn *peernet) StopNotify(f inet.Notifiee) { + pn.notifmu.Lock() + delete(pn.notifs, f) + pn.notifmu.Unlock() +} + +// notifyAll runs the notification function on all Notifiees +func (pn *peernet) notifyAll(notification func(f inet.Notifiee)) { + pn.notifmu.RLock() + for n := range pn.notifs { + // make sure we dont block + // and they dont block each other. + go notification(n) + } + pn.notifmu.RUnlock() +} diff --git a/p2p/net/mock/mock_stream.go b/p2p/net/mock/mock_stream.go index 71a0ba66d..e116bb117 100644 --- a/p2p/net/mock/mock_stream.go +++ b/p2p/net/mock/mock_stream.go @@ -19,8 +19,11 @@ func (s *stream) Close() error { r.Close() } if w, ok := (s.Writer).(io.Closer); ok { - return w.Close() + w.Close() } + s.conn.net.notifyAll(func(n inet.Notifiee) { + n.ClosedStream(s.conn.net, s) + }) return nil } diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index f167dec4c..56e88445c 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -4,6 +4,7 @@ package swarm import ( "fmt" + "sync" "time" inet "github.com/jbenet/go-ipfs/p2p/net" @@ -38,6 +39,9 @@ type Swarm struct { backf dialbackoff dialT time.Duration // mainly for tests + notifmu sync.RWMutex + notifs map[inet.Notifiee]ps.Notifiee + cg ctxgroup.ContextGroup } @@ -54,11 +58,12 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, } s := &Swarm{ - swarm: ps.NewSwarm(PSTransport), - local: local, - peers: peers, - cg: ctxgroup.WithContext(ctx), - dialT: DialTimeout, + swarm: ps.NewSwarm(PSTransport), + local: local, + peers: peers, + cg: ctxgroup.WithContext(ctx), + dialT: DialTimeout, + notifs: make(map[inet.Notifiee]ps.Notifiee), } // configure Swarm @@ -177,3 +182,51 @@ func (s *Swarm) Peers() []peer.ID { func (s *Swarm) LocalPeer() peer.ID { return s.local } + +// Notify signs up Notifiee to receive signals when events happen +func (s *Swarm) Notify(f inet.Notifiee) { + // wrap with our notifiee, to translate function calls + n := &ps2netNotifee{net: (*Network)(s), not: f} + + s.notifmu.Lock() + s.notifs[f] = n + s.notifmu.Unlock() + + // register for notifications in the peer swarm. + s.swarm.Notify(n) +} + +// StopNotify unregisters Notifiee fromr receiving signals +func (s *Swarm) StopNotify(f inet.Notifiee) { + s.notifmu.Lock() + n, found := s.notifs[f] + if found { + delete(s.notifs, f) + } + s.notifmu.Unlock() + + if found { + s.swarm.StopNotify(n) + } +} + +type ps2netNotifee struct { + net *Network + not inet.Notifiee +} + +func (n *ps2netNotifee) Connected(c *ps.Conn) { + n.not.Connected(n.net, inet.Conn((*Conn)(c))) +} + +func (n *ps2netNotifee) Disconnected(c *ps.Conn) { + n.not.Disconnected(n.net, inet.Conn((*Conn)(c))) +} + +func (n *ps2netNotifee) OpenedStream(s *ps.Stream) { + n.not.OpenedStream(n.net, inet.Stream((*Stream)(s))) +} + +func (n *ps2netNotifee) ClosedStream(s *ps.Stream) { + n.not.ClosedStream(n.net, inet.Stream((*Stream)(s))) +} diff --git a/p2p/net/swarm/swarm_net.go b/p2p/net/swarm/swarm_net.go index 5df744747..561ceef82 100644 --- a/p2p/net/swarm/swarm_net.go +++ b/p2p/net/swarm/swarm_net.go @@ -154,3 +154,13 @@ func (n *Network) SetConnHandler(h inet.ConnHandler) { func (n *Network) String() string { return fmt.Sprintf("", n.LocalPeer()) } + +// Notify signs up Notifiee to receive signals when events happen +func (n *Network) Notify(f inet.Notifiee) { + n.Swarm().Notify(f) +} + +// StopNotify unregisters Notifiee fromr receiving signals +func (n *Network) StopNotify(f inet.Notifiee) { + n.Swarm().StopNotify(f) +} diff --git a/p2p/net/swarm/swarm_notif_test.go b/p2p/net/swarm/swarm_notif_test.go new file mode 100644 index 000000000..3469a1b32 --- /dev/null +++ b/p2p/net/swarm/swarm_notif_test.go @@ -0,0 +1,186 @@ +package swarm + +import ( + "testing" + "time" + + inet "github.com/jbenet/go-ipfs/p2p/net" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +func TestNotifications(t *testing.T) { + t.Parallel() + + ctx := context.Background() + swarms := makeSwarms(ctx, t, 5) + defer func() { + for _, s := range swarms { + s.Close() + } + }() + + timeout := 5 * time.Second + + // signup notifs + notifiees := make([]*netNotifiee, len(swarms)) + for i, swarm := range swarms { + n := newNetNotifiee() + swarm.Notify(n) + notifiees[i] = n + } + + connectSwarms(t, ctx, swarms) + + <-time.After(time.Millisecond) + // should've gotten 5 by now. + + // test everyone got the correct connection opened calls + for i, s := range swarms { + n := notifiees[i] + for _, s2 := range swarms { + if s == s2 { + continue + } + + cos := s.ConnectionsToPeer(s2.LocalPeer()) + func() { + for i := 0; i < len(cos); i++ { + var c inet.Conn + select { + case c = <-n.connected: + case <-time.After(timeout): + t.Fatal("timeout") + } + for _, c2 := range cos { + if c == c2 { + t.Log("got notif for conn", c) + return + } + } + t.Error("connection not found", c) + } + }() + } + } + + complement := func(c inet.Conn) (*Swarm, *netNotifiee, *Conn) { + for i, s := range swarms { + for _, c2 := range s.Connections() { + if c.LocalMultiaddr().Equal(c2.RemoteMultiaddr()) && + c2.LocalMultiaddr().Equal(c.RemoteMultiaddr()) { + return s, notifiees[i], c2 + } + } + } + t.Fatal("complementary conn not found", c) + return nil, nil, nil + } + + testOCStream := func(n *netNotifiee, s inet.Stream) { + var s2 inet.Stream + select { + case s2 = <-n.openedStream: + t.Log("got notif for opened stream") + case <-time.After(timeout): + t.Fatal("timeout") + } + if s != s2 { + t.Fatal("got incorrect stream", s.Conn(), s2.Conn()) + } + + select { + case s2 = <-n.closedStream: + t.Log("got notif for closed stream") + case <-time.After(timeout): + t.Fatal("timeout") + } + if s != s2 { + t.Fatal("got incorrect stream", s.Conn(), s2.Conn()) + } + } + + streams := make(chan inet.Stream) + for _, s := range swarms { + s.SetStreamHandler(func(s inet.Stream) { + streams <- s + s.Close() + }) + } + + // open a streams in each conn + for i, s := range swarms { + for _, c := range s.Connections() { + _, n2, _ := complement(c) + + st1, err := c.NewStream() + if err != nil { + t.Error(err) + } else { + st1.Write([]byte("hello")) + st1.Close() + testOCStream(notifiees[i], st1) + st2 := <-streams + testOCStream(n2, st2) + } + } + } + + // close conns + for i, s := range swarms { + n := notifiees[i] + for _, c := range s.Connections() { + _, n2, c2 := complement(c) + c.Close() + c2.Close() + + var c3, c4 inet.Conn + select { + case c3 = <-n.disconnected: + case <-time.After(timeout): + t.Fatal("timeout") + } + if c != c3 { + t.Fatal("got incorrect conn", c, c3) + } + + select { + case c4 = <-n2.disconnected: + case <-time.After(timeout): + t.Fatal("timeout") + } + if c2 != c4 { + t.Fatal("got incorrect conn", c, c2) + } + } + } +} + +type netNotifiee struct { + connected chan inet.Conn + disconnected chan inet.Conn + openedStream chan inet.Stream + closedStream chan inet.Stream +} + +func newNetNotifiee() *netNotifiee { + return &netNotifiee{ + connected: make(chan inet.Conn), + disconnected: make(chan inet.Conn), + openedStream: make(chan inet.Stream), + closedStream: make(chan inet.Stream), + } +} + +func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { + nn.connected <- v +} +func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { + nn.disconnected <- v +} +func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) { + nn.openedStream <- v +} +func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) { + nn.closedStream <- v +} From 00d7b498efc2e3f4929354f3379c158b289b305f Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 24 Jan 2015 08:52:01 -0800 Subject: [PATCH 3/4] routing/dht: adjust routing table on peer conn/disc --- routing/dht/dht.go | 10 +++++++++- routing/dht/notif.go | 33 +++++++++++++++++++++++++++++++++ routing/kbucket/bucket.go | 10 ++++++++++ routing/kbucket/table.go | 17 +++++++++++++++++ 4 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 routing/dht/notif.go diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 5f7512393..e66517072 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -65,9 +65,17 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip dht.datastore = dstore dht.self = h.ID() dht.peerstore = h.Peerstore() - dht.ContextGroup = ctxgroup.WithContext(ctx) dht.host = h + // register for network notifs. + dht.host.Network().Notify((*netNotifiee)(dht)) + + dht.ContextGroup = ctxgroup.WithContextAndTeardown(ctx, func() error { + // remove ourselves from network notifs. + dht.host.Network().StopNotify((*netNotifiee)(dht)) + return nil + }) + // sanity check. this should **never** happen if len(dht.peerstore.Addresses(dht.self)) < 1 { panic("attempt to initialize dht without addresses for self") diff --git a/routing/dht/notif.go b/routing/dht/notif.go new file mode 100644 index 000000000..318db12ea --- /dev/null +++ b/routing/dht/notif.go @@ -0,0 +1,33 @@ +package dht + +import ( + inet "github.com/jbenet/go-ipfs/p2p/net" +) + +// netNotifiee defines methods to be used with the IpfsDHT +type netNotifiee IpfsDHT + +func (nn *netNotifiee) DHT() *IpfsDHT { + return (*IpfsDHT)(nn) +} + +func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { + dht := nn.DHT() + select { + case <-dht.Closing(): + return + } + dht.Update(dht.Context(), v.RemotePeer()) +} + +func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { + dht := nn.DHT() + select { + case <-dht.Closing(): + return + } + dht.routingTable.Remove(v.RemotePeer()) +} + +func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {} +func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {} diff --git a/routing/kbucket/bucket.go b/routing/kbucket/bucket.go index e158f70f9..7d4f87c2e 100644 --- a/routing/kbucket/bucket.go +++ b/routing/kbucket/bucket.go @@ -30,6 +30,16 @@ func (b *Bucket) find(id peer.ID) *list.Element { return nil } +func (b *Bucket) remove(id peer.ID) { + b.lk.RLock() + defer b.lk.RUnlock() + for e := b.list.Front(); e != nil; e = e.Next() { + if e.Value.(peer.ID) == id { + b.list.Remove(e) + } + } +} + func (b *Bucket) moveToFront(e *list.Element) { b.lk.Lock() b.list.MoveToFront(e) diff --git a/routing/kbucket/table.go b/routing/kbucket/table.go index 62bfa0646..59b81282d 100644 --- a/routing/kbucket/table.go +++ b/routing/kbucket/table.go @@ -87,6 +87,23 @@ func (rt *RoutingTable) Update(p peer.ID) peer.ID { return "" } +// Remove deletes a peer from the routing table. This is to be used +// when we are sure a node has disconnected completely. +func (rt *RoutingTable) Remove(p peer.ID) { + rt.tabLock.Lock() + defer rt.tabLock.Unlock() + peerID := ConvertPeerID(p) + cpl := commonPrefixLen(peerID, rt.local) + + bucketID := cpl + if bucketID >= len(rt.Buckets) { + bucketID = len(rt.Buckets) - 1 + } + + bucket := rt.Buckets[bucketID] + bucket.remove(p) +} + func (rt *RoutingTable) nextBucket() peer.ID { bucket := rt.Buckets[len(rt.Buckets)-1] newBucket := bucket.Split(len(rt.Buckets)-1, rt.local) From 105448769050cc046a97f55ece6cd39a7ae05bbe Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 24 Jan 2015 09:12:27 -0800 Subject: [PATCH 4/4] bitswap: respond to peers connecting + disconnecting With these notifications, bitswap can reclaim all resources for any outstanding work for a peer. cc @briantigerchow @whyrusleeping --- exchange/bitswap/bitswap.go | 18 ++++++++++++++++++ exchange/bitswap/network/interface.go | 4 ++++ exchange/bitswap/network/ipfs_impl.go | 20 ++++++++++++++++++++ exchange/bitswap/testnet/network_test.go | 7 +++++++ 4 files changed, 49 insertions(+) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index f703bf7e1..262b2fd5f 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -339,6 +339,24 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg return "", nil } +// Connected/Disconnected warns bitswap about peer connections +func (bs *bitswap) PeerConnected(p peer.ID) { + // TODO: add to clientWorker?? + + peers := make(chan peer.ID) + err := bs.sendWantlistToPeers(context.TODO(), peers) + if err != nil { + log.Errorf("error sending wantlist: %s", err) + } + peers <- p + close(peers) +} + +// Connected/Disconnected warns bitswap about peer connections +func (bs *bitswap) PeerDisconnected(peer.ID) { + // TODO: release resources. +} + func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { if len(bkeys) < 1 { return diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 18bb1df83..857201152 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -40,6 +40,10 @@ type Receiver interface { destination peer.ID, outgoing bsmsg.BitSwapMessage) ReceiveError(error) + + // Connected/Disconnected warns bitswap about peer connections + PeerConnected(peer.ID) + PeerDisconnected(peer.ID) } type Routing interface { diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 652a1f9c6..f54e181d1 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -21,6 +21,9 @@ func NewFromIpfsHost(host host.Host, r routing.IpfsRouting) BitSwapNetwork { routing: r, } host.SetStreamHandler(ProtocolBitswap, bitswapNetwork.handleNewStream) + host.Network().Notify((*netNotifiee)(&bitswapNetwork)) + // TODO: StopNotify. + return &bitswapNetwork } @@ -139,3 +142,20 @@ func (bsnet *impl) handleNewStream(s inet.Stream) { log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer()) bsnet.receiver.ReceiveMessage(ctx, p, received) } + +type netNotifiee impl + +func (nn *netNotifiee) impl() *impl { + return (*impl)(nn) +} + +func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { + nn.impl().receiver.PeerConnected(v.RemotePeer()) +} + +func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { + nn.impl().receiver.PeerDisconnected(v.RemotePeer()) +} + +func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {} +func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {} diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index e80fccba5..268f93607 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -146,3 +146,10 @@ func (lam *lambdaImpl) ReceiveMessage(ctx context.Context, func (lam *lambdaImpl) ReceiveError(err error) { // TODO log error } + +func (lam *lambdaImpl) PeerConnected(p peer.ID) { + // TODO +} +func (lam *lambdaImpl) PeerDisconnected(peer.ID) { + // TODO +}