From 98f2b0779f9a2c42b10415c969e1d14106c36bfd Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 30 Jan 2015 20:17:55 -0800 Subject: [PATCH 1/6] p2p/net: notify on listens Network now signals when it successfully listens on some address or when an address shuts down. This will be used to establish and close nat port mappings. It could also be used to notify peers of address changes. --- exchange/bitswap/network/ipfs_impl.go | 3 +++ p2p/net/interface.go | 10 ++++++---- p2p/net/mock/mock_notif_test.go | 15 +++++++++++++-- p2p/net/swarm/swarm.go | 9 +++++++++ p2p/net/swarm/swarm_dial.go | 2 +- p2p/net/swarm/swarm_listen.go | 18 +++++++++++++++--- p2p/net/swarm/swarm_notif_test.go | 15 +++++++++++++-- routing/dht/notif.go | 4 ++++ 8 files changed, 64 insertions(+), 12 deletions(-) diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index bab465c72..92743f916 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -2,6 +2,7 @@ package network import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" host "github.com/jbenet/go-ipfs/p2p/host" @@ -171,3 +172,5 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {} func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {} +func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {} +func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {} diff --git a/p2p/net/interface.go b/p2p/net/interface.go index 5998f2a63..a5a006be2 100644 --- a/p2p/net/interface.go +++ b/p2p/net/interface.go @@ -142,10 +142,12 @@ const ( // Notifiee is an interface for an object wishing to receive // notifications from a Network. type Notifiee interface { - Connected(Network, Conn) // called when a connection opened - Disconnected(Network, Conn) // called when a connection closed - OpenedStream(Network, Stream) // called when a stream opened - ClosedStream(Network, Stream) // called when a stream closed + Listen(Network, ma.Multiaddr) // called when network starts listening on an addr + ListenClose(Network, ma.Multiaddr) // called when network starts listening on an addr + Connected(Network, Conn) // called when a connection opened + Disconnected(Network, Conn) // called when a connection closed + OpenedStream(Network, Stream) // called when a stream opened + ClosedStream(Network, Stream) // called when a stream closed // TODO // PeerConnected(Network, peer.ID) // called when a peer connected diff --git a/p2p/net/mock/mock_notif_test.go b/p2p/net/mock/mock_notif_test.go index 1b80ecb96..37b143850 100644 --- a/p2p/net/mock/mock_notif_test.go +++ b/p2p/net/mock/mock_notif_test.go @@ -4,9 +4,10 @@ import ( "testing" "time" - inet "github.com/jbenet/go-ipfs/p2p/net" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + + inet "github.com/jbenet/go-ipfs/p2p/net" ) func TestNotifications(t *testing.T) { @@ -169,6 +170,8 @@ func TestNotifications(t *testing.T) { } type netNotifiee struct { + listen chan ma.Multiaddr + listenClose chan ma.Multiaddr connected chan inet.Conn disconnected chan inet.Conn openedStream chan inet.Stream @@ -177,6 +180,8 @@ type netNotifiee struct { func newNetNotifiee() *netNotifiee { return &netNotifiee{ + listen: make(chan ma.Multiaddr), + listenClose: make(chan ma.Multiaddr), connected: make(chan inet.Conn), disconnected: make(chan inet.Conn), openedStream: make(chan inet.Stream), @@ -184,6 +189,12 @@ func newNetNotifiee() *netNotifiee { } } +func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) { + nn.listen <- a +} +func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) { + nn.listenClose <- a +} func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { nn.connected <- v } diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 6512f67e5..e41177247 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -202,6 +202,15 @@ func (s *Swarm) LocalPeer() peer.ID { return s.local } +// notifyAll sends a signal to all Notifiees +func (s *Swarm) notifyAll(notify func(inet.Notifiee)) { + s.notifmu.RLock() + for f := range s.notifs { + go notify(f) + } + s.notifmu.RUnlock() +} + // Notify signs up Notifiee to receive signals when events happen func (s *Swarm) Notify(f inet.Notifiee) { // wrap with our notifiee, to translate function calls diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 216493b34..bad1422cc 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -382,7 +382,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote for i := 0; i < len(remoteAddrs); i++ { select { case err = <-errs: - log.Info(err) + log.Debug(err) case connC := <-conns: // take the first + return asap close(foundConn) diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index d3db85005..734726fac 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -3,6 +3,7 @@ package swarm import ( "fmt" + inet "github.com/jbenet/go-ipfs/p2p/net" conn "github.com/jbenet/go-ipfs/p2p/net/conn" addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr" lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables" @@ -60,7 +61,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { // may be fine for sk to be nil, just log a warning. log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.") } - log.Infof("Swarm Listening at %s", maddr) + log.Debugf("Swarm Listening at %s", maddr) list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk) if err != nil { return err @@ -72,20 +73,31 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { if err != nil { return err } - log.Infof("Swarm Listeners at %s", s.ListenAddresses()) + log.Debugf("Swarm Listeners at %s", s.ListenAddresses()) + + // signal to our notifiees on successful conn. + s.notifyAll(func(n inet.Notifiee) { + n.Listen((*Network)(s), maddr) + }) // go consume peerstream's listen accept errors. note, these ARE errors. // they may be killing the listener, and if we get _any_ we should be // fixing this in our conn.Listener (to ignore them or handle them // differently.) go func(ctx context.Context, sl *ps.Listener) { + + // signal to our notifiees closing + defer s.notifyAll(func(n inet.Notifiee) { + n.ListenClose((*Network)(s), maddr) + }) + for { select { case err, more := <-sl.AcceptErrors(): if !more { return } - log.Info(err) + log.Debugf("swarm listener accept error: %s", err) case <-ctx.Done(): return } diff --git a/p2p/net/swarm/swarm_notif_test.go b/p2p/net/swarm/swarm_notif_test.go index 3469a1b32..dccf69b2e 100644 --- a/p2p/net/swarm/swarm_notif_test.go +++ b/p2p/net/swarm/swarm_notif_test.go @@ -4,9 +4,10 @@ import ( "testing" "time" - inet "github.com/jbenet/go-ipfs/p2p/net" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + + inet "github.com/jbenet/go-ipfs/p2p/net" ) func TestNotifications(t *testing.T) { @@ -157,6 +158,8 @@ func TestNotifications(t *testing.T) { } type netNotifiee struct { + listen chan ma.Multiaddr + listenClose chan ma.Multiaddr connected chan inet.Conn disconnected chan inet.Conn openedStream chan inet.Stream @@ -165,6 +168,8 @@ type netNotifiee struct { func newNetNotifiee() *netNotifiee { return &netNotifiee{ + listen: make(chan ma.Multiaddr), + listenClose: make(chan ma.Multiaddr), connected: make(chan inet.Conn), disconnected: make(chan inet.Conn), openedStream: make(chan inet.Stream), @@ -172,6 +177,12 @@ func newNetNotifiee() *netNotifiee { } } +func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) { + nn.listen <- a +} +func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) { + nn.listenClose <- a +} func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { nn.connected <- v } diff --git a/routing/dht/notif.go b/routing/dht/notif.go index 318db12ea..82e097753 100644 --- a/routing/dht/notif.go +++ b/routing/dht/notif.go @@ -1,6 +1,8 @@ package dht import ( + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + inet "github.com/jbenet/go-ipfs/p2p/net" ) @@ -31,3 +33,5 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {} func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {} +func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {} +func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {} From d0432f9e75b6bad6c3d496de95496f31851fe8b9 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 30 Jan 2015 20:19:48 -0800 Subject: [PATCH 2/6] dht: removing addrs sanity check About to allow dht to start without local addresses. this is so that we can initialize the dht and sign it up to listen on the muxer, before our node starts accepting incoming connections. otherwise, we lose some (we're observing this happening already). I looked through the dht's use of the peerstore, and the check here doesnt seem to be as important as the panic implies. I believe the panic was used for debugging weird "dont have any address" conditions we had earlier. --- routing/dht/dht.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 42e0cd7b7..d34ac720b 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -76,11 +76,6 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip return nil }) - // sanity check. this should **never** happen - if len(dht.peerstore.Addresses(dht.self)) < 1 { - panic("attempt to initialize dht without addresses for self") - } - h.SetStreamHandler(ProtocolDHT, dht.handleNewStream) dht.providers = NewProviderManager(dht.Context(), dht.self) dht.AddChildGroup(dht.providers) From d7c9ae12ac4282099b27294be15201eda5077a5a Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 30 Jan 2015 20:22:56 -0800 Subject: [PATCH 3/6] p2p/nat: make nat mappings closable by client After this commit, client can close individual mappings (without closing the whole NAT) --- p2p/nat/nat.go | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/p2p/nat/nat.go b/p2p/nat/nat.go index 1340d6ee2..48ef0b7ce 100644 --- a/p2p/nat/nat.go +++ b/p2p/nat/nat.go @@ -55,15 +55,16 @@ type NAT struct { proc goprocess.Process // manages nat mappings lifecycle mappingmu sync.RWMutex // guards mappings - mappings []*mapping + mappings map[*mapping]struct{} Notifier } func newNAT(realNAT nat.NAT) *NAT { return &NAT{ - nat: realNAT, - proc: goprocess.WithParent(goprocess.Background()), + nat: realNAT, + proc: goprocess.WithParent(goprocess.Background()), + mappings: make(map[*mapping]struct{}), } } @@ -143,6 +144,9 @@ type Mapping interface { // ExternalAddr returns the external facing address. If the mapping is not // established, addr will be nil, and and ErrNoMapping will be returned. ExternalAddr() (addr ma.Multiaddr, err error) + + // Close closes the port mapping + Close() error } // keeps republishing @@ -230,9 +234,9 @@ func (m *mapping) Close() error { // Mappings returns a slice of all NAT mappings func (nat *NAT) Mappings() []Mapping { nat.mappingmu.Lock() - maps2 := make([]Mapping, len(nat.mappings)) - for i, m := range nat.mappings { - maps2[i] = m + maps2 := make([]Mapping, 0, len(nat.mappings)) + for m := range nat.mappings { + maps2 = append(maps2, m) } nat.mappingmu.Unlock() return maps2 @@ -243,7 +247,13 @@ func (nat *NAT) addMapping(m *mapping) { nat.proc.AddChild(m.proc) nat.mappingmu.Lock() - nat.mappings = append(nat.mappings, m) + nat.mappings[m] = struct{}{} + nat.mappingmu.Unlock() +} + +func (nat *NAT) rmMapping(m *mapping) { + nat.mappingmu.Lock() + delete(nat.mappings, m) nat.mappingmu.Unlock() } @@ -286,10 +296,16 @@ func (nat *NAT) NewMapping(maddr ma.Multiaddr) (Mapping, error) { intport: intport, intaddr: maddr, } - m.proc = periodic.Every(MappingDuration/3, func(worker goprocess.Process) { - nat.establishMapping(m) + m.proc = goprocess.WithTeardown(func() error { + nat.rmMapping(m) + return nil }) nat.addMapping(m) + + m.proc.AddChild(periodic.Every(MappingDuration/3, func(worker goprocess.Process) { + nat.establishMapping(m) + })) + // do it once synchronously, so first mapping is done right away, and before exiting, // allowing users -- in the optimistic case -- to use results right after. nat.establishMapping(m) From 763cc945c01eba903249a9ce23a6c37bbe7f9e0d Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 30 Jan 2015 20:23:43 -0800 Subject: [PATCH 4/6] p2p/host: nat manager this commit moves management of the nat to its own object. perhaps this can be general enough to work with any host (not just BasicHost), but for now keeping here. the nat manager: - discovers and sets up the nat asynchronously. - adds any port mappings necessary if/after nat has been found. - listens to the network Listen() changes, adding/closing mappings --- p2p/host/basic/basic_host.go | 57 ++------- p2p/host/basic/natmgr.go | 224 +++++++++++++++++++++++++++++++++++ 2 files changed, 237 insertions(+), 44 deletions(-) create mode 100644 p2p/host/basic/natmgr.go diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index c1c7f2eb1..b9314c399 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -1,15 +1,12 @@ package basichost import ( - "sync" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog" - inat "github.com/jbenet/go-ipfs/p2p/nat" inet "github.com/jbenet/go-ipfs/p2p/net" peer "github.com/jbenet/go-ipfs/p2p/peer" protocol "github.com/jbenet/go-ipfs/p2p/protocol" @@ -42,9 +39,7 @@ type BasicHost struct { mux *protocol.Mux ids *identify.IDService relay *relay.RelayService - - natmu sync.Mutex - nat *inat.NAT + natmgr *natManager proc goprocess.Process } @@ -57,6 +52,10 @@ func New(net inet.Network, opts ...Option) *BasicHost { } h.proc = goprocess.WithTeardown(func() error { + if h.natmgr != nil { + h.natmgr.Close() + } + return h.Network().Close() }) @@ -70,45 +69,13 @@ func New(net inet.Network, opts ...Option) *BasicHost { for _, o := range opts { switch o { case NATPortMap: - h.setupNATPortMap() + h.natmgr = newNatManager(h) } } return h } -func (h *BasicHost) setupNATPortMap() { - // do this asynchronously to avoid blocking daemon startup - - h.proc.Go(func(worker goprocess.Process) { - nat := inat.DiscoverNAT() - if nat == nil { // no nat, or failed to get it. - return - } - - select { - case <-worker.Closing(): - nat.Close() - return - default: - } - - // wire up the nat to close when proc closes. - h.proc.AddChild(nat.Process()) - - h.natmu.Lock() - h.nat = nat - h.natmu.Unlock() - - addrs := h.Network().ListenAddresses() - nat.PortMapAddrs(addrs) - mapAddrs := nat.ExternalAddrs() - if len(mapAddrs) > 0 { - log.Infof("NAT mapping addrs: %s", mapAddrs) - } - }) -} - // newConnHandler is the remote-opened conn handler for inet.Network func (h *BasicHost) newConnHandler(c inet.Conn) { h.ids.IdentifyConn(c) @@ -214,17 +181,19 @@ func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error { return nil } +// Addrs returns all the addresses of BasicHost at this moment in time. +// It's ok to not include addresses if they're not available to be used now. func (h *BasicHost) Addrs() []ma.Multiaddr { addrs, err := h.Network().InterfaceListenAddresses() if err != nil { log.Debug("error retrieving network interface addrs") } - h.natmu.Lock() - nat := h.nat - h.natmu.Unlock() - if nat != nil { - addrs = append(addrs, nat.ExternalAddrs()...) + if h.natmgr != nil { // natmgr is nil if we do not use nat option. + nat := h.natmgr.NAT() + if nat != nil { // nat is nil if not ready, or no nat is available. + addrs = append(addrs, nat.ExternalAddrs()...) + } } return addrs diff --git a/p2p/host/basic/natmgr.go b/p2p/host/basic/natmgr.go new file mode 100644 index 000000000..492841a9d --- /dev/null +++ b/p2p/host/basic/natmgr.go @@ -0,0 +1,224 @@ +package basichost + +import ( + "sync" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + + inat "github.com/jbenet/go-ipfs/p2p/nat" + inet "github.com/jbenet/go-ipfs/p2p/net" + lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables" +) + +// natManager takes care of adding + removing port mappings to the nat. +// Initialized with the host if it has a NATPortMap option enabled. +// natManager receives signals from the network, and check on nat mappings: +// * natManager listens to the network and adds or closes port mappings +// as the network signals Listen() or ListenClose(). +// * closing the natManager closes the nat and its mappings. +type natManager struct { + host *BasicHost + natmu sync.RWMutex // guards nat (ready could obviate this mutex, but safety first.) + nat *inat.NAT + + ready chan struct{} // closed once the nat is ready to process port mappings + proc goprocess.Process // natManager has a process + children. can be closed. +} + +func newNatManager(host *BasicHost) *natManager { + nmgr := &natManager{ + host: host, + ready: make(chan struct{}), + proc: goprocess.WithParent(host.proc), + } + + // teardown + nmgr.proc = goprocess.WithTeardown(func() error { + // on closing, unregister from network notifications. + host.Network().StopNotify((*nmgrNetNotifiee)(nmgr)) + return nil + }) + + // host is our parent. close when host closes. + host.proc.AddChild(nmgr.proc) + + // discover the nat. + nmgr.discoverNAT() + return nmgr +} + +// Close closes the natManager, closing the underlying nat +// and unregistering from network events. +func (nmgr *natManager) Close() error { + return nmgr.proc.Close() +} + +// Ready returns a channel which will be closed when the NAT has been found +// and is ready to be used, or the search process is done. +func (nmgr *natManager) Ready() <-chan struct{} { + return nmgr.ready +} + +func (nmgr *natManager) discoverNAT() { + + nmgr.proc.Go(func(worker goprocess.Process) { + // inat.DiscoverNAT blocks until the nat is found or a timeout + // is reached. we unfortunately cannot specify timeouts-- the + // library we're using just blocks. + // + // Note: on early shutdown, there may be a case where we're trying + // to close before DiscoverNAT() returns. Since we cant cancel it + // (library) we can choose to (1) drop the result and return early, + // or (2) wait until it times out to exit. For now we choose (2), + // to avoid leaking resources in a non-obvious way. the only case + // this affects is when the daemon is being started up and _immediately_ + // asked to close. other services are also starting up, so ok to wait. + nat := inat.DiscoverNAT() + if nat == nil { // no nat, or failed to get it. + return + } + + // by this point -- after finding the NAT -- we may have already + // be closing. if so, just exit. + select { + case <-worker.Closing(): + nat.Close() + return + default: + } + + // wire up the nat to close when nmgr closes. + // nmgr.proc is our parent, and waiting for us. + nmgr.proc.AddChild(nat.Process()) + + // set the nat. + nmgr.natmu.Lock() + nmgr.nat = nat + nmgr.natmu.Unlock() + + // signal that we're ready to process nat mappings: + close(nmgr.ready) + + // sign natManager up for network notifications + // we need to sign up here to avoid missing some notifs + // before the NAT has been found. + nmgr.host.Network().Notify((*nmgrNetNotifiee)(nmgr)) + + // if any interfaces were brought up while we were setting up + // the nat, now is the time to setup port mappings for them. + // we release ready, then grab them to avoid losing any. adding + // a port mapping is idempotent, so its ok to add the same twice. + addrs := nmgr.host.Network().ListenAddresses() + for _, addr := range addrs { + // we do it async because it's slow and we may want to close beforehand + go addPortMapping(nmgr, addr) + } + }) +} + +// NAT returns the natManager's nat object. this may be nil, if +// (a) the search process is still ongoing, or (b) the search process +// found no nat. Clients must check whether the return value is nil. +func (nmgr *natManager) NAT() *inat.NAT { + nmgr.natmu.Lock() + defer nmgr.natmu.Unlock() + return nmgr.nat +} + +func addPortMapping(nmgr *natManager, intaddr ma.Multiaddr) { + nat := nmgr.NAT() + if nat == nil { + panic("natManager addPortMapping called without a nat.") + } + + // first, check if the port mapping already exists. + for _, mapping := range nat.Mappings() { + if mapping.InternalAddr().Equal(intaddr) { + return // it exists! return. + } + } + + ctx := context.TODO() + lm := make(lgbl.DeferredMap) + lm["internalAddr"] = func() interface{} { return intaddr.String() } + + defer log.EventBegin(ctx, "natMgrAddPortMappingWait", lm).Done() + + select { + case <-nmgr.proc.Closing(): + lm["outcome"] = "cancelled" + return // no use. + case <-nmgr.ready: // wait until it's ready. + } + + // actually start the port map (sub-event because waiting may take a while) + defer log.EventBegin(ctx, "natMgrAddPortMapping", lm).Done() + + // get the nat + m, err := nat.NewMapping(intaddr) + if err != nil { + lm["outcome"] = "failure" + lm["error"] = err + return + } + + extaddr, err := m.ExternalAddr() + if err != nil { + lm["outcome"] = "failure" + lm["error"] = err + return + } + + lm["outcome"] = "success" + lm["externalAddr"] = func() interface{} { return extaddr.String() } + log.Infof("established nat port mapping: %s <--> %s", intaddr, extaddr) +} + +func rmPortMapping(nmgr *natManager, intaddr ma.Multiaddr) { + nat := nmgr.NAT() + if nat == nil { + panic("natManager rmPortMapping called without a nat.") + } + + // list the port mappings (it may be gone on it's own, so we need to + // check this list, and not store it ourselves behind the scenes) + + // close mappings for this internal address. + for _, mapping := range nat.Mappings() { + if mapping.InternalAddr().Equal(intaddr) { + mapping.Close() + } + } +} + +// nmgrNetNotifiee implements the network notification listening part +// of the natManager. this is merely listening to Listen() and ListenClose() +// events. +type nmgrNetNotifiee natManager + +func (nn *nmgrNetNotifiee) natManager() *natManager { + return (*natManager)(nn) +} + +func (nn *nmgrNetNotifiee) Listen(n inet.Network, addr ma.Multiaddr) { + if nn.natManager().NAT() == nil { + return // not ready or doesnt exist. + } + + addPortMapping(nn.natManager(), addr) +} + +func (nn *nmgrNetNotifiee) ListenClose(n inet.Network, addr ma.Multiaddr) { + if nn.natManager().NAT() == nil { + return // not ready or doesnt exist. + } + + rmPortMapping(nn.natManager(), addr) +} + +func (nn *nmgrNetNotifiee) Connected(inet.Network, inet.Conn) {} +func (nn *nmgrNetNotifiee) Disconnected(inet.Network, inet.Conn) {} +func (nn *nmgrNetNotifiee) OpenedStream(inet.Network, inet.Stream) {} +func (nn *nmgrNetNotifiee) ClosedStream(inet.Network, inet.Stream) {} From 1a3752b81f939b80a6429e170b9acad4e06db92e Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 30 Jan 2015 20:25:48 -0800 Subject: [PATCH 5/6] core: setup peerhost + listen separate steps We had a problem: we were starting all the services with the network live, and so would miss early messages. We were noticing bitswap messages not handled (not in muxer). Many of the subsystems expect the network to _exist_ when they start up, so we split construction and starting to listen into two separate steps. --- core/core.go | 75 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/core/core.go b/core/core.go index af95cac98..058a68b39 100644 --- a/core/core.go +++ b/core/core.go @@ -214,30 +214,39 @@ func (n *IpfsNode) StartOnlineServices(ctx context.Context) error { return err } - peerhost, err := constructPeerHost(ctx, n.Repo.Config(), n.Identity, n.Peerstore) + peerhost, err := constructPeerHost(ctx, n.Identity, n.Peerstore) if err != nil { return debugerror.Wrap(err) } n.PeerHost = peerhost - // setup diagnostics service - n.Diagnostics = diag.NewDiagnostics(n.Identity, n.PeerHost) + // this block is the set of services which need to be initialized with the host + // and _before_ we start listening. + { + // setup diagnostics service + n.Diagnostics = diag.NewDiagnostics(n.Identity, n.PeerHost) - // setup routing service - dhtRouting, err := constructDHTRouting(ctx, n.PeerHost, n.Repo.Datastore()) - if err != nil { + // setup routing service + dhtRouting, err := constructDHTRouting(ctx, n.PeerHost, n.Repo.Datastore()) + if err != nil { + return debugerror.Wrap(err) + } + n.Routing = dhtRouting + + // setup exchange service + const alwaysSendToPeer = true // use YesManStrategy + bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing) + n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, alwaysSendToPeer) + + // setup name system + // TODO implement an offline namesys that serves only local names. + n.Namesys = namesys.NewNameSystem(n.Routing) + } + + // Ok, now we're ready to listen. + if err := startListening(ctx, n.PeerHost, n.Repo.Config()); err != nil { return debugerror.Wrap(err) } - n.Routing = dhtRouting - - // setup exchange service - const alwaysSendToPeer = true // use YesManStrategy - bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing) - n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, alwaysSendToPeer) - - // setup name system - // TODO implement an offline namesys that serves only local names. - n.Namesys = namesys.NewNameSystem(n.Routing) n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore) go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency) @@ -405,37 +414,49 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) { } // isolates the complex initialization steps -func constructPeerHost(ctx context.Context, cfg *config.Config, id peer.ID, ps peer.Peerstore) (p2phost.Host, error) { - listenAddrs, err := listenAddresses(cfg) +func constructPeerHost(ctx context.Context, id peer.ID, ps peer.Peerstore) (p2phost.Host, error) { + + // no addresses to begin with. we'll start later. + network, err := swarm.NewNetwork(ctx, nil, id, ps) if err != nil { return nil, debugerror.Wrap(err) } + host := p2pbhost.New(network, p2pbhost.NATPortMap) + return host, nil +} + +// startListening on the network addresses +func startListening(ctx context.Context, host p2phost.Host, cfg *config.Config) error { + listenAddrs, err := listenAddresses(cfg) + if err != nil { + return debugerror.Wrap(err) + } + // make sure we error out if our config does not have addresses we can use log.Debugf("Config.Addresses.Swarm:%s", listenAddrs) filteredAddrs := addrutil.FilterUsableAddrs(listenAddrs) log.Debugf("Config.Addresses.Swarm:%s (filtered)", filteredAddrs) if len(filteredAddrs) < 1 { - return nil, debugerror.Errorf("addresses in config not usable: %s", listenAddrs) + return debugerror.Errorf("addresses in config not usable: %s", listenAddrs) } - network, err := swarm.NewNetwork(ctx, filteredAddrs, id, ps) - if err != nil { - return nil, debugerror.Wrap(err) + // Actually start listening: + if err := host.Network().Listen(filteredAddrs...); err != nil { + return err } - peerhost := p2pbhost.New(network, p2pbhost.NATPortMap) // explicitly set these as our listen addrs. // (why not do it inside inet.NewNetwork? because this way we can // listen on addresses without necessarily advertising those publicly.) - addrs, err := peerhost.Network().InterfaceListenAddresses() + addrs, err := host.Network().InterfaceListenAddresses() if err != nil { - return nil, debugerror.Wrap(err) + return debugerror.Wrap(err) } log.Infof("Swarm listening at: %s", addrs) - ps.AddAddresses(id, addrs) - return peerhost, nil + host.Peerstore().AddAddresses(host.ID(), addrs) + return nil } func constructDHTRouting(ctx context.Context, host p2phost.Host, ds datastore.ThreadSafeDatastore) (*dht.IpfsDHT, error) { From 7b85579d7a00a01febc0073a76ce49552ba70f46 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 31 Jan 2015 17:20:34 -0800 Subject: [PATCH 6/6] core: move online service init block into own func addresses CR comments --- core/core.go | 46 +++++++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/core/core.go b/core/core.go index 058a68b39..93411c4a4 100644 --- a/core/core.go +++ b/core/core.go @@ -220,27 +220,8 @@ func (n *IpfsNode) StartOnlineServices(ctx context.Context) error { } n.PeerHost = peerhost - // this block is the set of services which need to be initialized with the host - // and _before_ we start listening. - { - // setup diagnostics service - n.Diagnostics = diag.NewDiagnostics(n.Identity, n.PeerHost) - - // setup routing service - dhtRouting, err := constructDHTRouting(ctx, n.PeerHost, n.Repo.Datastore()) - if err != nil { - return debugerror.Wrap(err) - } - n.Routing = dhtRouting - - // setup exchange service - const alwaysSendToPeer = true // use YesManStrategy - bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing) - n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, alwaysSendToPeer) - - // setup name system - // TODO implement an offline namesys that serves only local names. - n.Namesys = namesys.NewNameSystem(n.Routing) + if err := n.startOnlineServicesWithHost(ctx); err != nil { + return err } // Ok, now we're ready to listen. @@ -254,6 +235,29 @@ func (n *IpfsNode) StartOnlineServices(ctx context.Context) error { return n.Bootstrap(DefaultBootstrapConfig) } +// startOnlineServicesWithHost is the set of services which need to be +// initialized with the host and _before_ we start listening. +func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context) error { + // setup diagnostics service + n.Diagnostics = diag.NewDiagnostics(n.Identity, n.PeerHost) + + // setup routing service + dhtRouting, err := constructDHTRouting(ctx, n.PeerHost, n.Repo.Datastore()) + if err != nil { + return debugerror.Wrap(err) + } + n.Routing = dhtRouting + + // setup exchange service + const alwaysSendToPeer = true // use YesManStrategy + bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing) + n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, alwaysSendToPeer) + + // setup name system + n.Namesys = namesys.NewNameSystem(n.Routing) + return nil +} + // teardown closes owned children. If any errors occur, this function returns // the first error. func (n *IpfsNode) teardown() error {