From cc2be2e73a5f568643680f876ba3ff377d67f08b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 29 Mar 2019 15:55:52 +0100 Subject: [PATCH] Fix goprocess / lifecycle / ctx relations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- cmd/ipfs/daemon.go | 2 +- core/builder.go | 16 +++++++++++++--- core/commands/shutdown.go | 2 +- core/core.go | 15 +++++---------- core/corehttp/corehttp.go | 6 +++--- core/ncore.go | 27 ++++++++++++++++++++++----- fuse/ipns/mount_unix.go | 2 +- fuse/readonly/mount_unix.go | 2 +- 8 files changed, 47 insertions(+), 25 deletions(-) diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index 2aed209ad..fc90e969f 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -372,7 +372,7 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment if err != nil { return err } - node.Process().AddChild(goprocess.WithTeardown(cctx.Plugins.Close)) + node.Process.AddChild(goprocess.WithTeardown(cctx.Plugins.Close)) // construct api endpoint - every time apiErrc, err := serveHTTPApi(req, cctx) diff --git a/core/builder.go b/core/builder.go index b6be8115f..7eb80c91f 100644 --- a/core/builder.go +++ b/core/builder.go @@ -5,13 +5,15 @@ import ( "crypto/rand" "encoding/base64" "errors" - "github.com/ipfs/go-ipfs/p2p" - "github.com/ipfs/go-ipfs/provider" - "go.uber.org/fx" "os" "syscall" "time" + "go.uber.org/fx" + + "github.com/ipfs/go-ipfs/p2p" + "github.com/ipfs/go-ipfs/provider" + filestore "github.com/ipfs/go-ipfs/filestore" namesys "github.com/ipfs/go-ipfs/namesys" pin "github.com/ipfs/go-ipfs/pin" @@ -219,6 +221,8 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { } app := fx.New( + fx.Provide(baseProcess), + params, storage, ident, @@ -226,12 +230,18 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { online, fx.Invoke(setupSharding), + fx.NopLogger, core, fx.Extract(n), ) + go func() { + <-ctx.Done() + app.Stop(context.Background()) + }() + n.IsOnline = cfg.Online n.app = app diff --git a/core/commands/shutdown.go b/core/commands/shutdown.go index 4a6d44dd4..0586e3da4 100644 --- a/core/commands/shutdown.go +++ b/core/commands/shutdown.go @@ -21,7 +21,7 @@ var daemonShutdownCmd = &cmds.Command{ return cmdkit.Errorf(cmdkit.ErrClient, "daemon not running") } - if err := nd.Process().Close(); err != nil { + if err := nd.Close(); err != nil { log.Error("error while shutting down ipfs daemon:", err) } diff --git a/core/core.go b/core/core.go index 5067d1670..9b18782d2 100644 --- a/core/core.go +++ b/core/core.go @@ -135,7 +135,7 @@ type IpfsNode struct { DHT *dht.IpfsDHT `optional:"true"` P2P *p2p.P2P `optional:"true"` - proc goprocess.Process //TODO: remove + Process goprocess.Process ctx context.Context app *fx.App @@ -206,9 +206,9 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin log.Warning("This might be configuration mistake.") } } - case <-n.Process().Closing(): - t.Stop() - return + //case <-n.Process().Closing(): + // t.Stop() + // return } } }() @@ -642,16 +642,11 @@ func (n *IpfsNode) setupIpnsRepublisher() error { n.IpnsRepub.RecordLifetime = d } - n.Process().Go(n.IpnsRepub.Run) + //n.Process().Go(n.IpnsRepub.Run) return nil } -// Process returns the Process object -func (n *IpfsNode) Process() goprocess.Process { - return n.proc -} - // Close calls Close() on the App object func (n *IpfsNode) Close() error { return n.app.Stop(n.ctx) diff --git a/core/corehttp/corehttp.go b/core/corehttp/corehttp.go index 330e8e9c2..c52bea8f5 100644 --- a/core/corehttp/corehttp.go +++ b/core/corehttp/corehttp.go @@ -85,7 +85,7 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error } select { - case <-node.Process().Closing(): + case <-node.Process.Closing(): return fmt.Errorf("failed to start server, process closing") default: } @@ -95,7 +95,7 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error } var serverError error - serverProc := node.Process().Go(func(p goprocess.Process) { + serverProc := node.Process.Go(func(p goprocess.Process) { serverError = server.Serve(lis) }) @@ -103,7 +103,7 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error select { case <-serverProc.Closed(): // if node being closed before server exits, close server - case <-node.Process().Closing(): + case <-node.Process.Closing(): log.Infof("server at %s terminating...", addr) warnProc := periodicproc.Tick(5*time.Second, func(_ goprocess.Process) { diff --git a/core/ncore.go b/core/ncore.go index 725591975..cd716edcb 100644 --- a/core/ncore.go +++ b/core/ncore.go @@ -591,7 +591,7 @@ func onlineNamesysCtor(rt routing.IpfsRouting, repo repo.Repo, cfg *iconfig.Conf return namesys.NewNameSystem(rt, repo.Datastore(), cs), nil } -func ipnsRepublisher(lc fx.Lifecycle, cfg *iconfig.Config, namesys namesys.NameSystem, repo repo.Repo, privKey ic.PrivKey) error { +func ipnsRepublisher(lc lcProcess, cfg *iconfig.Config, namesys namesys.NameSystem, repo repo.Repo, privKey ic.PrivKey) error { repub := ipnsrp.NewRepublisher(namesys, repo.Datastore(), privKey, repo.Keystore()) if cfg.Ipns.RepublishPeriod != "" { @@ -616,7 +616,7 @@ func ipnsRepublisher(lc fx.Lifecycle, cfg *iconfig.Config, namesys namesys.NameS repub.RecordLifetime = d } - lcGoProc(lc, repub.Run) + lc.Run(repub.Run) return nil } @@ -762,11 +762,18 @@ func lifecycleCtx(lc fx.Lifecycle) context.Context { return ctx } -func lcGoProc(lc fx.Lifecycle, processFunc goprocess.ProcessFunc) { +type lcProcess struct { + fx.In + + LC fx.Lifecycle + Proc goprocess.Process +} + +func (lp *lcProcess) Run(f goprocess.ProcessFunc) { proc := make(chan goprocess.Process, 1) - lc.Append(fx.Hook{ + lp.LC.Append(fx.Hook{ OnStart: func(ctx context.Context) error { - proc <- goprocess.Go(processFunc) + proc <- lp.Proc.Go(f) return nil }, OnStop: func(ctx context.Context) error { @@ -775,6 +782,16 @@ func lcGoProc(lc fx.Lifecycle, processFunc goprocess.ProcessFunc) { }) } +func baseProcess(lc fx.Lifecycle) goprocess.Process { + p := goprocess.WithParent(goprocess.Background()) + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return p.Close() + }, + }) + return p +} + func setupSharding(cfg *iconfig.Config) { // TEMP: setting global sharding switch here uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled diff --git a/fuse/ipns/mount_unix.go b/fuse/ipns/mount_unix.go index e6b551b2d..54e62df60 100644 --- a/fuse/ipns/mount_unix.go +++ b/fuse/ipns/mount_unix.go @@ -22,5 +22,5 @@ func Mount(ipfs *core.IpfsNode, ipnsmp, ipfsmp string) (mount.Mount, error) { return nil, err } - return mount.NewMount(ipfs.Process(), fsys, ipnsmp, allow_other) + return mount.NewMount(ipfs.Process, fsys, ipnsmp, allow_other) } diff --git a/fuse/readonly/mount_unix.go b/fuse/readonly/mount_unix.go index ab7945456..656e23c49 100644 --- a/fuse/readonly/mount_unix.go +++ b/fuse/readonly/mount_unix.go @@ -16,5 +16,5 @@ func Mount(ipfs *core.IpfsNode, mountpoint string) (mount.Mount, error) { } allow_other := cfg.Mounts.FuseAllowOther fsys := NewFileSystem(ipfs) - return mount.NewMount(ipfs.Process(), fsys, mountpoint, allow_other) + return mount.NewMount(ipfs.Process, fsys, mountpoint, allow_other) }