Fix goprocess / lifecycle / ctx relations

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera 2019-03-29 15:55:52 +01:00 committed by Steven Allen
parent ccc576b693
commit cc2be2e73a
8 changed files with 47 additions and 25 deletions

View File

@ -372,7 +372,7 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
if err != nil {
return err
}
node.Process().AddChild(goprocess.WithTeardown(cctx.Plugins.Close))
node.Process.AddChild(goprocess.WithTeardown(cctx.Plugins.Close))
// construct api endpoint - every time
apiErrc, err := serveHTTPApi(req, cctx)

View File

@ -5,13 +5,15 @@ import (
"crypto/rand"
"encoding/base64"
"errors"
"github.com/ipfs/go-ipfs/p2p"
"github.com/ipfs/go-ipfs/provider"
"go.uber.org/fx"
"os"
"syscall"
"time"
"go.uber.org/fx"
"github.com/ipfs/go-ipfs/p2p"
"github.com/ipfs/go-ipfs/provider"
filestore "github.com/ipfs/go-ipfs/filestore"
namesys "github.com/ipfs/go-ipfs/namesys"
pin "github.com/ipfs/go-ipfs/pin"
@ -219,6 +221,8 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
}
app := fx.New(
fx.Provide(baseProcess),
params,
storage,
ident,
@ -226,12 +230,18 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
online,
fx.Invoke(setupSharding),
fx.NopLogger,
core,
fx.Extract(n),
)
go func() {
<-ctx.Done()
app.Stop(context.Background())
}()
n.IsOnline = cfg.Online
n.app = app

View File

@ -21,7 +21,7 @@ var daemonShutdownCmd = &cmds.Command{
return cmdkit.Errorf(cmdkit.ErrClient, "daemon not running")
}
if err := nd.Process().Close(); err != nil {
if err := nd.Close(); err != nil {
log.Error("error while shutting down ipfs daemon:", err)
}

View File

@ -135,7 +135,7 @@ type IpfsNode struct {
DHT *dht.IpfsDHT `optional:"true"`
P2P *p2p.P2P `optional:"true"`
proc goprocess.Process //TODO: remove
Process goprocess.Process
ctx context.Context
app *fx.App
@ -206,9 +206,9 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
log.Warning("This might be configuration mistake.")
}
}
case <-n.Process().Closing():
t.Stop()
return
//case <-n.Process().Closing():
// t.Stop()
// return
}
}
}()
@ -642,16 +642,11 @@ func (n *IpfsNode) setupIpnsRepublisher() error {
n.IpnsRepub.RecordLifetime = d
}
n.Process().Go(n.IpnsRepub.Run)
//n.Process().Go(n.IpnsRepub.Run)
return nil
}
// Process returns the Process object
func (n *IpfsNode) Process() goprocess.Process {
return n.proc
}
// Close calls Close() on the App object
func (n *IpfsNode) Close() error {
return n.app.Stop(n.ctx)

View File

@ -85,7 +85,7 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error
}
select {
case <-node.Process().Closing():
case <-node.Process.Closing():
return fmt.Errorf("failed to start server, process closing")
default:
}
@ -95,7 +95,7 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error
}
var serverError error
serverProc := node.Process().Go(func(p goprocess.Process) {
serverProc := node.Process.Go(func(p goprocess.Process) {
serverError = server.Serve(lis)
})
@ -103,7 +103,7 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error
select {
case <-serverProc.Closed():
// if node being closed before server exits, close server
case <-node.Process().Closing():
case <-node.Process.Closing():
log.Infof("server at %s terminating...", addr)
warnProc := periodicproc.Tick(5*time.Second, func(_ goprocess.Process) {

View File

@ -591,7 +591,7 @@ func onlineNamesysCtor(rt routing.IpfsRouting, repo repo.Repo, cfg *iconfig.Conf
return namesys.NewNameSystem(rt, repo.Datastore(), cs), nil
}
func ipnsRepublisher(lc fx.Lifecycle, cfg *iconfig.Config, namesys namesys.NameSystem, repo repo.Repo, privKey ic.PrivKey) error {
func ipnsRepublisher(lc lcProcess, cfg *iconfig.Config, namesys namesys.NameSystem, repo repo.Repo, privKey ic.PrivKey) error {
repub := ipnsrp.NewRepublisher(namesys, repo.Datastore(), privKey, repo.Keystore())
if cfg.Ipns.RepublishPeriod != "" {
@ -616,7 +616,7 @@ func ipnsRepublisher(lc fx.Lifecycle, cfg *iconfig.Config, namesys namesys.NameS
repub.RecordLifetime = d
}
lcGoProc(lc, repub.Run)
lc.Run(repub.Run)
return nil
}
@ -762,11 +762,18 @@ func lifecycleCtx(lc fx.Lifecycle) context.Context {
return ctx
}
func lcGoProc(lc fx.Lifecycle, processFunc goprocess.ProcessFunc) {
type lcProcess struct {
fx.In
LC fx.Lifecycle
Proc goprocess.Process
}
func (lp *lcProcess) Run(f goprocess.ProcessFunc) {
proc := make(chan goprocess.Process, 1)
lc.Append(fx.Hook{
lp.LC.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
proc <- goprocess.Go(processFunc)
proc <- lp.Proc.Go(f)
return nil
},
OnStop: func(ctx context.Context) error {
@ -775,6 +782,16 @@ func lcGoProc(lc fx.Lifecycle, processFunc goprocess.ProcessFunc) {
})
}
func baseProcess(lc fx.Lifecycle) goprocess.Process {
p := goprocess.WithParent(goprocess.Background())
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return p.Close()
},
})
return p
}
func setupSharding(cfg *iconfig.Config) {
// TEMP: setting global sharding switch here
uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled

View File

@ -22,5 +22,5 @@ func Mount(ipfs *core.IpfsNode, ipnsmp, ipfsmp string) (mount.Mount, error) {
return nil, err
}
return mount.NewMount(ipfs.Process(), fsys, ipnsmp, allow_other)
return mount.NewMount(ipfs.Process, fsys, ipnsmp, allow_other)
}

View File

@ -16,5 +16,5 @@ func Mount(ipfs *core.IpfsNode, mountpoint string) (mount.Mount, error) {
}
allow_other := cfg.Mounts.FuseAllowOther
fsys := NewFileSystem(ipfs)
return mount.NewMount(ipfs.Process(), fsys, mountpoint, allow_other)
return mount.NewMount(ipfs.Process, fsys, mountpoint, allow_other)
}