From ccc576b69389cf489a953319c314867a002504a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 28 Mar 2019 02:57:13 +0100 Subject: [PATCH] More constructor fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- core/builder.go | 31 +++++++++++++++++++------------ core/core.go | 32 ++++++++++++++++++-------------- core/ncore.go | 15 ++++++++++----- 3 files changed, 47 insertions(+), 31 deletions(-) diff --git a/core/builder.go b/core/builder.go index 8c073a39c..b6be8115f 100644 --- a/core/builder.go +++ b/core/builder.go @@ -162,6 +162,7 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { storage := fx.Options( fx.Provide(repoConfig), + fx.Provide(datastoreCtor), fx.Provide(baseBlockstoreCtor), fx.Provide(gcBlockstoreCtor), ) @@ -169,23 +170,39 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { ident := fx.Options( fx.Provide(identity), fx.Provide(privateKey), + fx.Provide(peerstore), ) ipns := fx.Options( fx.Provide(recordValidator), ) + providers := fx.Options( + fx.Provide(providerQueue), + fx.Provide(providerCtor), + fx.Provide(reproviderCtor), + + fx.Invoke(reprovider), + fx.Invoke(provider.Provider.Run), + ) + online := fx.Options( fx.Provide(onlineExchangeCtor), fx.Provide(onlineNamesysCtor), fx.Invoke(ipnsRepublisher), - fx.Invoke(provider.Provider.Run), + + fx.Provide(p2p.NewP2P), + + ipfsp2p, + providers, ) if !cfg.Online { online = fx.Options( fx.Provide(offline.Exchange), fx.Provide(offlineNamesysCtor), + fx.Provide(offroute.NewOfflineRouter), + fx.Provide(provider.NewOfflineProvider), ) } @@ -197,13 +214,6 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { fx.Provide(files), ) - providers := fx.Options( - fx.Provide(providerQueue), - fx.Provide(providerCtor), - fx.Provide(reproviderCtor), - fx.Invoke(reprovider), - ) - n := &IpfsNode{ ctx: ctx, } @@ -212,21 +222,18 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { params, storage, ident, - ipfsp2p, ipns, online, fx.Invoke(setupSharding), core, - providers, - - fx.Provide(p2p.NewP2P), fx.Extract(n), ) n.IsOnline = cfg.Online + n.app = app /* n := &IpfsNode{ IsOnline: cfg.Online, diff --git a/core/core.go b/core/core.go index 133279755..5067d1670 100644 --- a/core/core.go +++ b/core/core.go @@ -20,6 +20,8 @@ import ( "strings" "time" + "go.uber.org/fx" + version "github.com/ipfs/go-ipfs" rp "github.com/ipfs/go-ipfs/exchange/reprovide" filestore "github.com/ipfs/go-ipfs/filestore" @@ -101,10 +103,10 @@ type IpfsNode struct { Pinning pin.Pinner // the pinning manager Mounts Mounts `optional:"true"` // current mount state, if any. PrivateKey ic.PrivKey // the local node's private Key - PNetFingerprint PNetFingerprint // fingerprint of private network + PNetFingerprint PNetFingerprint `optional:"true"` // fingerprint of private network // Services - Peerstore pstore.Peerstore // storage for other Peer instances + Peerstore pstore.Peerstore `optional:"true"` // storage for other Peer instances Blockstore bstore.GCBlockstore // the block store (lower level) Filestore *filestore.Filestore // the filestore blockstore BaseBlocks bstore.Blockstore // the raw blockstore, no filestore wrapping @@ -112,30 +114,32 @@ type IpfsNode struct { Blocks bserv.BlockService // the block service, get/add blocks. DAG ipld.DAGService // the merkle dag service, get/add objects. Resolver *resolver.Resolver // the path resolution system - Reporter metrics.Reporter + Reporter metrics.Reporter `optional:"true"` Discovery discovery.Service `optional:"true"` FilesRoot *mfs.Root RecordValidator record.Validator // Online - PeerHost p2phost.Host // the network host (server+client) + PeerHost p2phost.Host `optional:"true"` // the network host (server+client) Bootstrapper io.Closer `optional:"true"` // the periodic bootstrapper - Routing routing.IpfsRouting // the routing system. recommend ipfs-dht + Routing routing.IpfsRouting `optional:"true"` // the routing system. recommend ipfs-dht Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes - Provider provider.Provider // the value provider system - Reprovider *rp.Reprovider // the value reprovider system + Provider provider.Provider // the value provider system + Reprovider *rp.Reprovider `optional:"true"` // the value reprovider system IpnsRepub *ipnsrp.Republisher `optional:"true"` AutoNAT *autonat.AutoNATService `optional:"true"` - PubSub *pubsub.PubSub - PSRouter *psrouter.PubsubValueStore - DHT *dht.IpfsDHT - P2P *p2p.P2P + PubSub *pubsub.PubSub `optional:"true"` + PSRouter *psrouter.PubsubValueStore `optional:"true"` + DHT *dht.IpfsDHT `optional:"true"` + P2P *p2p.P2P `optional:"true"` - proc goprocess.Process + proc goprocess.Process //TODO: remove ctx context.Context + app *fx.App + // Flags IsOnline bool `optional:"true"` // Online is set when networking is enabled. IsDaemon bool `optional:"true"` // Daemon is set when running on a long-running daemon. @@ -648,9 +652,9 @@ func (n *IpfsNode) Process() goprocess.Process { return n.proc } -// Close calls Close() on the Process object +// Close calls Close() on the App object func (n *IpfsNode) Close() error { - return n.proc.Close() + return n.app.Stop(n.ctx) } // Context returns the IpfsNode context diff --git a/core/ncore.go b/core/ncore.go index 1aa300722..725591975 100644 --- a/core/ncore.go +++ b/core/ncore.go @@ -111,6 +111,10 @@ func privateKey(cfg *iconfig.Config, id peer.ID) (ic.PrivKey, error) { return sk, nil } +func datastoreCtor(repo repo.Repo) ds.Datastore { + return repo.Datastore() +} + func baseBlockstoreCtor(repo repo.Repo, cfg *iconfig.Config, bcfg *BuildCfg, lc fx.Lifecycle) (bs bstore.Blockstore, err error) { rds := &retry.Datastore{ Batching: repo.Datastore(), @@ -180,8 +184,6 @@ func recordValidator(ps pstore.Peerstore) record.Validator { // libp2p var ipfsp2p = fx.Options( - fx.Provide(peerstore), - fx.Provide(p2pAddrFilters), fx.Provide(p2pBandwidthCounter), fx.Provide(p2pPNet), @@ -411,6 +413,9 @@ func p2pHost(lc fx.Lifecycle, params p2pHostIn) (out p2pHostOut, err error) { })) out.Host, err = params.HostOption(ctx, params.ID, params.Peerstore, opts...) + if err != nil { + return p2pHostOut{}, err + } // this code is necessary just for tests: mock network constructions // ignore the libp2p constructor options that actually construct the routing! @@ -758,14 +763,14 @@ func lifecycleCtx(lc fx.Lifecycle) context.Context { } func lcGoProc(lc fx.Lifecycle, processFunc goprocess.ProcessFunc) { - proc := goprocess.Background() + proc := make(chan goprocess.Process, 1) lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { - proc.Go(processFunc) + proc <- goprocess.Go(processFunc) return nil }, OnStop: func(ctx context.Context) error { - return proc.Close() // todo: respect ctx + return (<-proc).Close() // todo: respect ctx }, }) }