diff --git a/core/core.go b/core/core.go index b2bae4849..b1098bfb6 100644 --- a/core/core.go +++ b/core/core.go @@ -106,7 +106,8 @@ type IpfsNode struct { IpnsFs *ipnsfs.Filesystem - goprocess.Process + proc goprocess.Process + ctx context.Context mode mode } @@ -121,22 +122,23 @@ type Mounts struct { type ConfigOption func(ctx context.Context) (*IpfsNode, error) -func NewIPFSNode(parent context.Context, option ConfigOption) (*IpfsNode, error) { - procctx := goprocessctx.WithContext(parent) - ctx := parent - success := false // flip to true after all sub-system inits succeed - defer func() { - if !success { - procctx.Close() - } - }() - +func NewIPFSNode(ctx context.Context, option ConfigOption) (*IpfsNode, error) { node, err := option(ctx) if err != nil { return nil, err } - node.Process = procctx - ctxg.SetTeardown(node.teardown) + + proc := goprocessctx.WithContext(ctx) + proc.SetTeardown(node.teardown) + node.proc = proc + node.ctx = ctx + + success := false // flip to true after all sub-system inits succeed + defer func() { + if !success { + proc.Close() + } + }() // Need to make sure it's perfectly clear 1) which variables are expected // to be initialized at this point, and 2) which variables will be @@ -346,6 +348,21 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost return nil } +// Process returns the Process object +func (n *IpfsNode) Process() goprocess.Process { + return n.proc +} + +// Close calls Close() on the Process object +func (n *IpfsNode) Close() error { + return n.proc.Close() +} + +// Context returns the IpfsNode context +func (n *IpfsNode) Context() context.Context { + return n.ctx +} + // teardown closes owned children. If any errors occur, this function returns // the first error. func (n *IpfsNode) teardown() error { @@ -372,7 +389,7 @@ func (n *IpfsNode) teardown() error { } if dht, ok := n.Routing.(*dht.IpfsDHT); ok { - closers = append(closers, dht) + closers = append(closers, dht.Process()) } if n.PeerHost != nil { diff --git a/core/corehttp/corehttp.go b/core/corehttp/corehttp.go index 95a159fa2..042f056ad 100644 --- a/core/corehttp/corehttp.go +++ b/core/corehttp/corehttp.go @@ -12,6 +12,7 @@ import ( ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" core "github.com/ipfs/go-ipfs/core" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" ) @@ -78,20 +79,17 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error var serverError error serverExited := make(chan struct{}) - node.Children().Add(1) - defer node.Children().Done() - - go func() { + node.Process().Go(func(p goprocess.Process) { serverError = http.Serve(lis, handler) close(serverExited) - }() + }) // wait for server to exit. select { case <-serverExited: // if node being closed before server exits, close server - case <-node.Closing(): + case <-node.Process().Closing(): log.Infof("server at %s terminating...", addr) lis.Close() diff --git a/core/coreunix/cat.go b/core/coreunix/cat.go index cc495582f..5bbd0f56e 100644 --- a/core/coreunix/cat.go +++ b/core/coreunix/cat.go @@ -10,9 +10,9 @@ import ( func Cat(n *core.IpfsNode, pstr string) (io.Reader, error) { p := path.FromString(pstr) - dagNode, err := n.Resolver.ResolvePath(n.ContextGroup.Context(), p) + dagNode, err := n.Resolver.ResolvePath(n.Context(), p) if err != nil { return nil, err } - return uio.NewDagReader(n.ContextGroup.Context(), dagNode, n.DAG) + return uio.NewDagReader(n.Context(), dagNode, n.DAG) } diff --git a/core/mock/mock.go b/core/mock/mock.go index 42b225eab..14f90f56c 100644 --- a/core/mock/mock.go +++ b/core/mock/mock.go @@ -3,7 +3,6 @@ package coremock import ( "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" syncds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" - goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" "github.com/ipfs/go-ipfs/blocks/blockstore" blockservice "github.com/ipfs/go-ipfs/blockservice" @@ -16,6 +15,7 @@ import ( path "github.com/ipfs/go-ipfs/path" pin "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/repo" + config "github.com/ipfs/go-ipfs/repo/config" offrt "github.com/ipfs/go-ipfs/routing/offline" ds2 "github.com/ipfs/go-ipfs/util/datastore2" testutil "github.com/ipfs/go-ipfs/util/testutil" @@ -28,31 +28,36 @@ import ( // NewMockNode constructs an IpfsNode for use in tests. func NewMockNode() (*core.IpfsNode, error) { ctx := context.TODO() - nd := new(core.IpfsNode) // Generate Identity ident, err := testutil.RandIdentity() if err != nil { return nil, err } - p := ident.ID() - nd.Identity = p - nd.PrivateKey = ident.PrivateKey() - nd.Peerstore = peer.NewPeerstore() - nd.Peerstore.AddPrivKey(p, ident.PrivateKey()) - nd.Peerstore.AddPubKey(p, ident.PublicKey()) - nd.Process = goprocessctx.WithContext(ctx) - nd.PeerHost, err = mocknet.New(ctx).AddPeer(ident.PrivateKey(), ident.Address()) // effectively offline + c := config.Config{ + Identity: config.Identity{ + PeerID: p.String(), + }, + } + + nd, err := core.Offline(&repo.Mock{ + C: c, + D: ds2.CloserWrap(syncds.MutexWrap(datastore.NewMapDatastore())), + })(ctx) if err != nil { return nil, err } - // Temp Datastore - nd.Repo = &repo.Mock{ - // TODO C: conf, - D: ds2.CloserWrap(syncds.MutexWrap(datastore.NewMapDatastore())), + nd.PrivateKey = ident.PrivateKey() + nd.Peerstore = peer.NewPeerstore() + nd.Peerstore.AddPrivKey(p, ident.PrivateKey()) + nd.Peerstore.AddPubKey(p, ident.PublicKey()) + + nd.PeerHost, err = mocknet.New(ctx).AddPeer(ident.PrivateKey(), ident.Address()) // effectively offline + if err != nil { + return nil, err } // Routing diff --git a/fuse/ipns/mount_unix.go b/fuse/ipns/mount_unix.go index 343653d0a..9b4133341 100644 --- a/fuse/ipns/mount_unix.go +++ b/fuse/ipns/mount_unix.go @@ -18,5 +18,5 @@ func Mount(ipfs *core.IpfsNode, ipnsmp, ipfsmp string) (mount.Mount, error) { return nil, err } - return mount.NewMount(ipfs, fsys, ipnsmp, allow_other) + return mount.NewMount(ipfs.Process(), fsys, ipnsmp, allow_other) } diff --git a/fuse/readonly/mount_unix.go b/fuse/readonly/mount_unix.go index 60d14ffe8..dc5a56e4e 100644 --- a/fuse/readonly/mount_unix.go +++ b/fuse/readonly/mount_unix.go @@ -13,5 +13,5 @@ func Mount(ipfs *core.IpfsNode, mountpoint string) (mount.Mount, error) { cfg := ipfs.Repo.Config() allow_other := cfg.Mounts.FuseAllowOther fsys := NewFileSystem(ipfs) - return mount.NewMount(ipfs, fsys, mountpoint, allow_other) + return mount.NewMount(ipfs.Process(), fsys, mountpoint, allow_other) } diff --git a/p2p/net/mock/mock_net.go b/p2p/net/mock/mock_net.go index 8817f20ba..f7cabf463 100644 --- a/p2p/net/mock/mock_net.go +++ b/p2p/net/mock/mock_net.go @@ -33,6 +33,7 @@ type mocknet struct { linkDefaults LinkOptions proc goprocess.Process // for Context closing + ctx context.Context sync.RWMutex } @@ -42,6 +43,7 @@ func New(ctx context.Context) Mocknet { hosts: map[peer.ID]*bhost.BasicHost{}, links: map[peer.ID]map[peer.ID]map[*link]struct{}{}, proc: goprocessctx.WithContext(ctx), + ctx: ctx, } } @@ -62,7 +64,7 @@ func (mn *mocknet) GenPeer() (host.Host, error) { } func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) { - n, err := newPeernet(mn.cg.Context(), mn, k, a) + n, err := newPeernet(mn.ctx, mn, k, a) if err != nil { return nil, err } @@ -70,7 +72,7 @@ func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) { h := bhost.New(n) log.Debugf("mocknet added listen addr for peer: %s -- %s", n.LocalPeer(), a) - mn.cg.AddChild(n.cg) + mn.proc.AddChild(n.proc) mn.Lock() mn.nets[n.peer] = n @@ -298,11 +300,11 @@ func (mn *mocknet) ConnectAll() error { } func (mn *mocknet) ConnectPeers(a, b peer.ID) (inet.Conn, error) { - return mn.Net(a).DialPeer(mn.cg.Context(), b) + return mn.Net(a).DialPeer(mn.ctx, b) } func (mn *mocknet) ConnectNets(a, b inet.Network) (inet.Conn, error) { - return a.DialPeer(mn.cg.Context(), b.LocalPeer()) + return a.DialPeer(mn.ctx, b.LocalPeer()) } func (mn *mocknet) DisconnectPeers(p1, p2 peer.ID) error { diff --git a/p2p/net/mock/mock_peernet.go b/p2p/net/mock/mock_peernet.go index d2fa78ede..a95c53f14 100644 --- a/p2p/net/mock/mock_peernet.go +++ b/p2p/net/mock/mock_peernet.go @@ -66,7 +66,7 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey, notifs: make(map[inet.Notifiee]struct{}), } - n.cg.SetTeardown(n.teardown) + n.proc.SetTeardown(n.teardown) return n, nil } @@ -94,7 +94,7 @@ func (pn *peernet) allConns() []*conn { // Close calls the ContextCloser func func (pn *peernet) Close() error { - return pn.cg.Close() + return pn.proc.Close() } func (pn *peernet) Peerstore() peer.Peerstore { diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 752748816..3810545fd 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -65,6 +65,7 @@ type Swarm struct { Filters *filter.Filters proc goprocess.Process + ctx context.Context bwc metrics.Reporter } @@ -82,6 +83,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, local: local, peers: peers, proc: goprocessctx.WithContext(ctx), + ctx: ctx, dialT: DialTimeout, notifs: make(map[inet.Notifiee]ps.Notifiee), bwc: bwc, @@ -137,6 +139,11 @@ func (s *Swarm) Process() goprocess.Process { return s.proc } +// Context returns the context of the swarm +func (s *Swarm) Context() context.Context { + return s.ctx +} + // Close stops the Swarm. func (s *Swarm) Close() error { return s.proc.Close() diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index 4a8f4dd4d..d1bcb0752 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -64,7 +64,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.") } log.Debugf("Swarm Listening at %s", maddr) - list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk) + list, err := conn.Listen(s.Context(), maddr, s.local, sk) if err != nil { return err } @@ -112,7 +112,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { return } } - }(s.cg.Context(), sl) + }(s.Context(), sl) return nil } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index fcc14273f..d6a07073e 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -58,8 +58,8 @@ type IpfsDHT struct { Validator record.Validator // record validator funcs - Context context.Context - goprocess.Process + ctx context.Context + proc goprocess.Process } // NewDHT creates a new DHT object with the given peer as the 'local' host @@ -73,18 +73,18 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip // register for network notifs. dht.host.Network().Notify((*netNotifiee)(dht)) - procctx = goprocessctx.WithContext(ctx) - procctx.SetTeardown(func() error { + proc := goprocessctx.WithContext(ctx) + proc.SetTeardown(func() error { // remove ourselves from network notifs. dht.host.Network().StopNotify((*netNotifiee)(dht)) return nil }) - dht.Process = procctx - dht.Context = ctx + dht.proc = proc + dht.ctx = ctx h.SetStreamHandler(ProtocolDHT, dht.handleNewStream) - dht.providers = NewProviderManager(dht.Context, dht.self) - dht.AddChild(dht.providers) + dht.providers = NewProviderManager(dht.ctx, dht.self) + dht.proc.AddChild(dht.providers.proc) dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore) dht.birth = time.Now() @@ -93,7 +93,9 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip dht.Validator["pk"] = record.PublicKeyValidator if doPinging { - dht.Go(func() { dht.PingRoutine(time.Second * 10) }) + dht.proc.Go(func(p goprocess.Process) { + dht.PingRoutine(time.Second * 10) + }) } return dht } @@ -360,15 +362,30 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) { rand.Read(id) peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.Key(id)), 5) for _, p := range peers { - ctx, cancel := context.WithTimeout(dht.Context, time.Second*5) + ctx, cancel := context.WithTimeout(dht.Context(), time.Second*5) _, err := dht.Ping(ctx, p) if err != nil { log.Debugf("Ping error: %s", err) } cancel() } - case <-dht.Closing(): + case <-dht.proc.Closing(): return } } } + +// Context return dht's context +func (dht *IpfsDHT) Context() context.Context { + return dht.ctx +} + +// Process return dht's process +func (dht *IpfsDHT) Process() goprocess.Process { + return dht.proc +} + +// Close calls Process Close +func (dht *IpfsDHT) Close() error { + return dht.proc.Close() +} diff --git a/routing/dht/notif.go b/routing/dht/notif.go index 70144481a..cfe411c38 100644 --- a/routing/dht/notif.go +++ b/routing/dht/notif.go @@ -16,7 +16,7 @@ func (nn *netNotifiee) DHT() *IpfsDHT { func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { dht := nn.DHT() select { - case <-dht.Closing(): + case <-dht.Process().Closing(): return default: } @@ -26,7 +26,7 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { dht := nn.DHT() select { - case <-dht.Closing(): + case <-dht.Process().Closing(): return default: } diff --git a/routing/dht/providers.go b/routing/dht/providers.go index 46675604a..17455b336 100644 --- a/routing/dht/providers.go +++ b/routing/dht/providers.go @@ -22,7 +22,7 @@ type ProviderManager struct { newprovs chan *addProv getprovs chan *getProv period time.Duration - goprocess.Process + proc goprocess.Process } type providerSet struct { @@ -47,8 +47,8 @@ func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager { pm.providers = make(map[key.Key]*providerSet) pm.getlocal = make(chan chan []key.Key) pm.local = make(map[key.Key]struct{}) - pm.Process = goprocessctx.WithContext(ctx) - pm.Go(pm.run) + pm.proc = goprocessctx.WithContext(ctx) + pm.proc.Go(func(p goprocess.Process) { pm.run() }) return pm } @@ -97,7 +97,7 @@ func (pm *ProviderManager) run() { provs.providers = filtered } - case <-pm.Closing(): + case <-pm.proc.Closing(): return } } diff --git a/routing/dht/providers_test.go b/routing/dht/providers_test.go index ecf937962..7e2e47d93 100644 --- a/routing/dht/providers_test.go +++ b/routing/dht/providers_test.go @@ -19,5 +19,5 @@ func TestProviderManager(t *testing.T) { if len(resp) != 1 { t.Fatal("Could not retrieve provider.") } - p.Close() + p.proc.Close() }