Merge pull request #645 from jbenet/net-notif

Network Notifications
This commit is contained in:
Juan Batiz-Benet 2015-01-24 10:25:53 -08:00
commit eb79b1b637
22 changed files with 703 additions and 14 deletions

2
Godeps/Godeps.json generated
View File

@ -165,7 +165,7 @@
},
{
"ImportPath": "github.com/jbenet/go-peerstream",
"Rev": "530b09b2300da11cc19f479289be5d014c146581"
"Rev": "bbe2a6461aa80ee25fd87eccf35bd54bac7f788d"
},
{
"ImportPath": "github.com/jbenet/go-random",

View File

@ -0,0 +1 @@
example/closer

View File

@ -1,11 +1,10 @@
language: go
go:
- 1.2
- 1.3
- 1.4
- release
- tip
script:
- go test -race -cpu=5 -v ./...
- go test -race -cpu=5 ./...

View File

@ -122,7 +122,11 @@ func (c *Conn) Close() error {
// close underlying connection
c.swarm.removeConn(c)
return c.pstConn.Close()
err := c.pstConn.Close()
c.swarm.notifyAll(func(n Notifiee) {
n.Disconnected(c)
})
return err
}
// ConnsWithGroup narrows down a set of connections to those in a given group.
@ -198,6 +202,9 @@ func (s *Swarm) addConn(netConn net.Conn, isServer bool) (*Conn, error) {
s.StreamHandler()(stream) // call our handler
})
s.notifyAll(func(n Notifiee) {
n.Connected(c)
})
return c, nil
}
@ -228,6 +235,10 @@ func (s *Swarm) setupStream(pstStream pst.Stream, c *Conn) *Stream {
c.streams[stream] = struct{}{}
s.streamLock.Unlock()
c.streamLock.Unlock()
s.notifyAll(func(n Notifiee) {
n.OpenedStream(stream)
})
return stream
}
@ -241,7 +252,11 @@ func (s *Swarm) removeStream(stream *Stream) error {
s.streamLock.Unlock()
stream.conn.streamLock.Unlock()
return stream.pstStream.Close()
err := stream.pstStream.Close()
s.notifyAll(func(n Notifiee) {
n.ClosedStream(stream)
})
return err
}
func (s *Swarm) removeConn(conn *Conn) {

View File

@ -39,6 +39,10 @@ type Swarm struct {
streamHandler StreamHandler // receives Streams initiated remotely
selectConn SelectConn // default SelectConn function
// notification listeners
notifiees map[Notifiee]struct{}
notifieeLock sync.RWMutex
closed chan struct{}
}
@ -48,6 +52,7 @@ func NewSwarm(t pst.Transport) *Swarm {
streams: make(map[*Stream]struct{}),
conns: make(map[*Conn]struct{}),
listeners: make(map[*Listener]struct{}),
notifiees: make(map[Notifiee]struct{}),
selectConn: SelectRandomConn,
streamHandler: NoOpStreamHandler,
connHandler: NoOpConnHandler,
@ -361,3 +366,37 @@ func (s *Swarm) connGarbageCollect() {
}
}
}
// Notify signs up Notifiee to receive signals when events happen
func (s *Swarm) Notify(n Notifiee) {
s.notifieeLock.Lock()
s.notifiees[n] = struct{}{}
s.notifieeLock.Unlock()
}
// StopNotify unregisters Notifiee fromr receiving signals
func (s *Swarm) StopNotify(n Notifiee) {
s.notifieeLock.Lock()
delete(s.notifiees, n)
s.notifieeLock.Unlock()
}
// notifyAll runs the notification function on all Notifiees
func (s *Swarm) notifyAll(notification func(n Notifiee)) {
s.notifieeLock.RLock()
for n := range s.notifiees {
// make sure we dont block
// and they dont block each other.
go notification(n)
}
s.notifieeLock.RUnlock()
}
// Notifiee is an interface for an object wishing to receive
// notifications from a Swarm
type Notifiee interface {
Connected(*Conn) // called when a connection opened
Disconnected(*Conn) // called when a connection closed
OpenedStream(*Stream) // called when a stream opened
ClosedStream(*Stream) // called when a stream closed
}

View File

@ -350,6 +350,8 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm,
for _, a := range swarms {
for _, b := range swarms {
wg.Add(1)
a := a // race
b := b // race
go rateLimit(func() {
defer wg.Done()
openConnsAndRW(a, b)
@ -370,6 +372,10 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm,
t.Error(err)
}
for _, s := range swarms {
s.Close()
}
}
func SubtestStress1Swarm1Conn1Stream1Msg(t *testing.T, tr pst.Transport) {

View File

@ -339,6 +339,24 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
return "", nil
}
// Connected/Disconnected warns bitswap about peer connections
func (bs *bitswap) PeerConnected(p peer.ID) {
// TODO: add to clientWorker??
peers := make(chan peer.ID)
err := bs.sendWantlistToPeers(context.TODO(), peers)
if err != nil {
log.Errorf("error sending wantlist: %s", err)
}
peers <- p
close(peers)
}
// Connected/Disconnected warns bitswap about peer connections
func (bs *bitswap) PeerDisconnected(peer.ID) {
// TODO: release resources.
}
func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
if len(bkeys) < 1 {
return

View File

@ -40,6 +40,10 @@ type Receiver interface {
destination peer.ID, outgoing bsmsg.BitSwapMessage)
ReceiveError(error)
// Connected/Disconnected warns bitswap about peer connections
PeerConnected(peer.ID)
PeerDisconnected(peer.ID)
}
type Routing interface {

View File

@ -21,6 +21,9 @@ func NewFromIpfsHost(host host.Host, r routing.IpfsRouting) BitSwapNetwork {
routing: r,
}
host.SetStreamHandler(ProtocolBitswap, bitswapNetwork.handleNewStream)
host.Network().Notify((*netNotifiee)(&bitswapNetwork))
// TODO: StopNotify.
return &bitswapNetwork
}
@ -139,3 +142,20 @@ func (bsnet *impl) handleNewStream(s inet.Stream) {
log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())
bsnet.receiver.ReceiveMessage(ctx, p, received)
}
type netNotifiee impl
func (nn *netNotifiee) impl() *impl {
return (*impl)(nn)
}
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
nn.impl().receiver.PeerConnected(v.RemotePeer())
}
func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
nn.impl().receiver.PeerDisconnected(v.RemotePeer())
}
func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}
func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {}

View File

@ -146,3 +146,10 @@ func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
func (lam *lambdaImpl) ReceiveError(err error) {
// TODO log error
}
func (lam *lambdaImpl) PeerConnected(p peer.ID) {
// TODO
}
func (lam *lambdaImpl) PeerDisconnected(peer.ID) {
// TODO
}

View File

@ -111,6 +111,10 @@ type Dialer interface {
// ConnsToPeer returns the connections in this Netowrk for given peer.
ConnsToPeer(p peer.ID) []Conn
// Notify/StopNotify register and unregister a notifiee for signals
Notify(Notifiee)
StopNotify(Notifiee)
}
// Connectedness signals the capacity for a connection with a given node.
@ -131,3 +135,16 @@ const (
// (should signal "made effort, failed")
CannotConnect
)
// Notifiee is an interface for an object wishing to receive
// notifications from a Network.
type Notifiee interface {
Connected(Network, Conn) // called when a connection opened
Disconnected(Network, Conn) // called when a connection closed
OpenedStream(Network, Stream) // called when a stream opened
ClosedStream(Network, Stream) // called when a stream closed
// TODO
// PeerConnected(Network, peer.ID) // called when a peer connected
// PeerDisconnected(Network, peer.ID) // called when a peer disconnected
}

View File

@ -37,6 +37,9 @@ func (c *conn) Close() error {
s.Close()
}
c.net.removeConn(c)
c.net.notifyAll(func(n inet.Notifiee) {
n.Disconnected(c.net, c)
})
return nil
}
@ -73,11 +76,17 @@ func (c *conn) allStreams() []inet.Stream {
func (c *conn) remoteOpenedStream(s *stream) {
c.addStream(s)
c.net.handleNewStream(s)
c.net.notifyAll(func(n inet.Notifiee) {
n.OpenedStream(c.net, s)
})
}
func (c *conn) openStream() *stream {
sl, sr := c.link.newStreamPair()
c.addStream(sl)
c.net.notifyAll(func(n inet.Notifiee) {
n.OpenedStream(c.net, sl)
})
c.rconn.remoteOpenedStream(sr)
return sl
}

View File

@ -0,0 +1,198 @@
package mocknet
import (
"testing"
"time"
inet "github.com/jbenet/go-ipfs/p2p/net"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
func TestNotifications(t *testing.T) {
t.Parallel()
mn, err := FullMeshLinked(context.Background(), 5)
if err != nil {
t.Fatal(err)
}
timeout := 5 * time.Second
// signup notifs
nets := mn.Nets()
notifiees := make([]*netNotifiee, len(nets))
for i, pn := range nets {
n := newNetNotifiee()
pn.Notify(n)
notifiees[i] = n
}
// connect all
for _, n1 := range nets {
for _, n2 := range nets {
if n1 == n2 {
continue
}
if _, err := mn.ConnectNets(n1, n2); err != nil {
t.Fatal(err)
}
}
}
// test everyone got the correct connection opened calls
for i, s := range nets {
n := notifiees[i]
for _, s2 := range nets {
cos := s.ConnsToPeer(s2.LocalPeer())
func() {
for i := 0; i < len(cos); i++ {
var c inet.Conn
select {
case c = <-n.connected:
case <-time.After(timeout):
t.Fatal("timeout")
}
for _, c2 := range cos {
if c == c2 {
t.Log("got notif for conn")
return
}
}
t.Error("connection not found")
}
}()
}
}
complement := func(c inet.Conn) (inet.Network, *netNotifiee, *conn) {
for i, s := range nets {
for _, c2 := range s.Conns() {
if c2.(*conn).rconn == c {
return s, notifiees[i], c2.(*conn)
}
}
}
t.Fatal("complementary conn not found", c)
return nil, nil, nil
}
testOCStream := func(n *netNotifiee, s inet.Stream) {
var s2 inet.Stream
select {
case s2 = <-n.openedStream:
t.Log("got notif for opened stream")
case <-time.After(timeout):
t.Fatal("timeout")
}
if s != nil && s != s2 {
t.Fatalf("got incorrect stream %p %p", s, s2)
}
select {
case s2 = <-n.closedStream:
t.Log("got notif for closed stream")
case <-time.After(timeout):
t.Fatal("timeout")
}
if s != nil && s != s2 {
t.Fatalf("got incorrect stream %p %p", s, s2)
}
}
streams := make(chan inet.Stream)
for _, s := range nets {
s.SetStreamHandler(func(s inet.Stream) {
streams <- s
s.Close()
})
}
// there's one stream per conn that we need to drain....
// unsure where these are coming from
for i, _ := range nets {
n := notifiees[i]
testOCStream(n, nil)
testOCStream(n, nil)
testOCStream(n, nil)
testOCStream(n, nil)
}
// open a streams in each conn
for i, s := range nets {
conns := s.Conns()
for _, c := range conns {
_, n2, c2 := complement(c)
st1, err := c.NewStream()
if err != nil {
t.Error(err)
} else {
t.Logf("%s %s <--%p--> %s %s", c.LocalPeer(), c.LocalMultiaddr(), st1, c.RemotePeer(), c.RemoteMultiaddr())
// st1.Write([]byte("hello"))
st1.Close()
st2 := <-streams
t.Logf("%s %s <--%p--> %s %s", c2.LocalPeer(), c2.LocalMultiaddr(), st2, c2.RemotePeer(), c2.RemoteMultiaddr())
testOCStream(notifiees[i], st1)
testOCStream(n2, st2)
}
}
}
// close conns
for i, s := range nets {
n := notifiees[i]
for _, c := range s.Conns() {
_, n2, c2 := complement(c)
c.(*conn).Close()
c2.Close()
var c3, c4 inet.Conn
select {
case c3 = <-n.disconnected:
case <-time.After(timeout):
t.Fatal("timeout")
}
if c != c3 {
t.Fatal("got incorrect conn", c, c3)
}
select {
case c4 = <-n2.disconnected:
case <-time.After(timeout):
t.Fatal("timeout")
}
if c2 != c4 {
t.Fatal("got incorrect conn", c, c2)
}
}
}
}
type netNotifiee struct {
connected chan inet.Conn
disconnected chan inet.Conn
openedStream chan inet.Stream
closedStream chan inet.Stream
}
func newNetNotifiee() *netNotifiee {
return &netNotifiee{
connected: make(chan inet.Conn),
disconnected: make(chan inet.Conn),
openedStream: make(chan inet.Stream),
closedStream: make(chan inet.Stream),
}
}
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
nn.connected <- v
}
func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
nn.disconnected <- v
}
func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {
nn.openedStream <- v
}
func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {
nn.closedStream <- v
}

View File

@ -31,6 +31,9 @@ type peernet struct {
streamHandler inet.StreamHandler
connHandler inet.ConnHandler
notifmu sync.RWMutex
notifs map[inet.Notifiee]struct{}
cg ctxgroup.ContextGroup
sync.RWMutex
}
@ -58,6 +61,8 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey,
connsByPeer: map[peer.ID]map[*conn]struct{}{},
connsByLink: map[*link]map[*conn]struct{}{},
notifs: make(map[inet.Notifiee]struct{}),
}
n.cg.SetTeardown(n.teardown)
@ -163,6 +168,9 @@ func (pn *peernet) openConn(r peer.ID, l *link) *conn {
lc, rc := l.newConnPair(pn)
log.Debugf("%s opening connection to %s", pn.LocalPeer(), lc.RemotePeer())
pn.addConn(lc)
pn.notifyAll(func(n inet.Notifiee) {
n.Connected(pn, lc)
})
rc.net.remoteOpenedConn(rc)
return lc
}
@ -171,6 +179,9 @@ func (pn *peernet) remoteOpenedConn(c *conn) {
log.Debugf("%s accepting connection from %s", pn.LocalPeer(), c.RemotePeer())
pn.addConn(c)
pn.handleNewConn(c)
pn.notifyAll(func(n inet.Notifiee) {
n.Connected(pn, c)
})
}
// addConn constructs and adds a connection
@ -201,13 +212,13 @@ func (pn *peernet) removeConn(c *conn) {
cs, found := pn.connsByLink[c.link]
if !found || len(cs) < 1 {
panic("attempting to remove a conn that doesnt exist")
panic(fmt.Sprintf("attempting to remove a conn that doesnt exist %p", c.link))
}
delete(cs, c)
cs, found = pn.connsByPeer[c.remote]
if !found {
panic("attempting to remove a conn that doesnt exist")
panic(fmt.Sprintf("attempting to remove a conn that doesnt exist %p", c.remote))
}
delete(cs, c)
}
@ -360,3 +371,28 @@ func (pn *peernet) SetConnHandler(h inet.ConnHandler) {
pn.connHandler = h
pn.Unlock()
}
// Notify signs up Notifiee to receive signals when events happen
func (pn *peernet) Notify(f inet.Notifiee) {
pn.notifmu.Lock()
pn.notifs[f] = struct{}{}
pn.notifmu.Unlock()
}
// StopNotify unregisters Notifiee fromr receiving signals
func (pn *peernet) StopNotify(f inet.Notifiee) {
pn.notifmu.Lock()
delete(pn.notifs, f)
pn.notifmu.Unlock()
}
// notifyAll runs the notification function on all Notifiees
func (pn *peernet) notifyAll(notification func(f inet.Notifiee)) {
pn.notifmu.RLock()
for n := range pn.notifs {
// make sure we dont block
// and they dont block each other.
go notification(n)
}
pn.notifmu.RUnlock()
}

View File

@ -19,8 +19,11 @@ func (s *stream) Close() error {
r.Close()
}
if w, ok := (s.Writer).(io.Closer); ok {
return w.Close()
w.Close()
}
s.conn.net.notifyAll(func(n inet.Notifiee) {
n.ClosedStream(s.conn.net, s)
})
return nil
}

View File

@ -4,6 +4,7 @@ package swarm
import (
"fmt"
"sync"
"time"
inet "github.com/jbenet/go-ipfs/p2p/net"
@ -38,6 +39,9 @@ type Swarm struct {
backf dialbackoff
dialT time.Duration // mainly for tests
notifmu sync.RWMutex
notifs map[inet.Notifiee]ps.Notifiee
cg ctxgroup.ContextGroup
}
@ -54,11 +58,12 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
}
s := &Swarm{
swarm: ps.NewSwarm(PSTransport),
local: local,
peers: peers,
cg: ctxgroup.WithContext(ctx),
dialT: DialTimeout,
swarm: ps.NewSwarm(PSTransport),
local: local,
peers: peers,
cg: ctxgroup.WithContext(ctx),
dialT: DialTimeout,
notifs: make(map[inet.Notifiee]ps.Notifiee),
}
// configure Swarm
@ -177,3 +182,51 @@ func (s *Swarm) Peers() []peer.ID {
func (s *Swarm) LocalPeer() peer.ID {
return s.local
}
// Notify signs up Notifiee to receive signals when events happen
func (s *Swarm) Notify(f inet.Notifiee) {
// wrap with our notifiee, to translate function calls
n := &ps2netNotifee{net: (*Network)(s), not: f}
s.notifmu.Lock()
s.notifs[f] = n
s.notifmu.Unlock()
// register for notifications in the peer swarm.
s.swarm.Notify(n)
}
// StopNotify unregisters Notifiee fromr receiving signals
func (s *Swarm) StopNotify(f inet.Notifiee) {
s.notifmu.Lock()
n, found := s.notifs[f]
if found {
delete(s.notifs, f)
}
s.notifmu.Unlock()
if found {
s.swarm.StopNotify(n)
}
}
type ps2netNotifee struct {
net *Network
not inet.Notifiee
}
func (n *ps2netNotifee) Connected(c *ps.Conn) {
n.not.Connected(n.net, inet.Conn((*Conn)(c)))
}
func (n *ps2netNotifee) Disconnected(c *ps.Conn) {
n.not.Disconnected(n.net, inet.Conn((*Conn)(c)))
}
func (n *ps2netNotifee) OpenedStream(s *ps.Stream) {
n.not.OpenedStream(n.net, inet.Stream((*Stream)(s)))
}
func (n *ps2netNotifee) ClosedStream(s *ps.Stream) {
n.not.ClosedStream(n.net, inet.Stream((*Stream)(s)))
}

View File

@ -154,3 +154,13 @@ func (n *Network) SetConnHandler(h inet.ConnHandler) {
func (n *Network) String() string {
return fmt.Sprintf("<Network %s>", n.LocalPeer())
}
// Notify signs up Notifiee to receive signals when events happen
func (n *Network) Notify(f inet.Notifiee) {
n.Swarm().Notify(f)
}
// StopNotify unregisters Notifiee fromr receiving signals
func (n *Network) StopNotify(f inet.Notifiee) {
n.Swarm().StopNotify(f)
}

View File

@ -0,0 +1,186 @@
package swarm
import (
"testing"
"time"
inet "github.com/jbenet/go-ipfs/p2p/net"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
func TestNotifications(t *testing.T) {
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 5)
defer func() {
for _, s := range swarms {
s.Close()
}
}()
timeout := 5 * time.Second
// signup notifs
notifiees := make([]*netNotifiee, len(swarms))
for i, swarm := range swarms {
n := newNetNotifiee()
swarm.Notify(n)
notifiees[i] = n
}
connectSwarms(t, ctx, swarms)
<-time.After(time.Millisecond)
// should've gotten 5 by now.
// test everyone got the correct connection opened calls
for i, s := range swarms {
n := notifiees[i]
for _, s2 := range swarms {
if s == s2 {
continue
}
cos := s.ConnectionsToPeer(s2.LocalPeer())
func() {
for i := 0; i < len(cos); i++ {
var c inet.Conn
select {
case c = <-n.connected:
case <-time.After(timeout):
t.Fatal("timeout")
}
for _, c2 := range cos {
if c == c2 {
t.Log("got notif for conn", c)
return
}
}
t.Error("connection not found", c)
}
}()
}
}
complement := func(c inet.Conn) (*Swarm, *netNotifiee, *Conn) {
for i, s := range swarms {
for _, c2 := range s.Connections() {
if c.LocalMultiaddr().Equal(c2.RemoteMultiaddr()) &&
c2.LocalMultiaddr().Equal(c.RemoteMultiaddr()) {
return s, notifiees[i], c2
}
}
}
t.Fatal("complementary conn not found", c)
return nil, nil, nil
}
testOCStream := func(n *netNotifiee, s inet.Stream) {
var s2 inet.Stream
select {
case s2 = <-n.openedStream:
t.Log("got notif for opened stream")
case <-time.After(timeout):
t.Fatal("timeout")
}
if s != s2 {
t.Fatal("got incorrect stream", s.Conn(), s2.Conn())
}
select {
case s2 = <-n.closedStream:
t.Log("got notif for closed stream")
case <-time.After(timeout):
t.Fatal("timeout")
}
if s != s2 {
t.Fatal("got incorrect stream", s.Conn(), s2.Conn())
}
}
streams := make(chan inet.Stream)
for _, s := range swarms {
s.SetStreamHandler(func(s inet.Stream) {
streams <- s
s.Close()
})
}
// open a streams in each conn
for i, s := range swarms {
for _, c := range s.Connections() {
_, n2, _ := complement(c)
st1, err := c.NewStream()
if err != nil {
t.Error(err)
} else {
st1.Write([]byte("hello"))
st1.Close()
testOCStream(notifiees[i], st1)
st2 := <-streams
testOCStream(n2, st2)
}
}
}
// close conns
for i, s := range swarms {
n := notifiees[i]
for _, c := range s.Connections() {
_, n2, c2 := complement(c)
c.Close()
c2.Close()
var c3, c4 inet.Conn
select {
case c3 = <-n.disconnected:
case <-time.After(timeout):
t.Fatal("timeout")
}
if c != c3 {
t.Fatal("got incorrect conn", c, c3)
}
select {
case c4 = <-n2.disconnected:
case <-time.After(timeout):
t.Fatal("timeout")
}
if c2 != c4 {
t.Fatal("got incorrect conn", c, c2)
}
}
}
}
type netNotifiee struct {
connected chan inet.Conn
disconnected chan inet.Conn
openedStream chan inet.Stream
closedStream chan inet.Stream
}
func newNetNotifiee() *netNotifiee {
return &netNotifiee{
connected: make(chan inet.Conn),
disconnected: make(chan inet.Conn),
openedStream: make(chan inet.Stream),
closedStream: make(chan inet.Stream),
}
}
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
nn.connected <- v
}
func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
nn.disconnected <- v
}
func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {
nn.openedStream <- v
}
func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {
nn.closedStream <- v
}

View File

@ -65,9 +65,17 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
dht.datastore = dstore
dht.self = h.ID()
dht.peerstore = h.Peerstore()
dht.ContextGroup = ctxgroup.WithContext(ctx)
dht.host = h
// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
dht.ContextGroup = ctxgroup.WithContextAndTeardown(ctx, func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})
// sanity check. this should **never** happen
if len(dht.peerstore.Addresses(dht.self)) < 1 {
panic("attempt to initialize dht without addresses for self")

33
routing/dht/notif.go Normal file
View File

@ -0,0 +1,33 @@
package dht
import (
inet "github.com/jbenet/go-ipfs/p2p/net"
)
// netNotifiee defines methods to be used with the IpfsDHT
type netNotifiee IpfsDHT
func (nn *netNotifiee) DHT() *IpfsDHT {
return (*IpfsDHT)(nn)
}
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
dht := nn.DHT()
select {
case <-dht.Closing():
return
}
dht.Update(dht.Context(), v.RemotePeer())
}
func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
dht := nn.DHT()
select {
case <-dht.Closing():
return
}
dht.routingTable.Remove(v.RemotePeer())
}
func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}
func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {}

View File

@ -30,6 +30,16 @@ func (b *Bucket) find(id peer.ID) *list.Element {
return nil
}
func (b *Bucket) remove(id peer.ID) {
b.lk.RLock()
defer b.lk.RUnlock()
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(peer.ID) == id {
b.list.Remove(e)
}
}
}
func (b *Bucket) moveToFront(e *list.Element) {
b.lk.Lock()
b.list.MoveToFront(e)

View File

@ -87,6 +87,23 @@ func (rt *RoutingTable) Update(p peer.ID) peer.ID {
return ""
}
// Remove deletes a peer from the routing table. This is to be used
// when we are sure a node has disconnected completely.
func (rt *RoutingTable) Remove(p peer.ID) {
rt.tabLock.Lock()
defer rt.tabLock.Unlock()
peerID := ConvertPeerID(p)
cpl := commonPrefixLen(peerID, rt.local)
bucketID := cpl
if bucketID >= len(rt.Buckets) {
bucketID = len(rt.Buckets) - 1
}
bucket := rt.Buckets[bucketID]
bucket.remove(p)
}
func (rt *RoutingTable) nextBucket() peer.ID {
bucket := rt.Buckets[len(rt.Buckets)-1]
newBucket := bucket.Split(len(rt.Buckets)-1, rt.local)