diff --git a/core/core.go b/core/core.go index af95cac98..93411c4a4 100644 --- a/core/core.go +++ b/core/core.go @@ -214,12 +214,30 @@ func (n *IpfsNode) StartOnlineServices(ctx context.Context) error { return err } - peerhost, err := constructPeerHost(ctx, n.Repo.Config(), n.Identity, n.Peerstore) + peerhost, err := constructPeerHost(ctx, n.Identity, n.Peerstore) if err != nil { return debugerror.Wrap(err) } n.PeerHost = peerhost + if err := n.startOnlineServicesWithHost(ctx); err != nil { + return err + } + + // Ok, now we're ready to listen. + if err := startListening(ctx, n.PeerHost, n.Repo.Config()); err != nil { + return debugerror.Wrap(err) + } + + n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore) + go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency) + + return n.Bootstrap(DefaultBootstrapConfig) +} + +// startOnlineServicesWithHost is the set of services which need to be +// initialized with the host and _before_ we start listening. +func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context) error { // setup diagnostics service n.Diagnostics = diag.NewDiagnostics(n.Identity, n.PeerHost) @@ -236,13 +254,8 @@ func (n *IpfsNode) StartOnlineServices(ctx context.Context) error { n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, alwaysSendToPeer) // setup name system - // TODO implement an offline namesys that serves only local names. n.Namesys = namesys.NewNameSystem(n.Routing) - - n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore) - go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency) - - return n.Bootstrap(DefaultBootstrapConfig) + return nil } // teardown closes owned children. 
If any errors occur, this function returns @@ -405,37 +418,49 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) { } // isolates the complex initialization steps -func constructPeerHost(ctx context.Context, cfg *config.Config, id peer.ID, ps peer.Peerstore) (p2phost.Host, error) { - listenAddrs, err := listenAddresses(cfg) +func constructPeerHost(ctx context.Context, id peer.ID, ps peer.Peerstore) (p2phost.Host, error) { + + // no addresses to begin with. we'll start later. + network, err := swarm.NewNetwork(ctx, nil, id, ps) if err != nil { return nil, debugerror.Wrap(err) } + host := p2pbhost.New(network, p2pbhost.NATPortMap) + return host, nil +} + +// startListening on the network addresses +func startListening(ctx context.Context, host p2phost.Host, cfg *config.Config) error { + listenAddrs, err := listenAddresses(cfg) + if err != nil { + return debugerror.Wrap(err) + } + // make sure we error out if our config does not have addresses we can use log.Debugf("Config.Addresses.Swarm:%s", listenAddrs) filteredAddrs := addrutil.FilterUsableAddrs(listenAddrs) log.Debugf("Config.Addresses.Swarm:%s (filtered)", filteredAddrs) if len(filteredAddrs) < 1 { - return nil, debugerror.Errorf("addresses in config not usable: %s", listenAddrs) + return debugerror.Errorf("addresses in config not usable: %s", listenAddrs) } - network, err := swarm.NewNetwork(ctx, filteredAddrs, id, ps) - if err != nil { - return nil, debugerror.Wrap(err) + // Actually start listening: + if err := host.Network().Listen(filteredAddrs...); err != nil { + return err } - peerhost := p2pbhost.New(network, p2pbhost.NATPortMap) // explicitly set these as our listen addrs. // (why not do it inside inet.NewNetwork? because this way we can // listen on addresses without necessarily advertising those publicly.) 
- addrs, err := peerhost.Network().InterfaceListenAddresses() + addrs, err := host.Network().InterfaceListenAddresses() if err != nil { - return nil, debugerror.Wrap(err) + return debugerror.Wrap(err) } log.Infof("Swarm listening at: %s", addrs) - ps.AddAddresses(id, addrs) - return peerhost, nil + host.Peerstore().AddAddresses(host.ID(), addrs) + return nil } func constructDHTRouting(ctx context.Context, host p2phost.Host, ds datastore.ThreadSafeDatastore) (*dht.IpfsDHT, error) { diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index bab465c72..92743f916 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -2,6 +2,7 @@ package network import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" host "github.com/jbenet/go-ipfs/p2p/host" @@ -171,3 +172,5 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {} func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {} +func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {} +func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {} diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index c1c7f2eb1..b9314c399 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -1,15 +1,12 @@ package basichost import ( - "sync" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog" - inat "github.com/jbenet/go-ipfs/p2p/nat" inet 
"github.com/jbenet/go-ipfs/p2p/net" peer "github.com/jbenet/go-ipfs/p2p/peer" protocol "github.com/jbenet/go-ipfs/p2p/protocol" @@ -42,9 +39,7 @@ type BasicHost struct { mux *protocol.Mux ids *identify.IDService relay *relay.RelayService - - natmu sync.Mutex - nat *inat.NAT + natmgr *natManager proc goprocess.Process } @@ -57,6 +52,10 @@ func New(net inet.Network, opts ...Option) *BasicHost { } h.proc = goprocess.WithTeardown(func() error { + if h.natmgr != nil { + h.natmgr.Close() + } + return h.Network().Close() }) @@ -70,45 +69,13 @@ func New(net inet.Network, opts ...Option) *BasicHost { for _, o := range opts { switch o { case NATPortMap: - h.setupNATPortMap() + h.natmgr = newNatManager(h) } } return h } -func (h *BasicHost) setupNATPortMap() { - // do this asynchronously to avoid blocking daemon startup - - h.proc.Go(func(worker goprocess.Process) { - nat := inat.DiscoverNAT() - if nat == nil { // no nat, or failed to get it. - return - } - - select { - case <-worker.Closing(): - nat.Close() - return - default: - } - - // wire up the nat to close when proc closes. - h.proc.AddChild(nat.Process()) - - h.natmu.Lock() - h.nat = nat - h.natmu.Unlock() - - addrs := h.Network().ListenAddresses() - nat.PortMapAddrs(addrs) - mapAddrs := nat.ExternalAddrs() - if len(mapAddrs) > 0 { - log.Infof("NAT mapping addrs: %s", mapAddrs) - } - }) -} - // newConnHandler is the remote-opened conn handler for inet.Network func (h *BasicHost) newConnHandler(c inet.Conn) { h.ids.IdentifyConn(c) @@ -214,17 +181,19 @@ func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error { return nil } +// Addrs returns all the addresses of BasicHost at this moment in time. +// It's ok to not include addresses if they're not available to be used now. 
func (h *BasicHost) Addrs() []ma.Multiaddr { addrs, err := h.Network().InterfaceListenAddresses() if err != nil { log.Debug("error retrieving network interface addrs") } - h.natmu.Lock() - nat := h.nat - h.natmu.Unlock() - if nat != nil { - addrs = append(addrs, nat.ExternalAddrs()...) + if h.natmgr != nil { // natmgr is nil if we do not use nat option. + nat := h.natmgr.NAT() + if nat != nil { // nat is nil if not ready, or no nat is available. + addrs = append(addrs, nat.ExternalAddrs()...) + } } return addrs diff --git a/p2p/host/basic/natmgr.go b/p2p/host/basic/natmgr.go new file mode 100644 index 000000000..492841a9d --- /dev/null +++ b/p2p/host/basic/natmgr.go @@ -0,0 +1,224 @@ +package basichost + +import ( + "sync" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + + inat "github.com/jbenet/go-ipfs/p2p/nat" + inet "github.com/jbenet/go-ipfs/p2p/net" + lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables" +) + +// natManager takes care of adding + removing port mappings to the nat. +// Initialized with the host if it has a NATPortMap option enabled. +// natManager receives signals from the network, and check on nat mappings: +// * natManager listens to the network and adds or closes port mappings +// as the network signals Listen() or ListenClose(). +// * closing the natManager closes the nat and its mappings. +type natManager struct { + host *BasicHost + natmu sync.RWMutex // guards nat (ready could obviate this mutex, but safety first.) + nat *inat.NAT + + ready chan struct{} // closed once the nat is ready to process port mappings + proc goprocess.Process // natManager has a process + children. can be closed. 
+} + +func newNatManager(host *BasicHost) *natManager { + nmgr := &natManager{ + host: host, + ready: make(chan struct{}), + proc: goprocess.WithParent(host.proc), + } + + // teardown + nmgr.proc = goprocess.WithTeardown(func() error { + // on closing, unregister from network notifications. + host.Network().StopNotify((*nmgrNetNotifiee)(nmgr)) + return nil + }) + + // host is our parent. close when host closes. + host.proc.AddChild(nmgr.proc) + + // discover the nat. + nmgr.discoverNAT() + return nmgr +} + +// Close closes the natManager, closing the underlying nat +// and unregistering from network events. +func (nmgr *natManager) Close() error { + return nmgr.proc.Close() +} + +// Ready returns a channel which will be closed when the NAT has been found +// and is ready to be used, or the search process is done. +func (nmgr *natManager) Ready() <-chan struct{} { + return nmgr.ready +} + +func (nmgr *natManager) discoverNAT() { + + nmgr.proc.Go(func(worker goprocess.Process) { + // inat.DiscoverNAT blocks until the nat is found or a timeout + // is reached. we unfortunately cannot specify timeouts-- the + // library we're using just blocks. + // + // Note: on early shutdown, there may be a case where we're trying + // to close before DiscoverNAT() returns. Since we can't cancel it + // (library) we can choose to (1) drop the result and return early, + // or (2) wait until it times out to exit. For now we choose (2), + // to avoid leaking resources in a non-obvious way. the only case + // this affects is when the daemon is being started up and _immediately_ + // asked to close. other services are also starting up, so ok to wait. + nat := inat.DiscoverNAT() + if nat == nil { // no nat, or failed to get it. + return + } + + // by this point -- after finding the NAT -- we may already + // be closing. if so, just exit. + select { + case <-worker.Closing(): + nat.Close() + return + default: + } + + // wire up the nat to close when nmgr closes.
+ // nmgr.proc is our parent, and waiting for us. + nmgr.proc.AddChild(nat.Process()) + + // set the nat. + nmgr.natmu.Lock() + nmgr.nat = nat + nmgr.natmu.Unlock() + + // signal that we're ready to process nat mappings: + close(nmgr.ready) + + // sign natManager up for network notifications + // we need to sign up here to avoid missing some notifs + // before the NAT has been found. + nmgr.host.Network().Notify((*nmgrNetNotifiee)(nmgr)) + + // if any interfaces were brought up while we were setting up + // the nat, now is the time to setup port mappings for them. + // we release ready, then grab them to avoid losing any. adding + // a port mapping is idempotent, so its ok to add the same twice. + addrs := nmgr.host.Network().ListenAddresses() + for _, addr := range addrs { + // we do it async because it's slow and we may want to close beforehand + go addPortMapping(nmgr, addr) + } + }) +} + +// NAT returns the natManager's nat object. this may be nil, if +// (a) the search process is still ongoing, or (b) the search process +// found no nat. Clients must check whether the return value is nil. +func (nmgr *natManager) NAT() *inat.NAT { + nmgr.natmu.Lock() + defer nmgr.natmu.Unlock() + return nmgr.nat +} + +func addPortMapping(nmgr *natManager, intaddr ma.Multiaddr) { + nat := nmgr.NAT() + if nat == nil { + panic("natManager addPortMapping called without a nat.") + } + + // first, check if the port mapping already exists. + for _, mapping := range nat.Mappings() { + if mapping.InternalAddr().Equal(intaddr) { + return // it exists! return. + } + } + + ctx := context.TODO() + lm := make(lgbl.DeferredMap) + lm["internalAddr"] = func() interface{} { return intaddr.String() } + + defer log.EventBegin(ctx, "natMgrAddPortMappingWait", lm).Done() + + select { + case <-nmgr.proc.Closing(): + lm["outcome"] = "cancelled" + return // no use. + case <-nmgr.ready: // wait until it's ready. 
+ } + + // actually start the port map (sub-event because waiting may take a while) + defer log.EventBegin(ctx, "natMgrAddPortMapping", lm).Done() + + // get the nat + m, err := nat.NewMapping(intaddr) + if err != nil { + lm["outcome"] = "failure" + lm["error"] = err + return + } + + extaddr, err := m.ExternalAddr() + if err != nil { + lm["outcome"] = "failure" + lm["error"] = err + return + } + + lm["outcome"] = "success" + lm["externalAddr"] = func() interface{} { return extaddr.String() } + log.Infof("established nat port mapping: %s <--> %s", intaddr, extaddr) +} + +func rmPortMapping(nmgr *natManager, intaddr ma.Multiaddr) { + nat := nmgr.NAT() + if nat == nil { + panic("natManager rmPortMapping called without a nat.") + } + + // list the port mappings (it may be gone on its own, so we need to + // check this list, and not store it ourselves behind the scenes) + + // close mappings for this internal address. + for _, mapping := range nat.Mappings() { + if mapping.InternalAddr().Equal(intaddr) { + mapping.Close() + } + } +} + +// nmgrNetNotifiee implements the network notification listening part +// of the natManager. this is merely listening to Listen() and ListenClose() +// events. +type nmgrNetNotifiee natManager + +func (nn *nmgrNetNotifiee) natManager() *natManager { + return (*natManager)(nn) +} + +func (nn *nmgrNetNotifiee) Listen(n inet.Network, addr ma.Multiaddr) { + if nn.natManager().NAT() == nil { + return // not ready or doesnt exist. + } + + addPortMapping(nn.natManager(), addr) +} + +func (nn *nmgrNetNotifiee) ListenClose(n inet.Network, addr ma.Multiaddr) { + if nn.natManager().NAT() == nil { + return // not ready or doesnt exist.
+ } + + rmPortMapping(nn.natManager(), addr) +} + +func (nn *nmgrNetNotifiee) Connected(inet.Network, inet.Conn) {} +func (nn *nmgrNetNotifiee) Disconnected(inet.Network, inet.Conn) {} +func (nn *nmgrNetNotifiee) OpenedStream(inet.Network, inet.Stream) {} +func (nn *nmgrNetNotifiee) ClosedStream(inet.Network, inet.Stream) {} diff --git a/p2p/nat/nat.go b/p2p/nat/nat.go index 1340d6ee2..48ef0b7ce 100644 --- a/p2p/nat/nat.go +++ b/p2p/nat/nat.go @@ -55,15 +55,16 @@ type NAT struct { proc goprocess.Process // manages nat mappings lifecycle mappingmu sync.RWMutex // guards mappings - mappings []*mapping + mappings map[*mapping]struct{} Notifier } func newNAT(realNAT nat.NAT) *NAT { return &NAT{ - nat: realNAT, - proc: goprocess.WithParent(goprocess.Background()), + nat: realNAT, + proc: goprocess.WithParent(goprocess.Background()), + mappings: make(map[*mapping]struct{}), } } @@ -143,6 +144,9 @@ type Mapping interface { // ExternalAddr returns the external facing address. If the mapping is not // established, addr will be nil, and and ErrNoMapping will be returned. 
ExternalAddr() (addr ma.Multiaddr, err error) + + // Close closes the port mapping + Close() error } // keeps republishing @@ -230,9 +234,9 @@ func (m *mapping) Close() error { // Mappings returns a slice of all NAT mappings func (nat *NAT) Mappings() []Mapping { nat.mappingmu.Lock() - maps2 := make([]Mapping, len(nat.mappings)) - for i, m := range nat.mappings { - maps2[i] = m + maps2 := make([]Mapping, 0, len(nat.mappings)) + for m := range nat.mappings { + maps2 = append(maps2, m) } nat.mappingmu.Unlock() return maps2 @@ -243,7 +247,13 @@ func (nat *NAT) addMapping(m *mapping) { nat.proc.AddChild(m.proc) nat.mappingmu.Lock() - nat.mappings = append(nat.mappings, m) + nat.mappings[m] = struct{}{} + nat.mappingmu.Unlock() +} + +func (nat *NAT) rmMapping(m *mapping) { + nat.mappingmu.Lock() + delete(nat.mappings, m) nat.mappingmu.Unlock() } @@ -286,10 +296,16 @@ func (nat *NAT) NewMapping(maddr ma.Multiaddr) (Mapping, error) { intport: intport, intaddr: maddr, } - m.proc = periodic.Every(MappingDuration/3, func(worker goprocess.Process) { - nat.establishMapping(m) + m.proc = goprocess.WithTeardown(func() error { + nat.rmMapping(m) + return nil }) nat.addMapping(m) + + m.proc.AddChild(periodic.Every(MappingDuration/3, func(worker goprocess.Process) { + nat.establishMapping(m) + })) + // do it once synchronously, so first mapping is done right away, and before exiting, // allowing users -- in the optimistic case -- to use results right after. nat.establishMapping(m) diff --git a/p2p/net/interface.go b/p2p/net/interface.go index 5998f2a63..a5a006be2 100644 --- a/p2p/net/interface.go +++ b/p2p/net/interface.go @@ -142,10 +142,12 @@ const ( // Notifiee is an interface for an object wishing to receive // notifications from a Network. 
type Notifiee interface { - Connected(Network, Conn) // called when a connection opened - Disconnected(Network, Conn) // called when a connection closed - OpenedStream(Network, Stream) // called when a stream opened - ClosedStream(Network, Stream) // called when a stream closed + Listen(Network, ma.Multiaddr) // called when network starts listening on an addr + ListenClose(Network, ma.Multiaddr) // called when network stops listening on an addr + Connected(Network, Conn) // called when a connection opened + Disconnected(Network, Conn) // called when a connection closed + OpenedStream(Network, Stream) // called when a stream opened + ClosedStream(Network, Stream) // called when a stream closed // TODO // PeerConnected(Network, peer.ID) // called when a peer connected diff --git a/p2p/net/mock/mock_notif_test.go b/p2p/net/mock/mock_notif_test.go index 1b80ecb96..37b143850 100644 --- a/p2p/net/mock/mock_notif_test.go +++ b/p2p/net/mock/mock_notif_test.go @@ -4,9 +4,10 @@ import ( "testing" "time" - inet "github.com/jbenet/go-ipfs/p2p/net" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + + inet "github.com/jbenet/go-ipfs/p2p/net" ) func TestNotifications(t *testing.T) { @@ -169,6 +170,8 @@ func TestNotifications(t *testing.T) { } type netNotifiee struct { + listen chan ma.Multiaddr + listenClose chan ma.Multiaddr connected chan inet.Conn disconnected chan inet.Conn openedStream chan inet.Stream @@ -177,6 +180,8 @@ type netNotifiee struct { func newNetNotifiee() *netNotifiee { return &netNotifiee{ + listen: make(chan ma.Multiaddr), + listenClose: make(chan ma.Multiaddr), connected: make(chan inet.Conn), disconnected: make(chan inet.Conn), openedStream: make(chan inet.Stream), @@ -184,6 +189,12 @@ func newNetNotifiee() *netNotifiee { } } +func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) { + nn.listen <- a +} +func (nn
*netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) { + nn.listenClose <- a +} func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { nn.connected <- v } diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 6512f67e5..e41177247 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -202,6 +202,15 @@ func (s *Swarm) LocalPeer() peer.ID { return s.local } +// notifyAll sends a signal to all Notifiees +func (s *Swarm) notifyAll(notify func(inet.Notifiee)) { + s.notifmu.RLock() + for f := range s.notifs { + go notify(f) + } + s.notifmu.RUnlock() +} + // Notify signs up Notifiee to receive signals when events happen func (s *Swarm) Notify(f inet.Notifiee) { // wrap with our notifiee, to translate function calls diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 216493b34..bad1422cc 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -382,7 +382,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote for i := 0; i < len(remoteAddrs); i++ { select { case err = <-errs: - log.Info(err) + log.Debug(err) case connC := <-conns: // take the first + return asap close(foundConn) diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index d3db85005..734726fac 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -3,6 +3,7 @@ package swarm import ( "fmt" + inet "github.com/jbenet/go-ipfs/p2p/net" conn "github.com/jbenet/go-ipfs/p2p/net/conn" addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr" lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables" @@ -60,7 +61,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { // may be fine for sk to be nil, just log a warning. 
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.") } - log.Infof("Swarm Listening at %s", maddr) + log.Debugf("Swarm Listening at %s", maddr) list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk) if err != nil { return err @@ -72,20 +73,31 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { if err != nil { return err } - log.Infof("Swarm Listeners at %s", s.ListenAddresses()) + log.Debugf("Swarm Listeners at %s", s.ListenAddresses()) + + // signal to our notifiees on successful conn. + s.notifyAll(func(n inet.Notifiee) { + n.Listen((*Network)(s), maddr) + }) // go consume peerstream's listen accept errors. note, these ARE errors. // they may be killing the listener, and if we get _any_ we should be // fixing this in our conn.Listener (to ignore them or handle them // differently.) go func(ctx context.Context, sl *ps.Listener) { + + // signal to our notifiees closing + defer s.notifyAll(func(n inet.Notifiee) { + n.ListenClose((*Network)(s), maddr) + }) + for { select { case err, more := <-sl.AcceptErrors(): if !more { return } - log.Info(err) + log.Debugf("swarm listener accept error: %s", err) case <-ctx.Done(): return } diff --git a/p2p/net/swarm/swarm_notif_test.go b/p2p/net/swarm/swarm_notif_test.go index 3469a1b32..dccf69b2e 100644 --- a/p2p/net/swarm/swarm_notif_test.go +++ b/p2p/net/swarm/swarm_notif_test.go @@ -4,9 +4,10 @@ import ( "testing" "time" - inet "github.com/jbenet/go-ipfs/p2p/net" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + + inet "github.com/jbenet/go-ipfs/p2p/net" ) func TestNotifications(t *testing.T) { @@ -157,6 +158,8 @@ func TestNotifications(t *testing.T) { } type netNotifiee struct { + listen chan ma.Multiaddr + listenClose chan ma.Multiaddr connected chan inet.Conn disconnected chan inet.Conn openedStream chan inet.Stream @@ -165,6 +168,8 @@ type netNotifiee struct { 
func newNetNotifiee() *netNotifiee { return &netNotifiee{ + listen: make(chan ma.Multiaddr), + listenClose: make(chan ma.Multiaddr), connected: make(chan inet.Conn), disconnected: make(chan inet.Conn), openedStream: make(chan inet.Stream), @@ -172,6 +177,12 @@ func newNetNotifiee() *netNotifiee { } } +func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) { + nn.listen <- a +} +func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) { + nn.listenClose <- a +} func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { nn.connected <- v } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 42e0cd7b7..d34ac720b 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -76,11 +76,6 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip return nil }) - // sanity check. this should **never** happen - if len(dht.peerstore.Addresses(dht.self)) < 1 { - panic("attempt to initialize dht without addresses for self") - } - h.SetStreamHandler(ProtocolDHT, dht.handleNewStream) dht.providers = NewProviderManager(dht.Context(), dht.self) dht.AddChildGroup(dht.providers) diff --git a/routing/dht/notif.go b/routing/dht/notif.go index 318db12ea..82e097753 100644 --- a/routing/dht/notif.go +++ b/routing/dht/notif.go @@ -1,6 +1,8 @@ package dht import ( + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + inet "github.com/jbenet/go-ipfs/p2p/net" ) @@ -31,3 +33,5 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {} func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {} +func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {} +func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {}