mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-01 14:28:02 +08:00
Merge pull request #703 from jbenet/net-set-listener
online services started before network listens
This commit is contained in:
commit
f43cf2d917
61
core/core.go
61
core/core.go
@ -214,12 +214,30 @@ func (n *IpfsNode) StartOnlineServices(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
peerhost, err := constructPeerHost(ctx, n.Repo.Config(), n.Identity, n.Peerstore)
|
||||
peerhost, err := constructPeerHost(ctx, n.Identity, n.Peerstore)
|
||||
if err != nil {
|
||||
return debugerror.Wrap(err)
|
||||
}
|
||||
n.PeerHost = peerhost
|
||||
|
||||
if err := n.startOnlineServicesWithHost(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Ok, now we're ready to listen.
|
||||
if err := startListening(ctx, n.PeerHost, n.Repo.Config()); err != nil {
|
||||
return debugerror.Wrap(err)
|
||||
}
|
||||
|
||||
n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore)
|
||||
go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency)
|
||||
|
||||
return n.Bootstrap(DefaultBootstrapConfig)
|
||||
}
|
||||
|
||||
// startOnlineServicesWithHost is the set of services which need to be
|
||||
// initialized with the host and _before_ we start listening.
|
||||
func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context) error {
|
||||
// setup diagnostics service
|
||||
n.Diagnostics = diag.NewDiagnostics(n.Identity, n.PeerHost)
|
||||
|
||||
@ -236,13 +254,8 @@ func (n *IpfsNode) StartOnlineServices(ctx context.Context) error {
|
||||
n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, alwaysSendToPeer)
|
||||
|
||||
// setup name system
|
||||
// TODO implement an offline namesys that serves only local names.
|
||||
n.Namesys = namesys.NewNameSystem(n.Routing)
|
||||
|
||||
n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore)
|
||||
go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency)
|
||||
|
||||
return n.Bootstrap(DefaultBootstrapConfig)
|
||||
return nil
|
||||
}
|
||||
|
||||
// teardown closes owned children. If any errors occur, this function returns
|
||||
@ -405,37 +418,49 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
|
||||
}
|
||||
|
||||
// isolates the complex initialization steps
|
||||
func constructPeerHost(ctx context.Context, cfg *config.Config, id peer.ID, ps peer.Peerstore) (p2phost.Host, error) {
|
||||
listenAddrs, err := listenAddresses(cfg)
|
||||
func constructPeerHost(ctx context.Context, id peer.ID, ps peer.Peerstore) (p2phost.Host, error) {
|
||||
|
||||
// no addresses to begin with. we'll start later.
|
||||
network, err := swarm.NewNetwork(ctx, nil, id, ps)
|
||||
if err != nil {
|
||||
return nil, debugerror.Wrap(err)
|
||||
}
|
||||
|
||||
host := p2pbhost.New(network, p2pbhost.NATPortMap)
|
||||
return host, nil
|
||||
}
|
||||
|
||||
// startListening on the network addresses
|
||||
func startListening(ctx context.Context, host p2phost.Host, cfg *config.Config) error {
|
||||
listenAddrs, err := listenAddresses(cfg)
|
||||
if err != nil {
|
||||
return debugerror.Wrap(err)
|
||||
}
|
||||
|
||||
// make sure we error out if our config does not have addresses we can use
|
||||
log.Debugf("Config.Addresses.Swarm:%s", listenAddrs)
|
||||
filteredAddrs := addrutil.FilterUsableAddrs(listenAddrs)
|
||||
log.Debugf("Config.Addresses.Swarm:%s (filtered)", filteredAddrs)
|
||||
if len(filteredAddrs) < 1 {
|
||||
return nil, debugerror.Errorf("addresses in config not usable: %s", listenAddrs)
|
||||
return debugerror.Errorf("addresses in config not usable: %s", listenAddrs)
|
||||
}
|
||||
|
||||
network, err := swarm.NewNetwork(ctx, filteredAddrs, id, ps)
|
||||
if err != nil {
|
||||
return nil, debugerror.Wrap(err)
|
||||
// Actually start listening:
|
||||
if err := host.Network().Listen(filteredAddrs...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
peerhost := p2pbhost.New(network, p2pbhost.NATPortMap)
|
||||
// explicitly set these as our listen addrs.
|
||||
// (why not do it inside inet.NewNetwork? because this way we can
|
||||
// listen on addresses without necessarily advertising those publicly.)
|
||||
addrs, err := peerhost.Network().InterfaceListenAddresses()
|
||||
addrs, err := host.Network().InterfaceListenAddresses()
|
||||
if err != nil {
|
||||
return nil, debugerror.Wrap(err)
|
||||
return debugerror.Wrap(err)
|
||||
}
|
||||
log.Infof("Swarm listening at: %s", addrs)
|
||||
|
||||
ps.AddAddresses(id, addrs)
|
||||
return peerhost, nil
|
||||
host.Peerstore().AddAddresses(host.ID(), addrs)
|
||||
return nil
|
||||
}
|
||||
|
||||
func constructDHTRouting(ctx context.Context, host p2phost.Host, ds datastore.ThreadSafeDatastore) (*dht.IpfsDHT, error) {
|
||||
|
||||
@ -2,6 +2,7 @@ package network
|
||||
|
||||
import (
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
|
||||
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
||||
host "github.com/jbenet/go-ipfs/p2p/host"
|
||||
@ -171,3 +172,5 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
|
||||
|
||||
func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}
|
||||
func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {}
|
||||
func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {}
|
||||
func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {}
|
||||
|
||||
@ -1,15 +1,12 @@
|
||||
package basichost
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||
|
||||
eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
|
||||
|
||||
inat "github.com/jbenet/go-ipfs/p2p/nat"
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net"
|
||||
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
||||
protocol "github.com/jbenet/go-ipfs/p2p/protocol"
|
||||
@ -42,9 +39,7 @@ type BasicHost struct {
|
||||
mux *protocol.Mux
|
||||
ids *identify.IDService
|
||||
relay *relay.RelayService
|
||||
|
||||
natmu sync.Mutex
|
||||
nat *inat.NAT
|
||||
natmgr *natManager
|
||||
|
||||
proc goprocess.Process
|
||||
}
|
||||
@ -57,6 +52,10 @@ func New(net inet.Network, opts ...Option) *BasicHost {
|
||||
}
|
||||
|
||||
h.proc = goprocess.WithTeardown(func() error {
|
||||
if h.natmgr != nil {
|
||||
h.natmgr.Close()
|
||||
}
|
||||
|
||||
return h.Network().Close()
|
||||
})
|
||||
|
||||
@ -70,45 +69,13 @@ func New(net inet.Network, opts ...Option) *BasicHost {
|
||||
for _, o := range opts {
|
||||
switch o {
|
||||
case NATPortMap:
|
||||
h.setupNATPortMap()
|
||||
h.natmgr = newNatManager(h)
|
||||
}
|
||||
}
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
func (h *BasicHost) setupNATPortMap() {
|
||||
// do this asynchronously to avoid blocking daemon startup
|
||||
|
||||
h.proc.Go(func(worker goprocess.Process) {
|
||||
nat := inat.DiscoverNAT()
|
||||
if nat == nil { // no nat, or failed to get it.
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-worker.Closing():
|
||||
nat.Close()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// wire up the nat to close when proc closes.
|
||||
h.proc.AddChild(nat.Process())
|
||||
|
||||
h.natmu.Lock()
|
||||
h.nat = nat
|
||||
h.natmu.Unlock()
|
||||
|
||||
addrs := h.Network().ListenAddresses()
|
||||
nat.PortMapAddrs(addrs)
|
||||
mapAddrs := nat.ExternalAddrs()
|
||||
if len(mapAddrs) > 0 {
|
||||
log.Infof("NAT mapping addrs: %s", mapAddrs)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// newConnHandler is the remote-opened conn handler for inet.Network
|
||||
func (h *BasicHost) newConnHandler(c inet.Conn) {
|
||||
h.ids.IdentifyConn(c)
|
||||
@ -214,17 +181,19 @@ func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Addrs returns all the addresses of BasicHost at this moment in time.
|
||||
// It's ok to not include addresses if they're not available to be used now.
|
||||
func (h *BasicHost) Addrs() []ma.Multiaddr {
|
||||
addrs, err := h.Network().InterfaceListenAddresses()
|
||||
if err != nil {
|
||||
log.Debug("error retrieving network interface addrs")
|
||||
}
|
||||
|
||||
h.natmu.Lock()
|
||||
nat := h.nat
|
||||
h.natmu.Unlock()
|
||||
if nat != nil {
|
||||
addrs = append(addrs, nat.ExternalAddrs()...)
|
||||
if h.natmgr != nil { // natmgr is nil if we do not use nat option.
|
||||
nat := h.natmgr.NAT()
|
||||
if nat != nil { // nat is nil if not ready, or no nat is available.
|
||||
addrs = append(addrs, nat.ExternalAddrs()...)
|
||||
}
|
||||
}
|
||||
|
||||
return addrs
|
||||
|
||||
224
p2p/host/basic/natmgr.go
Normal file
224
p2p/host/basic/natmgr.go
Normal file
@ -0,0 +1,224 @@
|
||||
package basichost
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||
|
||||
inat "github.com/jbenet/go-ipfs/p2p/nat"
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net"
|
||||
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
|
||||
)
|
||||
|
||||
// natManager takes care of adding + removing port mappings to the nat.
|
||||
// Initialized with the host if it has a NATPortMap option enabled.
|
||||
// natManager receives signals from the network, and check on nat mappings:
|
||||
// * natManager listens to the network and adds or closes port mappings
|
||||
// as the network signals Listen() or ListenClose().
|
||||
// * closing the natManager closes the nat and its mappings.
|
||||
type natManager struct {
|
||||
host *BasicHost
|
||||
natmu sync.RWMutex // guards nat (ready could obviate this mutex, but safety first.)
|
||||
nat *inat.NAT
|
||||
|
||||
ready chan struct{} // closed once the nat is ready to process port mappings
|
||||
proc goprocess.Process // natManager has a process + children. can be closed.
|
||||
}
|
||||
|
||||
func newNatManager(host *BasicHost) *natManager {
|
||||
nmgr := &natManager{
|
||||
host: host,
|
||||
ready: make(chan struct{}),
|
||||
proc: goprocess.WithParent(host.proc),
|
||||
}
|
||||
|
||||
// teardown
|
||||
nmgr.proc = goprocess.WithTeardown(func() error {
|
||||
// on closing, unregister from network notifications.
|
||||
host.Network().StopNotify((*nmgrNetNotifiee)(nmgr))
|
||||
return nil
|
||||
})
|
||||
|
||||
// host is our parent. close when host closes.
|
||||
host.proc.AddChild(nmgr.proc)
|
||||
|
||||
// discover the nat.
|
||||
nmgr.discoverNAT()
|
||||
return nmgr
|
||||
}
|
||||
|
||||
// Close closes the natManager, closing the underlying nat
|
||||
// and unregistering from network events.
|
||||
func (nmgr *natManager) Close() error {
|
||||
return nmgr.proc.Close()
|
||||
}
|
||||
|
||||
// Ready returns a channel which will be closed when the NAT has been found
|
||||
// and is ready to be used, or the search process is done.
|
||||
func (nmgr *natManager) Ready() <-chan struct{} {
|
||||
return nmgr.ready
|
||||
}
|
||||
|
||||
func (nmgr *natManager) discoverNAT() {
|
||||
|
||||
nmgr.proc.Go(func(worker goprocess.Process) {
|
||||
// inat.DiscoverNAT blocks until the nat is found or a timeout
|
||||
// is reached. we unfortunately cannot specify timeouts-- the
|
||||
// library we're using just blocks.
|
||||
//
|
||||
// Note: on early shutdown, there may be a case where we're trying
|
||||
// to close before DiscoverNAT() returns. Since we cant cancel it
|
||||
// (library) we can choose to (1) drop the result and return early,
|
||||
// or (2) wait until it times out to exit. For now we choose (2),
|
||||
// to avoid leaking resources in a non-obvious way. the only case
|
||||
// this affects is when the daemon is being started up and _immediately_
|
||||
// asked to close. other services are also starting up, so ok to wait.
|
||||
nat := inat.DiscoverNAT()
|
||||
if nat == nil { // no nat, or failed to get it.
|
||||
return
|
||||
}
|
||||
|
||||
// by this point -- after finding the NAT -- we may have already
|
||||
// be closing. if so, just exit.
|
||||
select {
|
||||
case <-worker.Closing():
|
||||
nat.Close()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// wire up the nat to close when nmgr closes.
|
||||
// nmgr.proc is our parent, and waiting for us.
|
||||
nmgr.proc.AddChild(nat.Process())
|
||||
|
||||
// set the nat.
|
||||
nmgr.natmu.Lock()
|
||||
nmgr.nat = nat
|
||||
nmgr.natmu.Unlock()
|
||||
|
||||
// signal that we're ready to process nat mappings:
|
||||
close(nmgr.ready)
|
||||
|
||||
// sign natManager up for network notifications
|
||||
// we need to sign up here to avoid missing some notifs
|
||||
// before the NAT has been found.
|
||||
nmgr.host.Network().Notify((*nmgrNetNotifiee)(nmgr))
|
||||
|
||||
// if any interfaces were brought up while we were setting up
|
||||
// the nat, now is the time to setup port mappings for them.
|
||||
// we release ready, then grab them to avoid losing any. adding
|
||||
// a port mapping is idempotent, so its ok to add the same twice.
|
||||
addrs := nmgr.host.Network().ListenAddresses()
|
||||
for _, addr := range addrs {
|
||||
// we do it async because it's slow and we may want to close beforehand
|
||||
go addPortMapping(nmgr, addr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// NAT returns the natManager's nat object. this may be nil, if
|
||||
// (a) the search process is still ongoing, or (b) the search process
|
||||
// found no nat. Clients must check whether the return value is nil.
|
||||
func (nmgr *natManager) NAT() *inat.NAT {
|
||||
nmgr.natmu.Lock()
|
||||
defer nmgr.natmu.Unlock()
|
||||
return nmgr.nat
|
||||
}
|
||||
|
||||
func addPortMapping(nmgr *natManager, intaddr ma.Multiaddr) {
|
||||
nat := nmgr.NAT()
|
||||
if nat == nil {
|
||||
panic("natManager addPortMapping called without a nat.")
|
||||
}
|
||||
|
||||
// first, check if the port mapping already exists.
|
||||
for _, mapping := range nat.Mappings() {
|
||||
if mapping.InternalAddr().Equal(intaddr) {
|
||||
return // it exists! return.
|
||||
}
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
lm := make(lgbl.DeferredMap)
|
||||
lm["internalAddr"] = func() interface{} { return intaddr.String() }
|
||||
|
||||
defer log.EventBegin(ctx, "natMgrAddPortMappingWait", lm).Done()
|
||||
|
||||
select {
|
||||
case <-nmgr.proc.Closing():
|
||||
lm["outcome"] = "cancelled"
|
||||
return // no use.
|
||||
case <-nmgr.ready: // wait until it's ready.
|
||||
}
|
||||
|
||||
// actually start the port map (sub-event because waiting may take a while)
|
||||
defer log.EventBegin(ctx, "natMgrAddPortMapping", lm).Done()
|
||||
|
||||
// get the nat
|
||||
m, err := nat.NewMapping(intaddr)
|
||||
if err != nil {
|
||||
lm["outcome"] = "failure"
|
||||
lm["error"] = err
|
||||
return
|
||||
}
|
||||
|
||||
extaddr, err := m.ExternalAddr()
|
||||
if err != nil {
|
||||
lm["outcome"] = "failure"
|
||||
lm["error"] = err
|
||||
return
|
||||
}
|
||||
|
||||
lm["outcome"] = "success"
|
||||
lm["externalAddr"] = func() interface{} { return extaddr.String() }
|
||||
log.Infof("established nat port mapping: %s <--> %s", intaddr, extaddr)
|
||||
}
|
||||
|
||||
func rmPortMapping(nmgr *natManager, intaddr ma.Multiaddr) {
|
||||
nat := nmgr.NAT()
|
||||
if nat == nil {
|
||||
panic("natManager rmPortMapping called without a nat.")
|
||||
}
|
||||
|
||||
// list the port mappings (it may be gone on it's own, so we need to
|
||||
// check this list, and not store it ourselves behind the scenes)
|
||||
|
||||
// close mappings for this internal address.
|
||||
for _, mapping := range nat.Mappings() {
|
||||
if mapping.InternalAddr().Equal(intaddr) {
|
||||
mapping.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// nmgrNetNotifiee implements the network notification listening part
|
||||
// of the natManager. this is merely listening to Listen() and ListenClose()
|
||||
// events.
|
||||
type nmgrNetNotifiee natManager
|
||||
|
||||
func (nn *nmgrNetNotifiee) natManager() *natManager {
|
||||
return (*natManager)(nn)
|
||||
}
|
||||
|
||||
func (nn *nmgrNetNotifiee) Listen(n inet.Network, addr ma.Multiaddr) {
|
||||
if nn.natManager().NAT() == nil {
|
||||
return // not ready or doesnt exist.
|
||||
}
|
||||
|
||||
addPortMapping(nn.natManager(), addr)
|
||||
}
|
||||
|
||||
func (nn *nmgrNetNotifiee) ListenClose(n inet.Network, addr ma.Multiaddr) {
|
||||
if nn.natManager().NAT() == nil {
|
||||
return // not ready or doesnt exist.
|
||||
}
|
||||
|
||||
rmPortMapping(nn.natManager(), addr)
|
||||
}
|
||||
|
||||
func (nn *nmgrNetNotifiee) Connected(inet.Network, inet.Conn) {}
|
||||
func (nn *nmgrNetNotifiee) Disconnected(inet.Network, inet.Conn) {}
|
||||
func (nn *nmgrNetNotifiee) OpenedStream(inet.Network, inet.Stream) {}
|
||||
func (nn *nmgrNetNotifiee) ClosedStream(inet.Network, inet.Stream) {}
|
||||
@ -55,15 +55,16 @@ type NAT struct {
|
||||
proc goprocess.Process // manages nat mappings lifecycle
|
||||
|
||||
mappingmu sync.RWMutex // guards mappings
|
||||
mappings []*mapping
|
||||
mappings map[*mapping]struct{}
|
||||
|
||||
Notifier
|
||||
}
|
||||
|
||||
func newNAT(realNAT nat.NAT) *NAT {
|
||||
return &NAT{
|
||||
nat: realNAT,
|
||||
proc: goprocess.WithParent(goprocess.Background()),
|
||||
nat: realNAT,
|
||||
proc: goprocess.WithParent(goprocess.Background()),
|
||||
mappings: make(map[*mapping]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
@ -143,6 +144,9 @@ type Mapping interface {
|
||||
// ExternalAddr returns the external facing address. If the mapping is not
|
||||
// established, addr will be nil, and and ErrNoMapping will be returned.
|
||||
ExternalAddr() (addr ma.Multiaddr, err error)
|
||||
|
||||
// Close closes the port mapping
|
||||
Close() error
|
||||
}
|
||||
|
||||
// keeps republishing
|
||||
@ -230,9 +234,9 @@ func (m *mapping) Close() error {
|
||||
// Mappings returns a slice of all NAT mappings
|
||||
func (nat *NAT) Mappings() []Mapping {
|
||||
nat.mappingmu.Lock()
|
||||
maps2 := make([]Mapping, len(nat.mappings))
|
||||
for i, m := range nat.mappings {
|
||||
maps2[i] = m
|
||||
maps2 := make([]Mapping, 0, len(nat.mappings))
|
||||
for m := range nat.mappings {
|
||||
maps2 = append(maps2, m)
|
||||
}
|
||||
nat.mappingmu.Unlock()
|
||||
return maps2
|
||||
@ -243,7 +247,13 @@ func (nat *NAT) addMapping(m *mapping) {
|
||||
nat.proc.AddChild(m.proc)
|
||||
|
||||
nat.mappingmu.Lock()
|
||||
nat.mappings = append(nat.mappings, m)
|
||||
nat.mappings[m] = struct{}{}
|
||||
nat.mappingmu.Unlock()
|
||||
}
|
||||
|
||||
func (nat *NAT) rmMapping(m *mapping) {
|
||||
nat.mappingmu.Lock()
|
||||
delete(nat.mappings, m)
|
||||
nat.mappingmu.Unlock()
|
||||
}
|
||||
|
||||
@ -286,10 +296,16 @@ func (nat *NAT) NewMapping(maddr ma.Multiaddr) (Mapping, error) {
|
||||
intport: intport,
|
||||
intaddr: maddr,
|
||||
}
|
||||
m.proc = periodic.Every(MappingDuration/3, func(worker goprocess.Process) {
|
||||
nat.establishMapping(m)
|
||||
m.proc = goprocess.WithTeardown(func() error {
|
||||
nat.rmMapping(m)
|
||||
return nil
|
||||
})
|
||||
nat.addMapping(m)
|
||||
|
||||
m.proc.AddChild(periodic.Every(MappingDuration/3, func(worker goprocess.Process) {
|
||||
nat.establishMapping(m)
|
||||
}))
|
||||
|
||||
// do it once synchronously, so first mapping is done right away, and before exiting,
|
||||
// allowing users -- in the optimistic case -- to use results right after.
|
||||
nat.establishMapping(m)
|
||||
|
||||
@ -142,10 +142,12 @@ const (
|
||||
// Notifiee is an interface for an object wishing to receive
|
||||
// notifications from a Network.
|
||||
type Notifiee interface {
|
||||
Connected(Network, Conn) // called when a connection opened
|
||||
Disconnected(Network, Conn) // called when a connection closed
|
||||
OpenedStream(Network, Stream) // called when a stream opened
|
||||
ClosedStream(Network, Stream) // called when a stream closed
|
||||
Listen(Network, ma.Multiaddr) // called when network starts listening on an addr
|
||||
ListenClose(Network, ma.Multiaddr) // called when network starts listening on an addr
|
||||
Connected(Network, Conn) // called when a connection opened
|
||||
Disconnected(Network, Conn) // called when a connection closed
|
||||
OpenedStream(Network, Stream) // called when a stream opened
|
||||
ClosedStream(Network, Stream) // called when a stream closed
|
||||
|
||||
// TODO
|
||||
// PeerConnected(Network, peer.ID) // called when a peer connected
|
||||
|
||||
@ -4,9 +4,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net"
|
||||
)
|
||||
|
||||
func TestNotifications(t *testing.T) {
|
||||
@ -169,6 +170,8 @@ func TestNotifications(t *testing.T) {
|
||||
}
|
||||
|
||||
type netNotifiee struct {
|
||||
listen chan ma.Multiaddr
|
||||
listenClose chan ma.Multiaddr
|
||||
connected chan inet.Conn
|
||||
disconnected chan inet.Conn
|
||||
openedStream chan inet.Stream
|
||||
@ -177,6 +180,8 @@ type netNotifiee struct {
|
||||
|
||||
func newNetNotifiee() *netNotifiee {
|
||||
return &netNotifiee{
|
||||
listen: make(chan ma.Multiaddr),
|
||||
listenClose: make(chan ma.Multiaddr),
|
||||
connected: make(chan inet.Conn),
|
||||
disconnected: make(chan inet.Conn),
|
||||
openedStream: make(chan inet.Stream),
|
||||
@ -184,6 +189,12 @@ func newNetNotifiee() *netNotifiee {
|
||||
}
|
||||
}
|
||||
|
||||
func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {
|
||||
nn.listen <- a
|
||||
}
|
||||
func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {
|
||||
nn.listenClose <- a
|
||||
}
|
||||
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
|
||||
nn.connected <- v
|
||||
}
|
||||
|
||||
@ -202,6 +202,15 @@ func (s *Swarm) LocalPeer() peer.ID {
|
||||
return s.local
|
||||
}
|
||||
|
||||
// notifyAll sends a signal to all Notifiees
|
||||
func (s *Swarm) notifyAll(notify func(inet.Notifiee)) {
|
||||
s.notifmu.RLock()
|
||||
for f := range s.notifs {
|
||||
go notify(f)
|
||||
}
|
||||
s.notifmu.RUnlock()
|
||||
}
|
||||
|
||||
// Notify signs up Notifiee to receive signals when events happen
|
||||
func (s *Swarm) Notify(f inet.Notifiee) {
|
||||
// wrap with our notifiee, to translate function calls
|
||||
|
||||
@ -382,7 +382,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
|
||||
for i := 0; i < len(remoteAddrs); i++ {
|
||||
select {
|
||||
case err = <-errs:
|
||||
log.Info(err)
|
||||
log.Debug(err)
|
||||
case connC := <-conns:
|
||||
// take the first + return asap
|
||||
close(foundConn)
|
||||
|
||||
@ -3,6 +3,7 @@ package swarm
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net"
|
||||
conn "github.com/jbenet/go-ipfs/p2p/net/conn"
|
||||
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
|
||||
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
|
||||
@ -60,7 +61,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
|
||||
// may be fine for sk to be nil, just log a warning.
|
||||
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
|
||||
}
|
||||
log.Infof("Swarm Listening at %s", maddr)
|
||||
log.Debugf("Swarm Listening at %s", maddr)
|
||||
list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -72,20 +73,31 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Swarm Listeners at %s", s.ListenAddresses())
|
||||
log.Debugf("Swarm Listeners at %s", s.ListenAddresses())
|
||||
|
||||
// signal to our notifiees on successful conn.
|
||||
s.notifyAll(func(n inet.Notifiee) {
|
||||
n.Listen((*Network)(s), maddr)
|
||||
})
|
||||
|
||||
// go consume peerstream's listen accept errors. note, these ARE errors.
|
||||
// they may be killing the listener, and if we get _any_ we should be
|
||||
// fixing this in our conn.Listener (to ignore them or handle them
|
||||
// differently.)
|
||||
go func(ctx context.Context, sl *ps.Listener) {
|
||||
|
||||
// signal to our notifiees closing
|
||||
defer s.notifyAll(func(n inet.Notifiee) {
|
||||
n.ListenClose((*Network)(s), maddr)
|
||||
})
|
||||
|
||||
for {
|
||||
select {
|
||||
case err, more := <-sl.AcceptErrors():
|
||||
if !more {
|
||||
return
|
||||
}
|
||||
log.Info(err)
|
||||
log.Debugf("swarm listener accept error: %s", err)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
@ -4,9 +4,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net"
|
||||
)
|
||||
|
||||
func TestNotifications(t *testing.T) {
|
||||
@ -157,6 +158,8 @@ func TestNotifications(t *testing.T) {
|
||||
}
|
||||
|
||||
type netNotifiee struct {
|
||||
listen chan ma.Multiaddr
|
||||
listenClose chan ma.Multiaddr
|
||||
connected chan inet.Conn
|
||||
disconnected chan inet.Conn
|
||||
openedStream chan inet.Stream
|
||||
@ -165,6 +168,8 @@ type netNotifiee struct {
|
||||
|
||||
func newNetNotifiee() *netNotifiee {
|
||||
return &netNotifiee{
|
||||
listen: make(chan ma.Multiaddr),
|
||||
listenClose: make(chan ma.Multiaddr),
|
||||
connected: make(chan inet.Conn),
|
||||
disconnected: make(chan inet.Conn),
|
||||
openedStream: make(chan inet.Stream),
|
||||
@ -172,6 +177,12 @@ func newNetNotifiee() *netNotifiee {
|
||||
}
|
||||
}
|
||||
|
||||
func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {
|
||||
nn.listen <- a
|
||||
}
|
||||
func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {
|
||||
nn.listenClose <- a
|
||||
}
|
||||
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
|
||||
nn.connected <- v
|
||||
}
|
||||
|
||||
@ -76,11 +76,6 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
|
||||
return nil
|
||||
})
|
||||
|
||||
// sanity check. this should **never** happen
|
||||
if len(dht.peerstore.Addresses(dht.self)) < 1 {
|
||||
panic("attempt to initialize dht without addresses for self")
|
||||
}
|
||||
|
||||
h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
|
||||
dht.providers = NewProviderManager(dht.Context(), dht.self)
|
||||
dht.AddChildGroup(dht.providers)
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
package dht
|
||||
|
||||
import (
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net"
|
||||
)
|
||||
|
||||
@ -31,3 +33,5 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
|
||||
|
||||
func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}
|
||||
func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {}
|
||||
func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {}
|
||||
func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user