diff --git a/core/core.go b/core/core.go
index 23656e722..d7ededb82 100644
--- a/core/core.go
+++ b/core/core.go
@@ -71,13 +71,13 @@ type IpfsNode struct {
 	// Local node
 	Pinning         pin.Pinner             // the pinning manager
 	Mounts          Mounts                 `optional:"true"` // current mount state, if any.
-	PrivateKey      ic.PrivKey             // the local node's private Key
+	PrivateKey      ic.PrivKey             `optional:"true"` // the local node's private Key
 	PNetFingerprint libp2p.PNetFingerprint `optional:"true"` // fingerprint of private network
 
 	// Services
 	Peerstore  pstore.Peerstore     `optional:"true"` // storage for other Peer instances
 	Blockstore bstore.GCBlockstore  // the block store (lower level)
-	Filestore  *filestore.Filestore // the filestore blockstore
+	Filestore  *filestore.Filestore `optional:"true"` // the filestore blockstore
 	BaseBlocks node.BaseBlocks      // the raw blockstore, no filestore wrapping
 	GCLocker   bstore.GCLocker      // the locker used to protect the blockstore during gc
 	Blocks     bserv.BlockService   // the block service, get/add blocks.
diff --git a/core/node/groups.go b/core/node/groups.go
index 56f2f3538..549aef81d 100644
--- a/core/node/groups.go
+++ b/core/node/groups.go
@@ -2,72 +2,187 @@ package node
 
 import (
 	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	blockstore "github.com/ipfs/go-ipfs-blockstore"
+	"github.com/ipfs/go-ipfs-config"
+	util "github.com/ipfs/go-ipfs-util"
+	peer "github.com/libp2p/go-libp2p-peer"
+	"github.com/libp2p/go-libp2p-peerstore/pstoremem"
+	pubsub "github.com/libp2p/go-libp2p-pubsub"
 
 	"github.com/ipfs/go-ipfs/core/node/libp2p"
 	"github.com/ipfs/go-ipfs/p2p"
 	"github.com/ipfs/go-ipfs/provider"
+	"github.com/ipfs/go-ipfs/reprovide"
 
 	offline "github.com/ipfs/go-ipfs-exchange-offline"
 	offroute "github.com/ipfs/go-ipfs-routing/offline"
-	uio "github.com/ipfs/go-unixfs/io"
 	"github.com/ipfs/go-path/resolver"
+	uio "github.com/ipfs/go-unixfs/io"
 	"go.uber.org/fx"
 )
 
 var BaseLibP2P = fx.Options(
-	fx.Provide(libp2p.AddrFilters),
-	fx.Provide(libp2p.BandwidthCounter),
 	fx.Provide(libp2p.PNet),
-	fx.Provide(libp2p.AddrsFactory),
 	fx.Provide(libp2p.ConnectionManager),
-	fx.Provide(libp2p.NatPortMap),
-	fx.Provide(libp2p.Relay),
-	fx.Provide(libp2p.AutoRealy),
 	fx.Provide(libp2p.DefaultTransports),
-	fx.Provide(libp2p.QUIC),
 
 	fx.Provide(libp2p.Host),
 
 	fx.Provide(libp2p.DiscoveryHandler),
 
-	fx.Invoke(libp2p.AutoNATService),
 	fx.Invoke(libp2p.PNetChecker),
-	fx.Invoke(libp2p.StartListening),
-	fx.Invoke(libp2p.SetupDiscovery),
 )
 
-func LibP2P(cfg *BuildCfg) fx.Option {
+func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
+
+	// parse ConnMgr config
+
+	grace := config.DefaultConnMgrGracePeriod
+	low := config.DefaultConnMgrLowWater
+	high := config.DefaultConnMgrHighWater
+
+	connmgr := fx.Options()
+
+	if cfg.Swarm.ConnMgr.Type != "none" {
+		switch cfg.Swarm.ConnMgr.Type {
+		case "":
+			// 'default' value is the basic connection manager
+			break
+		case "basic":
+			var err error
+			grace, err = time.ParseDuration(cfg.Swarm.ConnMgr.GracePeriod)
+			if err != nil {
+				return fx.Error(fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err))
+			}
+
+			low = cfg.Swarm.ConnMgr.LowWater
+			high = cfg.Swarm.ConnMgr.HighWater
+		default:
+			return fx.Error(fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Swarm.ConnMgr.Type))
+		}
+
+		connmgr = fx.Provide(libp2p.ConnectionManager(low, high, grace))
+	}
+
+	// parse PubSub config
+
+	ps := fx.Options()
+	if bcfg.getOpt("pubsub") || bcfg.getOpt("ipnsps") {
+		var pubsubOptions []pubsub.Option
+		if cfg.Pubsub.DisableSigning {
+			pubsubOptions = append(pubsubOptions, pubsub.WithMessageSigning(false))
+		}
+
+		if cfg.Pubsub.StrictSignatureVerification {
+			pubsubOptions = append(pubsubOptions, pubsub.WithStrictSignatureVerification(true))
+		}
+
+		switch cfg.Pubsub.Router {
+		case "":
+			fallthrough
+		case "floodsub":
+			ps = fx.Provide(libp2p.FloodSub(pubsubOptions...))
+		case "gossipsub":
+			ps = fx.Provide(libp2p.GossipSub(pubsubOptions...))
+		default:
+			return fx.Error(fmt.Errorf("unknown pubsub router %s", cfg.Pubsub.Router))
+		}
+	}
+
+	// Gather all the options
+
 	opts := fx.Options(
 		BaseLibP2P,
-		fx.Provide(libp2p.Security(!cfg.DisableEncryptedConnections)),
-		maybeProvide(libp2p.Pubsub, cfg.getOpt("pubsub") || cfg.getOpt("ipnsps")),
+		fx.Provide(libp2p.AddrFilters(cfg.Swarm.AddrFilters)),
+		fx.Invoke(libp2p.SetupDiscovery(cfg.Discovery.MDNS.Enabled, cfg.Discovery.MDNS.Interval)),
+		fx.Provide(libp2p.AddrsFactory(cfg.Addresses.Announce, cfg.Addresses.NoAnnounce)),
+		fx.Provide(libp2p.SmuxTransport(bcfg.getOpt("mplex"))),
+		fx.Provide(libp2p.Relay(cfg.Swarm.DisableRelay, cfg.Swarm.EnableRelayHop)),
+		fx.Invoke(libp2p.StartListening(cfg.Addresses.Swarm)),
+
+		fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Experimental.PreferTLS)),
 
-		fx.Provide(libp2p.SmuxTransport(cfg.getOpt("mplex"))),
 		fx.Provide(libp2p.Routing),
 		fx.Provide(libp2p.BaseRouting),
-		maybeProvide(libp2p.PubsubRouter, cfg.getOpt("ipnsps")),
+		maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")),
+
+		maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics),
+		maybeProvide(libp2p.NatPortMap, !cfg.Swarm.DisableNatPortMap),
+		maybeProvide(libp2p.AutoRealy, cfg.Swarm.EnableAutoRelay),
+		maybeProvide(libp2p.QUIC, cfg.Experimental.QUIC),
+		maybeProvide(libp2p.AutoNATService(cfg.Experimental.QUIC), cfg.Swarm.EnableAutoNATService),
+		connmgr,
+		ps,
 	)
 
 	return opts
 }
 
 // Storage groups units which setup datastore based persistence and blockstore layers
-func Storage(cfg *BuildCfg) fx.Option {
+func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
+	cacheOpts := blockstore.DefaultCacheOpts()
+	cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize
+	if !bcfg.Permanent {
+		cacheOpts.HasBloomFilterSize = 0
+	}
+
+	finalBstore := fx.Provide(GcBlockstoreCtor)
+	if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled {
+		finalBstore = fx.Provide(FilestoreBlockstoreCtor)
+	}
+
 	return fx.Options(
 		fx.Provide(RepoConfig),
 		fx.Provide(Datastore),
-		fx.Provide(BaseBlockstoreCtor(cfg.Permanent, cfg.NilRepo)),
-		fx.Provide(GcBlockstoreCtor),
+		fx.Provide(BaseBlockstoreCtor(cacheOpts, bcfg.NilRepo, cfg.Datastore.HashOnRead)),
+		finalBstore,
 	)
 }
 
 // Identity groups units providing cryptographic identity
-var Identity = fx.Options(
-	fx.Provide(PeerID),
-	fx.Provide(PrivateKey),
-	fx.Provide(libp2p.Peerstore),
-)
+func Identity(cfg *config.Config) fx.Option {
+	// PeerID
+
+	cid := cfg.Identity.PeerID
+	if cid == "" {
+		return fx.Error(errors.New("identity was not set in config (was 'ipfs init' run?)"))
+	}
+	if len(cid) == 0 {
+		return fx.Error(errors.New("no peer ID in config! (was 'ipfs init' run?)"))
+	}
+
+	id, err := peer.IDB58Decode(cid)
+	if err != nil {
+		return fx.Error(fmt.Errorf("peer ID invalid: %s", err))
+	}
+
+	// Private Key
+
+	if cfg.Identity.PrivKey == "" {
+		return fx.Options( // No PK (usually in tests)
+			fx.Provide(PeerID(id)),
+			fx.Provide(pstoremem.NewPeerstore),
+		)
+	}
+
+	sk, err := cfg.Identity.DecodePrivateKey("passphrase todo!")
+	if err != nil {
+		return fx.Error(err)
+	}
+
+	return fx.Options( // Full identity
+		fx.Provide(PeerID(id)),
+		fx.Provide(PrivateKey(sk)),
+		fx.Provide(pstoremem.NewPeerstore),
+
+		fx.Invoke(libp2p.PstoreAddSelfKeys),
+	)
+}
 
 // IPNS groups namesys related units
 var IPNS = fx.Options(
@@ -75,33 +190,97 @@ var IPNS = fx.Options(
 )
 
 // Providers groups units managing provider routing records
-var Providers = fx.Options(
-	fx.Provide(ProviderQueue),
-	fx.Provide(ProviderCtor),
-	fx.Provide(ReproviderCtor),
+func Providers(cfg *config.Config) fx.Option {
+	reproviderInterval := kReprovideFrequency
+	if cfg.Reprovider.Interval != "" {
+		dur, err := time.ParseDuration(cfg.Reprovider.Interval)
+		if err != nil {
+			return fx.Error(err)
+		}
 
-	fx.Invoke(Reprovider),
-)
+		reproviderInterval = dur
+	}
+
+	var keyProvider fx.Option
+	switch cfg.Reprovider.Strategy {
+	case "all":
+		fallthrough
+	case "":
+		keyProvider = fx.Provide(reprovide.NewBlockstoreProvider)
+	case "roots":
+		keyProvider = fx.Provide(reprovide.NewPinnedProvider(true))
+	case "pinned":
+		keyProvider = fx.Provide(reprovide.NewPinnedProvider(false))
+	default:
+		return fx.Error(fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy))
+	}
+
+	return fx.Options(
+		fx.Provide(ProviderQueue),
+		fx.Provide(ProviderCtor),
+		fx.Provide(ReproviderCtor(reproviderInterval)),
+		keyProvider,
+
+		fx.Invoke(Reprovider),
+	)
+}
 
 // Online groups online-only units
-func Online(cfg *BuildCfg) fx.Option {
+func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option {
+
+	// Namesys params
+
+	ipnsCacheSize := cfg.Ipns.ResolveCacheSize
+	if ipnsCacheSize == 0 {
+		ipnsCacheSize = DefaultIpnsCacheSize
+	}
+	if ipnsCacheSize < 0 {
+		return fx.Error(fmt.Errorf("cannot specify negative resolve cache size"))
+	}
+
+	// Republisher params
+
+	var repubPeriod, recordLifetime time.Duration
+
+	if cfg.Ipns.RepublishPeriod != "" {
+		d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
+		if err != nil {
+			return fx.Error(fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err))
+		}
+
+		if !util.Debug && (d < time.Minute || d > (time.Hour*24)) {
+			return fx.Error(fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d))
+		}
+
+		repubPeriod = d
+	}
+
+	if cfg.Ipns.RecordLifetime != "" {
+		d, err := time.ParseDuration(cfg.Ipns.RecordLifetime)
+		if err != nil {
+			return fx.Error(fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err))
+		}
+
+		recordLifetime = d
+	}
+
 	return fx.Options(
 		fx.Provide(OnlineExchange),
-		fx.Provide(OnlineNamesys),
+		fx.Provide(Namesys(ipnsCacheSize)),
 
-		fx.Invoke(IpnsRepublisher),
+		fx.Invoke(IpnsRepublisher(repubPeriod, recordLifetime)),
 
 		fx.Provide(p2p.New),
 
-		LibP2P(cfg),
-		Providers,
+		LibP2P(bcfg, cfg),
+		Providers(cfg),
 	)
 }
 
 // Offline groups offline alternatives to Online units
 var Offline = fx.Options(
 	fx.Provide(offline.Exchange),
-	fx.Provide(OfflineNamesys),
+	fx.Provide(Namesys(0)),
 	fx.Provide(offroute.NewOfflineRouter),
 	fx.Provide(provider.NewOfflineProvider),
 )
@@ -115,9 +294,9 @@ var Core = fx.Options(
 	fx.Provide(Files),
 )
 
-func Networked(cfg *BuildCfg) fx.Option {
-	if cfg.Online {
-		return Online(cfg)
+func Networked(bcfg *BuildCfg, cfg *config.Config) fx.Option {
+	if bcfg.Online {
+		return Online(bcfg, cfg)
 	}
 	return Offline
 }
@@ -136,20 +315,15 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option {
 	// TEMP: setting global sharding switch here
 	uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled
 
-
-
-
-
-
 	return fx.Options(
 		bcfgOpts,
 
 		fx.Provide(baseProcess),
 
-		Storage(bcfg),
-		Identity,
+		Storage(bcfg, cfg),
+		Identity(cfg),
 		IPNS,
-		Networked(bcfg),
+		Networked(bcfg, cfg),
 
 		Core,
 	)
diff --git a/core/node/identity.go b/core/node/identity.go
index 336750082..46baa8494 100644
--- a/core/node/identity.go
+++ b/core/node/identity.go
@@ -1,50 +1,29 @@
 package node
 
 import (
-	"errors"
 	"fmt"
 
-	"github.com/ipfs/go-ipfs-config"
 	"github.com/libp2p/go-libp2p-crypto"
 	"github.com/libp2p/go-libp2p-peer"
 )
 
-// PeerID loads peer identity form config
-func PeerID(cfg *config.Config) (peer.ID, error) {
-	cid := cfg.Identity.PeerID
-	if cid == "" {
-		return "", errors.New("identity was not set in config (was 'ipfs init' run?)")
+func PeerID(id peer.ID) func() peer.ID {
+	return func() peer.ID {
+		return id
 	}
-	if len(cid) == 0 {
-		return "", errors.New("no peer ID in config! (was 'ipfs init' run?)")
-	}
-
-	id, err := peer.IDB58Decode(cid)
-	if err != nil {
-		return "", fmt.Errorf("peer ID invalid: %s", err)
-	}
-
-	return id, nil
 }
 
 // PrivateKey loads the private key from config
-func PrivateKey(cfg *config.Config, id peer.ID) (crypto.PrivKey, error) {
-	if cfg.Identity.PrivKey == "" {
-		return nil, nil
-	}
+func PrivateKey(sk crypto.PrivKey) func(id peer.ID) (crypto.PrivKey, error) {
+	return func(id peer.ID) (crypto.PrivKey, error) {
+		id2, err := peer.IDFromPrivateKey(sk)
+		if err != nil {
+			return nil, err
+		}
 
-	sk, err := cfg.Identity.DecodePrivateKey("passphrase todo!")
-	if err != nil {
-		return nil, err
+		if id2 != id {
+			return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2)
+		}
+		return sk, nil
 	}
-
-	id2, err := peer.IDFromPrivateKey(sk)
-	if err != nil {
-		return nil, err
-	}
-
-	if id2 != id {
-		return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2)
-	}
-	return sk, nil
 }
diff --git a/core/node/ipns.go b/core/node/ipns.go
index 58e9955f0..1f2457603 100644
--- a/core/node/ipns.go
+++ b/core/node/ipns.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/ipfs/go-ipfs-config"
 	"github.com/ipfs/go-ipfs-util"
 	"github.com/ipfs/go-ipns"
 	"github.com/libp2p/go-libp2p-crypto"
@@ -27,49 +26,31 @@ func RecordValidator(ps peerstore.Peerstore) record.Validator {
 	}
 }
 
-// OfflineNamesys creates namesys setup for offline operation
-func OfflineNamesys(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) {
-	return namesys.NewNameSystem(rt, repo.Datastore(), 0), nil
-}
-
-// OnlineNamesys createn new namesys setup for online operation
-func OnlineNamesys(rt routing.IpfsRouting, repo repo.Repo, cfg *config.Config) (namesys.NameSystem, error) {
-	cs := cfg.Ipns.ResolveCacheSize
-	if cs == 0 {
-		cs = DefaultIpnsCacheSize
+// Namesys creates new name system
+func Namesys(cacheSize int) func(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) {
+	return func(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) {
+		return namesys.NewNameSystem(rt, repo.Datastore(), cacheSize), nil
 	}
-	if cs < 0 {
-		return nil, fmt.Errorf("cannot specify negative resolve cache size")
-	}
-	return namesys.NewNameSystem(rt, repo.Datastore(), cs), nil
 }
 
 // IpnsRepublisher runs new IPNS republisher service
-func IpnsRepublisher(lc lcProcess, cfg *config.Config, namesys namesys.NameSystem, repo repo.Repo, privKey crypto.PrivKey) error {
-	repub := republisher.NewRepublisher(namesys, repo.Datastore(), privKey, repo.Keystore())
+func IpnsRepublisher(repubPeriod time.Duration, recordLifetime time.Duration) func(lcProcess, namesys.NameSystem, repo.Repo, crypto.PrivKey) error {
+	return func(lc lcProcess, namesys namesys.NameSystem, repo repo.Repo, privKey crypto.PrivKey) error {
+		repub := republisher.NewRepublisher(namesys, repo.Datastore(), privKey, repo.Keystore())
 
-	if cfg.Ipns.RepublishPeriod != "" {
-		d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
-		if err != nil {
-			return fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err)
+		if repubPeriod != 0 {
+			if !util.Debug && (repubPeriod < time.Minute || repubPeriod > (time.Hour*24)) {
+				return fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", repubPeriod)
+			}
+
+			repub.Interval = repubPeriod
 		}
 
-		if !util.Debug && (d < time.Minute || d > (time.Hour*24)) {
-			return fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d)
+		if recordLifetime != 0 {
+			repub.RecordLifetime = recordLifetime
 		}
 
-		repub.Interval = d
+		lc.Append(repub.Run)
+		return nil
 	}
-
-	if cfg.Ipns.RecordLifetime != "" {
-		d, err := time.ParseDuration(cfg.Ipns.RecordLifetime)
-		if err != nil {
-			return fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err)
-		}
-
-		repub.RecordLifetime = d
-	}
-
-	lc.Append(repub.Run)
-	return nil
 }
diff --git a/core/node/libp2p/discovery.go b/core/node/libp2p/discovery.go
index 1dd8def2e..f1351d646 100644
--- a/core/node/libp2p/discovery.go
+++ b/core/node/libp2p/discovery.go
@@ -4,12 +4,12 @@ import (
 	"context"
 	"time"
 
-	"github.com/ipfs/go-ipfs-config"
-	"github.com/ipfs/go-ipfs/core/node/helpers"
 	"github.com/libp2p/go-libp2p-host"
 	"github.com/libp2p/go-libp2p-peerstore"
 	"github.com/libp2p/go-libp2p/p2p/discovery"
 	"go.uber.org/fx"
+
+	"github.com/ipfs/go-ipfs/core/node/helpers"
 )
 
 const discoveryConnTimeout = time.Second * 30
@@ -35,18 +35,19 @@ func DiscoveryHandler(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host)
 	}
 }
 
-func SetupDiscovery(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, host host.Host, handler *discoveryHandler) error {
-	if cfg.Discovery.MDNS.Enabled {
-		mdns := cfg.Discovery.MDNS
-		if mdns.Interval == 0 {
-			mdns.Interval = 5
+func SetupDiscovery(mdns bool, mdnsInterval int) func(helpers.MetricsCtx, fx.Lifecycle, host.Host, *discoveryHandler) error {
+	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, handler *discoveryHandler) error {
+		if mdns {
+			if mdnsInterval == 0 {
+				mdnsInterval = 5
+			}
+			service, err := discovery.NewMdnsService(helpers.LifecycleCtx(mctx, lc), host, time.Duration(mdnsInterval)*time.Second, discovery.ServiceTag)
+			if err != nil {
+				log.Error("mdns error: ", err)
+				return nil
+			}
+			service.RegisterNotifee(handler)
 		}
-		service, err := discovery.NewMdnsService(helpers.LifecycleCtx(mctx, lc), host, time.Duration(mdns.Interval)*time.Second, discovery.ServiceTag)
-		if err != nil {
-			log.Error("mdns error: ", err)
-			return nil
-		}
-		service.RegisterNotifee(handler)
+		return nil
 	}
-	return nil
 }
diff --git a/core/node/libp2p/libp2p.go b/core/node/libp2p/libp2p.go
index 95497386f..7faccb597 100644
--- a/core/node/libp2p/libp2p.go
+++ b/core/node/libp2p/libp2p.go
@@ -11,7 +11,6 @@ import (
 	"time"
 
 	"github.com/ipfs/go-datastore"
-	"github.com/ipfs/go-ipfs-config"
 	nilrouting "github.com/ipfs/go-ipfs-routing/none"
 	logging "github.com/ipfs/go-log"
 	"github.com/libp2p/go-libp2p"
@@ -25,7 +24,6 @@ import (
 	"github.com/libp2p/go-libp2p-metrics"
 	"github.com/libp2p/go-libp2p-peer"
 	"github.com/libp2p/go-libp2p-peerstore"
-	"github.com/libp2p/go-libp2p-peerstore/pstoremem"
 	"github.com/libp2p/go-libp2p-pnet"
 	"github.com/libp2p/go-libp2p-pubsub"
 	"github.com/libp2p/go-libp2p-pubsub-router"
@@ -87,38 +85,30 @@ var DHTOption RoutingOption = constructDHTRouting
 var DHTClientOption RoutingOption = constructClientDHTRouting
 var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting
 
-func Peerstore(id peer.ID, sk crypto.PrivKey) (peerstore.Peerstore, error) {
-	ps := pstoremem.NewPeerstore()
-
-	if sk != nil {
-		if err := ps.AddPubKey(id, sk.GetPublic()); err != nil {
-			return nil, err
-		}
-		if err := ps.AddPrivKey(id, sk); err != nil {
-			return nil, err
-		}
+func PstoreAddSelfKeys(id peer.ID, sk crypto.PrivKey, ps peerstore.Peerstore) error {
+	if err := ps.AddPubKey(id, sk.GetPublic()); err != nil {
+		return err
 	}
 
-	return ps, nil
+	return ps.AddPrivKey(id, sk)
 }
 
-func AddrFilters(cfg *config.Config) (opts Libp2pOpts, err error) {
-	for _, s := range cfg.Swarm.AddrFilters {
-		f, err := mamask.NewMask(s)
-		if err != nil {
-			return opts, fmt.Errorf("incorrectly formatted address filter in config: %s", s)
+func AddrFilters(filters []string) func() (opts Libp2pOpts, err error) {
+	return func() (opts Libp2pOpts, err error) {
+		for _, s := range filters {
+			f, err := mamask.NewMask(s)
+			if err != nil {
+				return opts, fmt.Errorf("incorrectly formatted address filter in config: %s", s)
+			}
+			opts.Opts = append(opts.Opts, libp2p.FilterAddresses(f))
 		}
-		opts.Opts = append(opts.Opts, libp2p.FilterAddresses(f))
+		return opts, nil
 	}
-	return opts, nil
 }
 
-func BandwidthCounter(cfg *config.Config) (opts Libp2pOpts, reporter metrics.Reporter) {
+func BandwidthCounter() (opts Libp2pOpts, reporter metrics.Reporter) {
 	reporter = metrics.NewBandwidthCounter()
-
-	if !cfg.Swarm.DisableBandwidthMetrics {
-		opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter))
-	}
+	opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter))
 	return opts, reporter
 }
 
@@ -183,9 +173,9 @@ func PNetChecker(repo repo.Repo, ph host.Host, lc fx.Lifecycle) error {
 	return nil
 }
 
-func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
+func makeAddrsFactory(announce []string, noAnnounce []string) (p2pbhost.AddrsFactory, error) {
 	var annAddrs []ma.Multiaddr
-	for _, addr := range cfg.Announce {
+	for _, addr := range announce {
 		maddr, err := ma.NewMultiaddr(addr)
 		if err != nil {
 			return nil, err
@@ -195,7 +185,7 @@ func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
 	filters := mafilter.NewFilters()
 	noAnnAddrs := map[string]bool{}
-	for _, addr := range cfg.NoAnnounce {
+	for _, addr := range noAnnounce {
 		f, err := mamask.NewMask(addr)
 		if err == nil {
 			filters.AddDialFilter(f)
@@ -229,41 +219,23 @@ func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
 	}, nil
 }
 
-func AddrsFactory(cfg *config.Config) (opts Libp2pOpts, err error) {
-	addrsFactory, err := makeAddrsFactory(cfg.Addresses)
-	if err != nil {
-		return opts, err
+func AddrsFactory(announce []string, noAnnounce []string) func() (opts Libp2pOpts, err error) {
+	return func() (opts Libp2pOpts, err error) {
+		addrsFactory, err := makeAddrsFactory(announce, noAnnounce)
+		if err != nil {
+			return opts, err
+		}
+		opts.Opts = append(opts.Opts, libp2p.AddrsFactory(addrsFactory))
+		return
 	}
-	opts.Opts = append(opts.Opts, libp2p.AddrsFactory(addrsFactory))
-	return
 }
 
-func ConnectionManager(cfg *config.Config) (opts Libp2pOpts, err error) {
-	grace := config.DefaultConnMgrGracePeriod
-	low := config.DefaultConnMgrHighWater
-	high := config.DefaultConnMgrHighWater
-
-	switch cfg.Swarm.ConnMgr.Type {
-	case "":
-		// 'default' value is the basic connection manager
+func ConnectionManager(low, high int, grace time.Duration) func() (opts Libp2pOpts, err error) {
+	return func() (opts Libp2pOpts, err error) {
+		cm := connmgr.NewConnManager(low, high, grace)
+		opts.Opts = append(opts.Opts, libp2p.ConnectionManager(cm))
 		return
-	case "none":
-		return opts, nil
-	case "basic":
-		grace, err = time.ParseDuration(cfg.Swarm.ConnMgr.GracePeriod)
-		if err != nil {
-			return opts, fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err)
-		}
-
-		low = cfg.Swarm.ConnMgr.LowWater
-		high = cfg.Swarm.ConnMgr.HighWater
-	default:
-		return opts, fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Swarm.ConnMgr.Type)
+	}
-
-	cm := connmgr.NewConnManager(low, high, grace)
-	opts.Opts = append(opts.Opts, libp2p.ConnectionManager(cm))
-	return
 }
 
 func makeSmuxTransportOption(mplexExp bool) libp2p.Option {
@@ -315,32 +287,29 @@ func SmuxTransport(mplex bool) func() (opts Libp2pOpts, err error) {
 	}
 }
 
-func NatPortMap(cfg *config.Config) (opts Libp2pOpts, err error) {
-	if !cfg.Swarm.DisableNatPortMap {
-		opts.Opts = append(opts.Opts, libp2p.NATPortMap())
-	}
+func NatPortMap() (opts Libp2pOpts, err error) {
+	opts.Opts = append(opts.Opts, libp2p.NATPortMap())
 	return
 }
 
-func Relay(cfg *config.Config) (opts Libp2pOpts, err error) {
-	if cfg.Swarm.DisableRelay {
-		// Enabled by default.
-		opts.Opts = append(opts.Opts, libp2p.DisableRelay())
-	} else {
-		relayOpts := []relay.RelayOpt{relay.OptDiscovery}
-		if cfg.Swarm.EnableRelayHop {
-			relayOpts = append(relayOpts, relay.OptHop)
+func Relay(disable, enableHop bool) func() (opts Libp2pOpts, err error) {
+	return func() (opts Libp2pOpts, err error) {
+		if disable {
+			// Enabled by default.
+			opts.Opts = append(opts.Opts, libp2p.DisableRelay())
+		} else {
+			relayOpts := []relay.RelayOpt{relay.OptDiscovery}
+			if enableHop {
+				relayOpts = append(relayOpts, relay.OptHop)
+			}
+			opts.Opts = append(opts.Opts, libp2p.EnableRelay(relayOpts...))
 		}
-		opts.Opts = append(opts.Opts, libp2p.EnableRelay(relayOpts...))
+		return
 	}
-	return
 }
 
-func AutoRealy(cfg *config.Config) (opts Libp2pOpts, err error) {
-	// enable autorelay
-	if cfg.Swarm.EnableAutoRelay {
-		opts.Opts = append(opts.Opts, libp2p.EnableAutoRelay())
-	}
+func AutoRealy() (opts Libp2pOpts, err error) {
+	opts.Opts = append(opts.Opts, libp2p.EnableAutoRelay())
 	return
 }
 
@@ -349,14 +318,12 @@ func DefaultTransports() (opts Libp2pOpts, err error) {
 	return
 }
 
-func QUIC(cfg *config.Config) (opts Libp2pOpts, err error) {
-	if cfg.Experimental.QUIC {
-		opts.Opts = append(opts.Opts, libp2p.Transport(libp2pquic.NewTransport))
-	}
+func QUIC() (opts Libp2pOpts, err error) {
+	opts.Opts = append(opts.Opts, libp2p.Transport(libp2pquic.NewTransport))
 	return
 }
 
-func Security(enabled bool) interface{} {
+func Security(enabled, preferTLS bool) interface{} {
 	if !enabled {
 		return func() (opts Libp2pOpts) {
 			// TODO: shouldn't this be Errorf to guarantee visibility?
@@ -366,8 +333,8 @@ func Security(enabled bool) interface{} {
 			return opts
 		}
 	}
-	return func(cfg *config.Config) (opts Libp2pOpts) {
-		if cfg.Experimental.PreferTLS {
+	return func() (opts Libp2pOpts) {
+		if preferTLS {
 			opts.Opts = append(opts.Opts, libp2p.ChainOptions(libp2p.Security(tls.ID, tls.New), libp2p.Security(secio.ID, secio.New)))
 		} else {
 			opts.Opts = append(opts.Opts, libp2p.ChainOptions(libp2p.Security(secio.ID, secio.New), libp2p.Security(tls.ID, tls.New)))
@@ -524,58 +491,42 @@ func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (
 	}, psRouter
 }
 
-func AutoNATService(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, host host.Host) error {
-	if !cfg.Swarm.EnableAutoNATService {
-		return nil
-	}
+func AutoNATService(quic bool) func(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) error {
+	return func(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) error {
+		// collect private net option in case swarm.key is presented
+		opts, _, err := PNet(repo)
+		if err != nil {
+			// swarm key exists but was failed to decode
+			return err
+		}
 
-	// collect private net option in case swarm.key is presented
-	opts, _, err := PNet(repo)
-	if err != nil {
-		// swarm key exists but was failed to decode
+		if quic {
+			opts.Opts = append(opts.Opts, libp2p.DefaultTransports, libp2p.Transport(libp2pquic.NewTransport))
+		}
+
+		_, err = autonat.NewAutoNATService(helpers.LifecycleCtx(mctx, lc), host, opts.Opts...)
 		return err
 	}
-
-	if cfg.Experimental.QUIC {
-		opts.Opts = append(opts.Opts, libp2p.DefaultTransports, libp2p.Transport(libp2pquic.NewTransport))
-	}
-
-	_, err = autonat.NewAutoNATService(helpers.LifecycleCtx(mctx, lc), host, opts.Opts...)
-	return err
 }
 
-func Pubsub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, cfg *config.Config) (service *pubsub.PubSub, err error) {
-	var pubsubOptions []pubsub.Option
-	if cfg.Pubsub.DisableSigning {
-		pubsubOptions = append(pubsubOptions, pubsub.WithMessageSigning(false))
+func FloodSub(pubsubOptions ...pubsub.Option) interface{} {
+	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) {
+		return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
 	}
-
-	if cfg.Pubsub.StrictSignatureVerification {
-		pubsubOptions = append(pubsubOptions, pubsub.WithStrictSignatureVerification(true))
-	}
-
-	switch cfg.Pubsub.Router {
-	case "":
-		fallthrough
-	case "floodsub":
-		service, err = pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
-
-	case "gossipsub":
-		service, err = pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
-
-	default:
-		err = fmt.Errorf("Unknown pubsub router %s", cfg.Pubsub.Router)
-	}
-
-	return service, err
 }
 
-func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
+func GossipSub(pubsubOptions ...pubsub.Option) interface{} {
+	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) {
+		return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
+	}
+}
+
+func listenAddresses(addresses []string) ([]ma.Multiaddr, error) {
 	var listen []ma.Multiaddr
-	for _, addr := range cfg.Addresses.Swarm {
+	for _, addr := range addresses {
 		maddr, err := ma.NewMultiaddr(addr)
 		if err != nil {
-			return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", cfg.Addresses.Swarm)
+			return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", addresses)
 		}
 		listen = append(listen, maddr)
 	}
@@ -583,22 +534,24 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
 	return listen, nil
 }
 
-func StartListening(host host.Host, cfg *config.Config) error {
-	listenAddrs, err := listenAddresses(cfg)
-	if err != nil {
-		return err
-	}
+func StartListening(addresses []string) func(host host.Host) error {
+	return func(host host.Host) error {
+		listenAddrs, err := listenAddresses(addresses)
+		if err != nil {
+			return err
+		}
 
-	// Actually start listening:
-	if err := host.Network().Listen(listenAddrs...); err != nil {
-		return err
-	}
+		// Actually start listening:
+		if err := host.Network().Listen(listenAddrs...); err != nil {
+			return err
+		}
 
-	// list out our addresses
-	addrs, err := host.Network().InterfaceListenAddresses()
-	if err != nil {
-		return err
+		// list out our addresses
+		addrs, err := host.Network().InterfaceListenAddresses()
+		if err != nil {
+			return err
+		}
+		log.Infof("Swarm listening at: %s", addrs)
+		return nil
 	}
-	log.Infof("Swarm listening at: %s", addrs)
-	return nil
 }
diff --git a/core/node/provider.go b/core/node/provider.go
index 37b9637a7..e85a90914 100644
--- a/core/node/provider.go
+++ b/core/node/provider.go
@@ -2,16 +2,12 @@ package node
 
 import (
 	"context"
-	"fmt"
 	"time"
 
-	"github.com/ipfs/go-ipfs-config"
-	"github.com/ipfs/go-ipld-format"
 	"github.com/libp2p/go-libp2p-routing"
 	"go.uber.org/fx"
 
 	"github.com/ipfs/go-ipfs/core/node/helpers"
-	"github.com/ipfs/go-ipfs/pin"
 	"github.com/ipfs/go-ipfs/provider"
 	"github.com/ipfs/go-ipfs/repo"
 	"github.com/ipfs/go-ipfs/reprovide"
@@ -42,32 +38,10 @@ func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queu
 }
 
 // ReproviderCtor creates new reprovider
-func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*reprovide.Reprovider, error) {
-	var keyProvider reprovide.KeyChanFunc
-
-	reproviderInterval := kReprovideFrequency
-	if cfg.Reprovider.Interval != "" {
-		dur, err := time.ParseDuration(cfg.Reprovider.Interval)
-		if err != nil {
-			return nil, err
-		}
-
-		reproviderInterval = dur
+func ReproviderCtor(reproviderInterval time.Duration) func(helpers.MetricsCtx, fx.Lifecycle, routing.IpfsRouting, reprovide.KeyChanFunc) (*reprovide.Reprovider, error) {
+	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt routing.IpfsRouting, keyProvider reprovide.KeyChanFunc) (*reprovide.Reprovider, error) {
+		return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil
 	}
-
-	switch cfg.Reprovider.Strategy {
-	case "all":
-		fallthrough
-	case "":
-		keyProvider = reprovide.NewBlockstoreProvider(bs)
-	case "roots":
-		keyProvider = reprovide.NewPinnedProvider(pinning, ds, true)
-	case "pinned":
-		keyProvider = reprovide.NewPinnedProvider(pinning, ds, false)
-	default:
-		return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
-	}
-	return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil
 }
 
 // Reprovider runs the reprovider service
diff --git a/core/node/storage.go b/core/node/storage.go
index 6b58be60b..5d705359b 100644
--- a/core/node/storage.go
+++ b/core/node/storage.go
@@ -42,8 +42,8 @@ func Datastore(repo repo.Repo) datastore.Datastore {
 type BaseBlocks blockstore.Blockstore
 
 // BaseBlockstoreCtor creates cached blockstore backed by the provided datastore
-func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsCtx, repo repo.Repo, cfg *config.Config, lc fx.Lifecycle) (bs BaseBlocks, err error) {
-	return func(mctx helpers.MetricsCtx, repo repo.Repo, cfg *config.Config, lc fx.Lifecycle) (bs BaseBlocks, err error) {
+func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, nilRepo bool, hashOnRead bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
+	return func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
 		rds := &retrystore.Datastore{
 			Batching: repo.Datastore(),
 			Delay:    time.Millisecond * 200,
@@ -54,12 +54,6 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC
 		bs = blockstore.NewBlockstore(rds)
 		bs = &verifbs.VerifBS{Blockstore: bs}
 
-		opts := blockstore.DefaultCacheOpts()
-		opts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize
-		if !permanent {
-			opts.HasBloomFilterSize = 0
-		}
-
 		if !nilRepo {
 			ctx, cancel := context.WithCancel(mctx)
 
@@ -69,7 +63,7 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC
 					return nil
 				},
 			})
-			bs, err = blockstore.CachedBlockstore(ctx, bs, opts)
+			bs, err = blockstore.CachedBlockstore(ctx, bs, cacheOpts)
 			if err != nil {
 				return nil, err
 			}
@@ -78,7 +72,7 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC
 		bs = blockstore.NewIdStore(bs)
 		bs = cidv0v1.NewBlockstore(bs)
 
-		if cfg.Datastore.HashOnRead { // TODO: review: this is how it was done originally, is there a reason we can't just pass this directly?
+		if hashOnRead { // TODO: review: this is how it was done originally, is there a reason we can't just pass this directly?
 			bs.HashOnRead(true)
 		}
 
@@ -87,16 +81,23 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC
 }
 
 // GcBlockstoreCtor wraps the base blockstore with GC and Filestore layers
-func GcBlockstoreCtor(repo repo.Repo, bb BaseBlocks, cfg *config.Config) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) {
+func GcBlockstoreCtor(bb BaseBlocks) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore) {
 	gclocker = blockstore.NewGCLocker()
 	gcbs = blockstore.NewGCBlockstore(bb, gclocker)
 
-	if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled {
-		// hash security
-		fstore = filestore.NewFilestore(bb, repo.FileManager()) // TODO: mark optional
-		gcbs = blockstore.NewGCBlockstore(fstore, gclocker)
-		gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs}
-	}
+	bs = gcbs
+	return
+}
+
+// FilestoreBlockstoreCtor wraps GcBlockstoreCtor and adds Filestore support
+func FilestoreBlockstoreCtor(repo repo.Repo, bb BaseBlocks) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) {
+	gclocker, gcbs, bs = GcBlockstoreCtor(bb)
+
+	// hash security
+	fstore = filestore.NewFilestore(bb, repo.FileManager())
+	gcbs = blockstore.NewGCBlockstore(fstore, gclocker)
+	gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs}
+	bs = gcbs
 	return
 }
diff --git a/reprovide/providers.go b/reprovide/providers.go
index 77b19e2f8..bef56a0b7 100644
--- a/reprovide/providers.go
+++ b/reprovide/providers.go
@@ -20,27 +20,29 @@ func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
 }
 
 // NewPinnedProvider returns provider supplying pinned keys
-func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) KeyChanFunc {
-	return func(ctx context.Context) (<-chan cid.Cid, error) {
-		set, err := pinSet(ctx, pinning, dag, onlyRoots)
-		if err != nil {
-			return nil, err
-		}
-
-		outCh := make(chan cid.Cid)
-		go func() {
-			defer close(outCh)
-			for c := range set.New {
-				select {
-				case <-ctx.Done():
-					return
-				case outCh <- c:
-				}
+func NewPinnedProvider(onlyRoots bool) func(pinning pin.Pinner, dag ipld.DAGService) KeyChanFunc {
+	return func(pinning pin.Pinner, dag ipld.DAGService) KeyChanFunc {
+		return func(ctx context.Context) (<-chan cid.Cid, error) {
+			set, err := pinSet(ctx, pinning, dag, onlyRoots)
+			if err != nil {
+				return nil, err
 			}
-		}()
+			outCh := make(chan cid.Cid)
+			go func() {
+				defer close(outCh)
+				for c := range set.New {
+					select {
+					case <-ctx.Done():
+						return
+					case outCh <- c:
+					}
+				}
 
-		return outCh, nil
+			}()
+
+			return outCh, nil
+		}
 	}
 }