diff --git a/core/builder.go b/core/builder.go index 330fb9498..358e76194 100644 --- a/core/builder.go +++ b/core/builder.go @@ -129,6 +129,8 @@ func defaultRepo(dstore repo.Datastore) (repo.Repo, error) { }, nil } +type MetricsCtx context.Context + // NewNode constructs and returns an IpfsNode using the given cfg. func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { if cfg == nil { @@ -157,9 +159,14 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { return cfg }) + metricsCtx := fx.Provide(func() MetricsCtx { + return MetricsCtx(ctx) + }) + params := fx.Options( repoOption, cfgOption, + metricsCtx, ) storage := fx.Options( diff --git a/core/ncore.go b/core/ncore.go index 88f8ebfa4..6b8130a57 100644 --- a/core/ncore.go +++ b/core/ncore.go @@ -117,7 +117,7 @@ func datastoreCtor(repo repo.Repo) ds.Datastore { type BaseBlocks bstore.Blockstore -func baseBlockstoreCtor(repo repo.Repo, cfg *iconfig.Config, bcfg *BuildCfg, lc fx.Lifecycle) (bs BaseBlocks, err error) { +func baseBlockstoreCtor(mctx MetricsCtx, repo repo.Repo, cfg *iconfig.Config, bcfg *BuildCfg, lc fx.Lifecycle) (bs BaseBlocks, err error) { rds := &retry.Datastore{ Batching: repo.Datastore(), Delay: time.Millisecond * 200, @@ -135,7 +135,7 @@ func baseBlockstoreCtor(repo repo.Repo, cfg *iconfig.Config, bcfg *BuildCfg, lc } if !bcfg.NilRepo { - ctx, cancel := context.WithCancel(context.TODO()) //TODO: needed for mertics + ctx, cancel := context.WithCancel(mctx) lc.Append(fx.Hook{ OnStop: func(context context.Context) error { @@ -395,13 +395,13 @@ type p2pHostOut struct { } // TODO: move some of this into params struct -func p2pHost(lc fx.Lifecycle, params p2pHostIn) (out p2pHostOut, err error) { +func p2pHost(mctx MetricsCtx, lc fx.Lifecycle, params p2pHostIn) (out p2pHostOut, err error) { opts := []libp2p.Option{libp2p.NoListenAddrs} for _, o := range params.Opts { opts = append(opts, o...) } - ctx, cancel := context.WithCancel(context.TODO()) + ctx, cancel := context.WithCancel(mctx) lc.Append(fx.Hook{ OnStop: func(_ context.Context) error { cancel() @@ -471,12 +471,12 @@ type p2pRoutingOut struct { PSRouter *psrouter.PubsubValueStore //TODO: optional } -func p2pOnlineRouting(lc fx.Lifecycle, in p2pRoutingIn) (out p2pRoutingOut) { +func p2pOnlineRouting(mctx MetricsCtx, lc fx.Lifecycle, in p2pRoutingIn) (out p2pRoutingOut) { out.IpfsRouting = in.BaseRouting if in.BCfg.getOpt("ipnsps") { out.PSRouter = psrouter.NewPubsubValueStore( - lifecycleCtx(lc), + lifecycleCtx(mctx, lc), in.Host, in.BaseRouting, in.PubSub, @@ -503,7 +503,7 @@ func p2pOnlineRouting(lc fx.Lifecycle, in p2pRoutingIn) (out p2pRoutingOut) { //////////// // P2P services -func autoNATService(lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host) error { +func autoNATService(mctx MetricsCtx, lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host) error { if !cfg.Swarm.EnableAutoNATService { return nil } @@ -512,11 +512,11 @@ func autoNATService(lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host) err opts = append(opts, libp2p.DefaultTransports, libp2p.Transport(quic.NewTransport)) } - _, err := autonat.NewAutoNATService(lifecycleCtx(lc), host, opts...) + _, err := autonat.NewAutoNATService(lifecycleCtx(mctx, lc), host, opts...) return err } -func pubsubCtor(lc fx.Lifecycle, host p2phost.Host, bcfg *BuildCfg, cfg *iconfig.Config) (service *pubsub.PubSub, err error) { +func pubsubCtor(mctx MetricsCtx, lc fx.Lifecycle, host p2phost.Host, bcfg *BuildCfg, cfg *iconfig.Config) (service *pubsub.PubSub, err error) { if !(bcfg.getOpt("pubsub") || bcfg.getOpt("ipnsps")) { return nil, nil // TODO: mark optional } @@ -534,10 +534,10 @@ func pubsubCtor(lc fx.Lifecycle, host p2phost.Host, bcfg *BuildCfg, cfg *iconfig case "": fallthrough case "floodsub": - service, err = pubsub.NewFloodSub(lifecycleCtx(lc), host, pubsubOptions...) + service, err = pubsub.NewFloodSub(lifecycleCtx(mctx, lc), host, pubsubOptions...) case "gossipsub": - service, err = pubsub.NewGossipSub(lifecycleCtx(lc), host, pubsubOptions...) + service, err = pubsub.NewGossipSub(lifecycleCtx(mctx, lc), host, pubsubOptions...) default: err = fmt.Errorf("Unknown pubsub router %s", cfg.Pubsub.Router) @@ -577,9 +577,9 @@ func dagCtor(bs bserv.BlockService) format.DAGService { return merkledag.NewDAGService(bs) } -func onlineExchangeCtor(lc fx.Lifecycle, host p2phost.Host, rt routing.IpfsRouting, bs bstore.GCBlockstore) exchange.Interface { +func onlineExchangeCtor(mctx MetricsCtx, lc fx.Lifecycle, host p2phost.Host, rt routing.IpfsRouting, bs bstore.GCBlockstore) exchange.Interface { bitswapNetwork := bsnet.NewFromIpfsHost(host, rt) - return bitswap.New(lifecycleCtx(lc), bitswapNetwork, bs) + return bitswap.New(lifecycleCtx(mctx, lc), bitswapNetwork, bs) } func onlineNamesysCtor(rt routing.IpfsRouting, repo repo.Repo, cfg *iconfig.Config) (namesys.NameSystem, error) { @@ -636,20 +636,20 @@ func (dh *discoveryHandler) HandlePeerFound(p pstore.PeerInfo) { } } -func newDiscoveryHandler(lc fx.Lifecycle, host p2phost.Host) *discoveryHandler { +func newDiscoveryHandler(mctx MetricsCtx, lc fx.Lifecycle, host p2phost.Host) *discoveryHandler { return &discoveryHandler{ - ctx: lifecycleCtx(lc), + ctx: lifecycleCtx(mctx, lc), host: host, } } -func setupDiscovery(lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host, handler *discoveryHandler) error { +func setupDiscovery(mctx MetricsCtx, lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host, handler *discoveryHandler) error { if cfg.Discovery.MDNS.Enabled { mdns := cfg.Discovery.MDNS if mdns.Interval == 0 { mdns.Interval = 5 } - service, err := discovery.NewMdnsService(lifecycleCtx(lc), host, time.Duration(mdns.Interval)*time.Second, discovery.ServiceTag) + service, err := discovery.NewMdnsService(lifecycleCtx(mctx, lc), host, time.Duration(mdns.Interval)*time.Second, discovery.ServiceTag) if err != nil { log.Error("mdns error: ", err) return nil @@ -659,15 +659,15 @@ func setupDiscovery(lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host, han return nil } -func providerQueue(lc fx.Lifecycle, repo repo.Repo) (*provider.Queue, error) { - return provider.NewQueue(lifecycleCtx(lc), "provider-v1", repo.Datastore()) +func providerQueue(mctx MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*provider.Queue, error) { + return provider.NewQueue(lifecycleCtx(mctx, lc), "provider-v1", repo.Datastore()) } -func providerCtor(lc fx.Lifecycle, queue *provider.Queue, rt routing.IpfsRouting) provider.Provider { - return provider.NewProvider(lifecycleCtx(lc), queue, rt) +func providerCtor(mctx MetricsCtx, lc fx.Lifecycle, queue *provider.Queue, rt routing.IpfsRouting) provider.Provider { + return provider.NewProvider(lifecycleCtx(mctx, lc), queue, rt) } -func reproviderCtor(lc fx.Lifecycle, cfg *iconfig.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*rp.Reprovider, error) { +func reproviderCtor(mctx MetricsCtx, lc fx.Lifecycle, cfg *iconfig.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*rp.Reprovider, error) { var keyProvider rp.KeyChanFunc switch cfg.Reprovider.Strategy { @@ -682,7 +682,7 @@ func reproviderCtor(lc fx.Lifecycle, cfg *iconfig.Config, bs BaseBlocks, ds form default: return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy) } - return rp.NewReprovider(lifecycleCtx(lc), rt, keyProvider), nil + return rp.NewReprovider(lifecycleCtx(mctx, lc), rt, keyProvider), nil } func reprovider(cfg *iconfig.Config, reprovider *rp.Reprovider) error { @@ -700,7 +700,7 @@ func reprovider(cfg *iconfig.Config, reprovider *rp.Reprovider) error { return nil } -func files(lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) { +func files(mctx MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) { dsk := ds.NewKey("/local/filesroot") pf := func(ctx context.Context, c cid.Cid) error { return repo.Datastore().Put(dsk, c.Bytes()) @@ -708,7 +708,7 @@ func files(lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, e var nd *merkledag.ProtoNode val, err := repo.Datastore().Get(dsk) - ctx := lifecycleCtx(lc) + ctx := lifecycleCtx(mctx, lc) switch { case err == ds.ErrNotFound || val == nil: @@ -748,8 +748,8 @@ func files(lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, e // // This is a hack which we need because most of our services use contexts in a // wrong way -func lifecycleCtx(lc fx.Lifecycle) context.Context { - ctx, cancel := context.WithCancel(context.TODO()) // TODO: really wire this context up, things (like metrics) may depend on it +func lifecycleCtx(mctx MetricsCtx, lc fx.Lifecycle) context.Context { + ctx, cancel := context.WithCancel(mctx) lc.Append(fx.Hook{ OnStop: func(_ context.Context) error { cancel() @@ -774,7 +774,7 @@ func (lp *lcProcess) Run(f goprocess.ProcessFunc) { return nil }, OnStop: func(ctx context.Context) error { - return (<-proc).Close() // todo: respect ctx + return (<-proc).Close() // todo: respect ctx, somehow }, }) }