Invert constructor config handling

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera 2019-04-29 23:37:37 +02:00
parent 9bcf072ccb
commit ed514b9177
9 changed files with 402 additions and 337 deletions

View File

@ -71,13 +71,13 @@ type IpfsNode struct {
// Local node
Pinning pin.Pinner // the pinning manager
Mounts Mounts `optional:"true"` // current mount state, if any.
PrivateKey ic.PrivKey // the local node's private Key
PrivateKey ic.PrivKey `optional:"true"` // the local node's private Key
PNetFingerprint libp2p.PNetFingerprint `optional:"true"` // fingerprint of private network
// Services
Peerstore pstore.Peerstore `optional:"true"` // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Filestore *filestore.Filestore // the filestore blockstore
Filestore *filestore.Filestore `optional:"true"` // the filestore blockstore
BaseBlocks node.BaseBlocks // the raw blockstore, no filestore wrapping
GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc
Blocks bserv.BlockService // the block service, get/add blocks.

View File

@ -2,72 +2,187 @@ package node
import (
"context"
"errors"
"fmt"
"time"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-config"
util "github.com/ipfs/go-ipfs-util"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/p2p"
"github.com/ipfs/go-ipfs/provider"
"github.com/ipfs/go-ipfs/reprovide"
offline "github.com/ipfs/go-ipfs-exchange-offline"
offroute "github.com/ipfs/go-ipfs-routing/offline"
uio "github.com/ipfs/go-unixfs/io"
"github.com/ipfs/go-path/resolver"
uio "github.com/ipfs/go-unixfs/io"
"go.uber.org/fx"
)
var BaseLibP2P = fx.Options(
fx.Provide(libp2p.AddrFilters),
fx.Provide(libp2p.BandwidthCounter),
fx.Provide(libp2p.PNet),
fx.Provide(libp2p.AddrsFactory),
fx.Provide(libp2p.ConnectionManager),
fx.Provide(libp2p.NatPortMap),
fx.Provide(libp2p.Relay),
fx.Provide(libp2p.AutoRealy),
fx.Provide(libp2p.DefaultTransports),
fx.Provide(libp2p.QUIC),
fx.Provide(libp2p.Host),
fx.Provide(libp2p.DiscoveryHandler),
fx.Invoke(libp2p.AutoNATService),
fx.Invoke(libp2p.PNetChecker),
fx.Invoke(libp2p.StartListening),
fx.Invoke(libp2p.SetupDiscovery),
)
func LibP2P(cfg *BuildCfg) fx.Option {
func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
// parse ConnMgr config
grace := config.DefaultConnMgrGracePeriod
low := config.DefaultConnMgrHighWater
high := config.DefaultConnMgrHighWater
connmgr := fx.Options()
if cfg.Swarm.ConnMgr.Type != "none" {
switch cfg.Swarm.ConnMgr.Type {
case "":
// 'default' value is the basic connection manager
break
case "basic":
var err error
grace, err = time.ParseDuration(cfg.Swarm.ConnMgr.GracePeriod)
if err != nil {
return fx.Error(fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err))
}
low = cfg.Swarm.ConnMgr.LowWater
high = cfg.Swarm.ConnMgr.HighWater
default:
return fx.Error(fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Swarm.ConnMgr.Type))
}
connmgr = fx.Provide(libp2p.ConnectionManager(low, high, grace))
}
// parse PubSub config
ps := fx.Options()
if bcfg.getOpt("pubsub") || bcfg.getOpt("ipnsps") {
var pubsubOptions []pubsub.Option
if cfg.Pubsub.DisableSigning {
pubsubOptions = append(pubsubOptions, pubsub.WithMessageSigning(false))
}
if cfg.Pubsub.StrictSignatureVerification {
pubsubOptions = append(pubsubOptions, pubsub.WithStrictSignatureVerification(true))
}
switch cfg.Pubsub.Router {
case "":
fallthrough
case "floodsub":
ps = fx.Provide(libp2p.FloodSub(pubsubOptions...))
case "gossipsub":
ps = fx.Provide(libp2p.GossipSub(pubsubOptions...))
default:
return fx.Error(fmt.Errorf("unknown pubsub router %s", cfg.Pubsub.Router))
}
}
// Gather all the options
opts := fx.Options(
BaseLibP2P,
fx.Provide(libp2p.Security(!cfg.DisableEncryptedConnections)),
maybeProvide(libp2p.Pubsub, cfg.getOpt("pubsub") || cfg.getOpt("ipnsps")),
fx.Provide(libp2p.AddrFilters(cfg.Swarm.AddrFilters)),
fx.Invoke(libp2p.SetupDiscovery(cfg.Discovery.MDNS.Enabled, cfg.Discovery.MDNS.Interval)),
fx.Provide(libp2p.AddrsFactory(cfg.Addresses.Announce, cfg.Addresses.NoAnnounce)),
fx.Provide(libp2p.SmuxTransport(bcfg.getOpt("mplex"))),
fx.Provide(libp2p.Relay(cfg.Swarm.DisableRelay, cfg.Swarm.EnableRelayHop)),
fx.Invoke(libp2p.StartListening(cfg.Addresses.Swarm)),
fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Experimental.PreferTLS)),
fx.Provide(libp2p.SmuxTransport(cfg.getOpt("mplex"))),
fx.Provide(libp2p.Routing),
fx.Provide(libp2p.BaseRouting),
maybeProvide(libp2p.PubsubRouter, cfg.getOpt("ipnsps")),
maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")),
maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics),
maybeProvide(libp2p.NatPortMap, !cfg.Swarm.DisableNatPortMap),
maybeProvide(libp2p.AutoRealy, cfg.Swarm.EnableAutoRelay),
maybeProvide(libp2p.QUIC, cfg.Experimental.QUIC),
maybeProvide(libp2p.AutoNATService(cfg.Experimental.QUIC), cfg.Swarm.EnableAutoNATService),
connmgr,
ps,
)
return opts
}
// Storage groups units which setup datastore based persistence and blockstore layers
func Storage(cfg *BuildCfg) fx.Option {
func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
cacheOpts := blockstore.DefaultCacheOpts()
cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize
if !bcfg.Permanent {
cacheOpts.HasBloomFilterSize = 0
}
finalBstore := fx.Provide(GcBlockstoreCtor)
if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled {
finalBstore = fx.Provide(FilestoreBlockstoreCtor)
}
return fx.Options(
fx.Provide(RepoConfig),
fx.Provide(Datastore),
fx.Provide(BaseBlockstoreCtor(cfg.Permanent, cfg.NilRepo)),
fx.Provide(GcBlockstoreCtor),
fx.Provide(BaseBlockstoreCtor(cacheOpts, bcfg.NilRepo, cfg.Datastore.HashOnRead)),
finalBstore,
)
}
// Identity groups units providing cryptographic identity
var Identity = fx.Options(
fx.Provide(PeerID),
fx.Provide(PrivateKey),
fx.Provide(libp2p.Peerstore),
)
func Identity(cfg *config.Config) fx.Option {
// PeerID
cid := cfg.Identity.PeerID
if cid == "" {
return fx.Error(errors.New("identity was not set in config (was 'ipfs init' run?)"))
}
if len(cid) == 0 {
return fx.Error(errors.New("no peer ID in config! (was 'ipfs init' run?)"))
}
id, err := peer.IDB58Decode(cid)
if err != nil {
return fx.Error(fmt.Errorf("peer ID invalid: %s", err))
}
// Private Key
if cfg.Identity.PrivKey == "" {
return fx.Options( // No PK (usually in tests)
fx.Provide(PeerID(id)),
fx.Provide(pstoremem.NewPeerstore),
)
}
sk, err := cfg.Identity.DecodePrivateKey("passphrase todo!")
if err != nil {
return fx.Error(err)
}
return fx.Options( // Full identity
fx.Provide(PeerID(id)),
fx.Provide(PrivateKey(sk)),
fx.Provide(pstoremem.NewPeerstore),
fx.Invoke(libp2p.PstoreAddSelfKeys),
)
}
// IPNS groups namesys related units
var IPNS = fx.Options(
@ -75,33 +190,97 @@ var IPNS = fx.Options(
)
// Providers groups units managing provider routing records
var Providers = fx.Options(
fx.Provide(ProviderQueue),
fx.Provide(ProviderCtor),
fx.Provide(ReproviderCtor),
func Providers(cfg *config.Config) fx.Option {
reproviderInterval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return fx.Error(err)
}
fx.Invoke(Reprovider),
)
reproviderInterval = dur
}
var keyProvider fx.Option
switch cfg.Reprovider.Strategy {
case "all":
fallthrough
case "":
keyProvider = fx.Provide(reprovide.NewBlockstoreProvider)
case "roots":
keyProvider = fx.Provide(reprovide.NewPinnedProvider(true))
case "pinned":
keyProvider = fx.Provide(reprovide.NewPinnedProvider(false))
default:
return fx.Error(fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy))
}
return fx.Options(
fx.Provide(ProviderQueue),
fx.Provide(ProviderCtor),
fx.Provide(ReproviderCtor(reproviderInterval)),
keyProvider,
fx.Invoke(Reprovider),
)
}
// Online groups online-only units
func Online(cfg *BuildCfg) fx.Option {
func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option {
// Namesys params
ipnsCacheSize := cfg.Ipns.ResolveCacheSize
if ipnsCacheSize == 0 {
ipnsCacheSize = DefaultIpnsCacheSize
}
if ipnsCacheSize < 0 {
return fx.Error(fmt.Errorf("cannot specify negative resolve cache size"))
}
// Republisher params
var repubPeriod, recordLifetime time.Duration
if cfg.Ipns.RepublishPeriod != "" {
d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
if err != nil {
return fx.Error(fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err))
}
if !util.Debug && (d < time.Minute || d > (time.Hour*24)) {
return fx.Error(fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d))
}
repubPeriod = d
}
if cfg.Ipns.RecordLifetime != "" {
d, err := time.ParseDuration(cfg.Ipns.RecordLifetime)
if err != nil {
return fx.Error(fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err))
}
recordLifetime = d
}
return fx.Options(
fx.Provide(OnlineExchange),
fx.Provide(OnlineNamesys),
fx.Provide(Namesys(ipnsCacheSize)),
fx.Invoke(IpnsRepublisher),
fx.Invoke(IpnsRepublisher(repubPeriod, recordLifetime)),
fx.Provide(p2p.New),
LibP2P(cfg),
Providers,
LibP2P(bcfg, cfg),
Providers(cfg),
)
}
// Offline groups offline alternatives to Online units
var Offline = fx.Options(
fx.Provide(offline.Exchange),
fx.Provide(OfflineNamesys),
fx.Provide(Namesys(0)),
fx.Provide(offroute.NewOfflineRouter),
fx.Provide(provider.NewOfflineProvider),
)
@ -115,9 +294,9 @@ var Core = fx.Options(
fx.Provide(Files),
)
func Networked(cfg *BuildCfg) fx.Option {
if cfg.Online {
return Online(cfg)
func Networked(bcfg *BuildCfg, cfg *config.Config) fx.Option {
if bcfg.Online {
return Online(bcfg, cfg)
}
return Offline
}
@ -136,20 +315,15 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option {
// TEMP: setting global sharding switch here
uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled
return fx.Options(
bcfgOpts,
fx.Provide(baseProcess),
Storage(bcfg),
Identity,
Storage(bcfg, cfg),
Identity(cfg),
IPNS,
Networked(bcfg),
Networked(bcfg, cfg),
Core,
)

View File

@ -1,50 +1,29 @@
package node
import (
"errors"
"fmt"
"github.com/ipfs/go-ipfs-config"
"github.com/libp2p/go-libp2p-crypto"
"github.com/libp2p/go-libp2p-peer"
)
// PeerID loads peer identity form config
func PeerID(cfg *config.Config) (peer.ID, error) {
cid := cfg.Identity.PeerID
if cid == "" {
return "", errors.New("identity was not set in config (was 'ipfs init' run?)")
func PeerID(id peer.ID) func() peer.ID {
return func() peer.ID {
return id
}
if len(cid) == 0 {
return "", errors.New("no peer ID in config! (was 'ipfs init' run?)")
}
id, err := peer.IDB58Decode(cid)
if err != nil {
return "", fmt.Errorf("peer ID invalid: %s", err)
}
return id, nil
}
// PrivateKey loads the private key from config
func PrivateKey(cfg *config.Config, id peer.ID) (crypto.PrivKey, error) {
if cfg.Identity.PrivKey == "" {
return nil, nil
}
func PrivateKey(sk crypto.PrivKey) func(id peer.ID) (crypto.PrivKey, error) {
return func(id peer.ID) (crypto.PrivKey, error) {
id2, err := peer.IDFromPrivateKey(sk)
if err != nil {
return nil, err
}
sk, err := cfg.Identity.DecodePrivateKey("passphrase todo!")
if err != nil {
return nil, err
if id2 != id {
return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2)
}
return sk, nil
}
id2, err := peer.IDFromPrivateKey(sk)
if err != nil {
return nil, err
}
if id2 != id {
return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2)
}
return sk, nil
}

View File

@ -4,7 +4,6 @@ import (
"fmt"
"time"
"github.com/ipfs/go-ipfs-config"
"github.com/ipfs/go-ipfs-util"
"github.com/ipfs/go-ipns"
"github.com/libp2p/go-libp2p-crypto"
@ -27,49 +26,31 @@ func RecordValidator(ps peerstore.Peerstore) record.Validator {
}
}
// OfflineNamesys creates namesys setup for offline operation
func OfflineNamesys(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) {
return namesys.NewNameSystem(rt, repo.Datastore(), 0), nil
}
// OnlineNamesys createn new namesys setup for online operation
func OnlineNamesys(rt routing.IpfsRouting, repo repo.Repo, cfg *config.Config) (namesys.NameSystem, error) {
cs := cfg.Ipns.ResolveCacheSize
if cs == 0 {
cs = DefaultIpnsCacheSize
// Namesys creates new name system
func Namesys(cacheSize int) func(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) {
return func(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) {
return namesys.NewNameSystem(rt, repo.Datastore(), cacheSize), nil
}
if cs < 0 {
return nil, fmt.Errorf("cannot specify negative resolve cache size")
}
return namesys.NewNameSystem(rt, repo.Datastore(), cs), nil
}
// IpnsRepublisher runs new IPNS republisher service
func IpnsRepublisher(lc lcProcess, cfg *config.Config, namesys namesys.NameSystem, repo repo.Repo, privKey crypto.PrivKey) error {
repub := republisher.NewRepublisher(namesys, repo.Datastore(), privKey, repo.Keystore())
func IpnsRepublisher(repubPeriod time.Duration, recordLifetime time.Duration) func(lcProcess, namesys.NameSystem, repo.Repo, crypto.PrivKey) error {
return func(lc lcProcess, namesys namesys.NameSystem, repo repo.Repo, privKey crypto.PrivKey) error {
repub := republisher.NewRepublisher(namesys, repo.Datastore(), privKey, repo.Keystore())
if cfg.Ipns.RepublishPeriod != "" {
d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
if err != nil {
return fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err)
if repubPeriod != 0 {
if !util.Debug && (repubPeriod < time.Minute || repubPeriod > (time.Hour*24)) {
return fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", repubPeriod)
}
repub.Interval = repubPeriod
}
if !util.Debug && (d < time.Minute || d > (time.Hour*24)) {
return fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d)
if recordLifetime != 0 {
repub.RecordLifetime = recordLifetime
}
repub.Interval = d
lc.Append(repub.Run)
return nil
}
if cfg.Ipns.RecordLifetime != "" {
d, err := time.ParseDuration(cfg.Ipns.RecordLifetime)
if err != nil {
return fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err)
}
repub.RecordLifetime = d
}
lc.Append(repub.Run)
return nil
}

View File

@ -4,12 +4,12 @@ import (
"context"
"time"
"github.com/ipfs/go-ipfs-config"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/libp2p/go-libp2p-host"
"github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p/p2p/discovery"
"go.uber.org/fx"
"github.com/ipfs/go-ipfs/core/node/helpers"
)
const discoveryConnTimeout = time.Second * 30
@ -35,18 +35,19 @@ func DiscoveryHandler(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host)
}
}
func SetupDiscovery(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, host host.Host, handler *discoveryHandler) error {
if cfg.Discovery.MDNS.Enabled {
mdns := cfg.Discovery.MDNS
if mdns.Interval == 0 {
mdns.Interval = 5
func SetupDiscovery(mdns bool, mdnsInterval int) func(helpers.MetricsCtx, fx.Lifecycle, host.Host, *discoveryHandler) error {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, handler *discoveryHandler) error {
if mdns {
if mdnsInterval == 0 {
mdnsInterval = 5
}
service, err := discovery.NewMdnsService(helpers.LifecycleCtx(mctx, lc), host, time.Duration(mdnsInterval)*time.Second, discovery.ServiceTag)
if err != nil {
log.Error("mdns error: ", err)
return nil
}
service.RegisterNotifee(handler)
}
service, err := discovery.NewMdnsService(helpers.LifecycleCtx(mctx, lc), host, time.Duration(mdns.Interval)*time.Second, discovery.ServiceTag)
if err != nil {
log.Error("mdns error: ", err)
return nil
}
service.RegisterNotifee(handler)
return nil
}
return nil
}

View File

@ -11,7 +11,6 @@ import (
"time"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-ipfs-config"
nilrouting "github.com/ipfs/go-ipfs-routing/none"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p"
@ -25,7 +24,6 @@ import (
"github.com/libp2p/go-libp2p-metrics"
"github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
"github.com/libp2p/go-libp2p-pnet"
"github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p-pubsub-router"
@ -87,38 +85,30 @@ var DHTOption RoutingOption = constructDHTRouting
var DHTClientOption RoutingOption = constructClientDHTRouting
var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting
func Peerstore(id peer.ID, sk crypto.PrivKey) (peerstore.Peerstore, error) {
ps := pstoremem.NewPeerstore()
if sk != nil {
if err := ps.AddPubKey(id, sk.GetPublic()); err != nil {
return nil, err
}
if err := ps.AddPrivKey(id, sk); err != nil {
return nil, err
}
func PstoreAddSelfKeys(id peer.ID, sk crypto.PrivKey, ps peerstore.Peerstore) error {
if err := ps.AddPubKey(id, sk.GetPublic()); err != nil {
return err
}
return ps, nil
return ps.AddPrivKey(id, sk)
}
func AddrFilters(cfg *config.Config) (opts Libp2pOpts, err error) {
for _, s := range cfg.Swarm.AddrFilters {
f, err := mamask.NewMask(s)
if err != nil {
return opts, fmt.Errorf("incorrectly formatted address filter in config: %s", s)
func AddrFilters(filters []string) func() (opts Libp2pOpts, err error) {
return func() (opts Libp2pOpts, err error) {
for _, s := range filters {
f, err := mamask.NewMask(s)
if err != nil {
return opts, fmt.Errorf("incorrectly formatted address filter in config: %s", s)
}
opts.Opts = append(opts.Opts, libp2p.FilterAddresses(f))
}
opts.Opts = append(opts.Opts, libp2p.FilterAddresses(f))
return opts, nil
}
return opts, nil
}
func BandwidthCounter(cfg *config.Config) (opts Libp2pOpts, reporter metrics.Reporter) {
func BandwidthCounter() (opts Libp2pOpts, reporter metrics.Reporter) {
reporter = metrics.NewBandwidthCounter()
if !cfg.Swarm.DisableBandwidthMetrics {
opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter))
}
opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter))
return opts, reporter
}
@ -183,9 +173,9 @@ func PNetChecker(repo repo.Repo, ph host.Host, lc fx.Lifecycle) error {
return nil
}
func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
func makeAddrsFactory(announce []string, noAnnounce []string) (p2pbhost.AddrsFactory, error) {
var annAddrs []ma.Multiaddr
for _, addr := range cfg.Announce {
for _, addr := range announce {
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
@ -195,7 +185,7 @@ func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
filters := mafilter.NewFilters()
noAnnAddrs := map[string]bool{}
for _, addr := range cfg.NoAnnounce {
for _, addr := range noAnnounce {
f, err := mamask.NewMask(addr)
if err == nil {
filters.AddDialFilter(f)
@ -229,41 +219,23 @@ func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
}, nil
}
func AddrsFactory(cfg *config.Config) (opts Libp2pOpts, err error) {
addrsFactory, err := makeAddrsFactory(cfg.Addresses)
if err != nil {
return opts, err
func AddrsFactory(announce []string, noAnnounce []string) func() (opts Libp2pOpts, err error) {
return func() (opts Libp2pOpts, err error) {
addrsFactory, err := makeAddrsFactory(announce, noAnnounce)
if err != nil {
return opts, err
}
opts.Opts = append(opts.Opts, libp2p.AddrsFactory(addrsFactory))
return
}
opts.Opts = append(opts.Opts, libp2p.AddrsFactory(addrsFactory))
return
}
func ConnectionManager(cfg *config.Config) (opts Libp2pOpts, err error) {
grace := config.DefaultConnMgrGracePeriod
low := config.DefaultConnMgrHighWater
high := config.DefaultConnMgrHighWater
switch cfg.Swarm.ConnMgr.Type {
case "":
// 'default' value is the basic connection manager
func ConnectionManager(low, high int, grace time.Duration) func() (opts Libp2pOpts, err error) {
return func() (opts Libp2pOpts, err error) {
cm := connmgr.NewConnManager(low, high, grace)
opts.Opts = append(opts.Opts, libp2p.ConnectionManager(cm))
return
case "none":
return opts, nil
case "basic":
grace, err = time.ParseDuration(cfg.Swarm.ConnMgr.GracePeriod)
if err != nil {
return opts, fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err)
}
low = cfg.Swarm.ConnMgr.LowWater
high = cfg.Swarm.ConnMgr.HighWater
default:
return opts, fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Swarm.ConnMgr.Type)
}
cm := connmgr.NewConnManager(low, high, grace)
opts.Opts = append(opts.Opts, libp2p.ConnectionManager(cm))
return
}
func makeSmuxTransportOption(mplexExp bool) libp2p.Option {
@ -315,32 +287,29 @@ func SmuxTransport(mplex bool) func() (opts Libp2pOpts, err error) {
}
}
func NatPortMap(cfg *config.Config) (opts Libp2pOpts, err error) {
if !cfg.Swarm.DisableNatPortMap {
opts.Opts = append(opts.Opts, libp2p.NATPortMap())
}
func NatPortMap() (opts Libp2pOpts, err error) {
opts.Opts = append(opts.Opts, libp2p.NATPortMap())
return
}
func Relay(cfg *config.Config) (opts Libp2pOpts, err error) {
if cfg.Swarm.DisableRelay {
// Enabled by default.
opts.Opts = append(opts.Opts, libp2p.DisableRelay())
} else {
relayOpts := []relay.RelayOpt{relay.OptDiscovery}
if cfg.Swarm.EnableRelayHop {
relayOpts = append(relayOpts, relay.OptHop)
func Relay(disable, enableHop bool) func() (opts Libp2pOpts, err error) {
return func() (opts Libp2pOpts, err error) {
if disable {
// Enabled by default.
opts.Opts = append(opts.Opts, libp2p.DisableRelay())
} else {
relayOpts := []relay.RelayOpt{relay.OptDiscovery}
if enableHop {
relayOpts = append(relayOpts, relay.OptHop)
}
opts.Opts = append(opts.Opts, libp2p.EnableRelay(relayOpts...))
}
opts.Opts = append(opts.Opts, libp2p.EnableRelay(relayOpts...))
return
}
return
}
func AutoRealy(cfg *config.Config) (opts Libp2pOpts, err error) {
// enable autorelay
if cfg.Swarm.EnableAutoRelay {
opts.Opts = append(opts.Opts, libp2p.EnableAutoRelay())
}
func AutoRealy() (opts Libp2pOpts, err error) {
opts.Opts = append(opts.Opts, libp2p.EnableAutoRelay())
return
}
@ -349,14 +318,12 @@ func DefaultTransports() (opts Libp2pOpts, err error) {
return
}
func QUIC(cfg *config.Config) (opts Libp2pOpts, err error) {
if cfg.Experimental.QUIC {
opts.Opts = append(opts.Opts, libp2p.Transport(libp2pquic.NewTransport))
}
func QUIC() (opts Libp2pOpts, err error) {
opts.Opts = append(opts.Opts, libp2p.Transport(libp2pquic.NewTransport))
return
}
func Security(enabled bool) interface{} {
func Security(enabled, preferTLS bool) interface{} {
if !enabled {
return func() (opts Libp2pOpts) {
// TODO: shouldn't this be Errorf to guarantee visibility?
@ -366,8 +333,8 @@ func Security(enabled bool) interface{} {
return opts
}
}
return func(cfg *config.Config) (opts Libp2pOpts) {
if cfg.Experimental.PreferTLS {
return func() (opts Libp2pOpts) {
if preferTLS {
opts.Opts = append(opts.Opts, libp2p.ChainOptions(libp2p.Security(tls.ID, tls.New), libp2p.Security(secio.ID, secio.New)))
} else {
opts.Opts = append(opts.Opts, libp2p.ChainOptions(libp2p.Security(secio.ID, secio.New), libp2p.Security(tls.ID, tls.New)))
@ -524,58 +491,42 @@ func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (
}, psRouter
}
func AutoNATService(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, host host.Host) error {
if !cfg.Swarm.EnableAutoNATService {
return nil
}
func AutoNATService(quic bool) func(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) error {
return func(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) error {
// collect private net option in case swarm.key is presented
opts, _, err := PNet(repo)
if err != nil {
// swarm key exists but was failed to decode
return err
}
// collect private net option in case swarm.key is presented
opts, _, err := PNet(repo)
if err != nil {
// swarm key exists but was failed to decode
if quic {
opts.Opts = append(opts.Opts, libp2p.DefaultTransports, libp2p.Transport(libp2pquic.NewTransport))
}
_, err = autonat.NewAutoNATService(helpers.LifecycleCtx(mctx, lc), host, opts.Opts...)
return err
}
if cfg.Experimental.QUIC {
opts.Opts = append(opts.Opts, libp2p.DefaultTransports, libp2p.Transport(libp2pquic.NewTransport))
}
_, err = autonat.NewAutoNATService(helpers.LifecycleCtx(mctx, lc), host, opts.Opts...)
return err
}
func Pubsub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, cfg *config.Config) (service *pubsub.PubSub, err error) {
var pubsubOptions []pubsub.Option
if cfg.Pubsub.DisableSigning {
pubsubOptions = append(pubsubOptions, pubsub.WithMessageSigning(false))
func FloodSub(pubsubOptions ...pubsub.Option) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) {
return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
}
if cfg.Pubsub.StrictSignatureVerification {
pubsubOptions = append(pubsubOptions, pubsub.WithStrictSignatureVerification(true))
}
switch cfg.Pubsub.Router {
case "":
fallthrough
case "floodsub":
service, err = pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
case "gossipsub":
service, err = pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
default:
err = fmt.Errorf("Unknown pubsub router %s", cfg.Pubsub.Router)
}
return service, err
}
func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
func GossipSub(pubsubOptions ...pubsub.Option) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) {
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
}
}
func listenAddresses(addresses []string) ([]ma.Multiaddr, error) {
var listen []ma.Multiaddr
for _, addr := range cfg.Addresses.Swarm {
for _, addr := range addresses {
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", cfg.Addresses.Swarm)
return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", addresses)
}
listen = append(listen, maddr)
}
@ -583,22 +534,24 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
return listen, nil
}
func StartListening(host host.Host, cfg *config.Config) error {
listenAddrs, err := listenAddresses(cfg)
if err != nil {
return err
}
func StartListening(addresses []string) func(host host.Host) error {
return func(host host.Host) error {
listenAddrs, err := listenAddresses(addresses)
if err != nil {
return err
}
// Actually start listening:
if err := host.Network().Listen(listenAddrs...); err != nil {
return err
}
// Actually start listening:
if err := host.Network().Listen(listenAddrs...); err != nil {
return err
}
// list out our addresses
addrs, err := host.Network().InterfaceListenAddresses()
if err != nil {
return err
// list out our addresses
addrs, err := host.Network().InterfaceListenAddresses()
if err != nil {
return err
}
log.Infof("Swarm listening at: %s", addrs)
return nil
}
log.Infof("Swarm listening at: %s", addrs)
return nil
}

View File

@ -2,16 +2,12 @@ package node
import (
"context"
"fmt"
"time"
"github.com/ipfs/go-ipfs-config"
"github.com/ipfs/go-ipld-format"
"github.com/libp2p/go-libp2p-routing"
"go.uber.org/fx"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/provider"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/reprovide"
@ -42,32 +38,10 @@ func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queu
}
// ReproviderCtor creates new reprovider
func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*reprovide.Reprovider, error) {
var keyProvider reprovide.KeyChanFunc
reproviderInterval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return nil, err
}
reproviderInterval = dur
func ReproviderCtor(reproviderInterval time.Duration) func(helpers.MetricsCtx, fx.Lifecycle, routing.IpfsRouting, reprovide.KeyChanFunc) (*reprovide.Reprovider, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt routing.IpfsRouting, keyProvider reprovide.KeyChanFunc) (*reprovide.Reprovider, error) {
return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil
}
switch cfg.Reprovider.Strategy {
case "all":
fallthrough
case "":
keyProvider = reprovide.NewBlockstoreProvider(bs)
case "roots":
keyProvider = reprovide.NewPinnedProvider(pinning, ds, true)
case "pinned":
keyProvider = reprovide.NewPinnedProvider(pinning, ds, false)
default:
return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
}
return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil
}
// Reprovider runs the reprovider service

View File

@ -42,8 +42,8 @@ func Datastore(repo repo.Repo) datastore.Datastore {
type BaseBlocks blockstore.Blockstore
// BaseBlockstoreCtor creates cached blockstore backed by the provided datastore
func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsCtx, repo repo.Repo, cfg *config.Config, lc fx.Lifecycle) (bs BaseBlocks, err error) {
return func(mctx helpers.MetricsCtx, repo repo.Repo, cfg *config.Config, lc fx.Lifecycle) (bs BaseBlocks, err error) {
func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, nilRepo bool, hashOnRead bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
return func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
rds := &retrystore.Datastore{
Batching: repo.Datastore(),
Delay: time.Millisecond * 200,
@ -54,12 +54,6 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC
bs = blockstore.NewBlockstore(rds)
bs = &verifbs.VerifBS{Blockstore: bs}
opts := blockstore.DefaultCacheOpts()
opts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize
if !permanent {
opts.HasBloomFilterSize = 0
}
if !nilRepo {
ctx, cancel := context.WithCancel(mctx)
@ -69,7 +63,7 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC
return nil
},
})
bs, err = blockstore.CachedBlockstore(ctx, bs, opts)
bs, err = blockstore.CachedBlockstore(ctx, bs, cacheOpts)
if err != nil {
return nil, err
}
@ -78,7 +72,7 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC
bs = blockstore.NewIdStore(bs)
bs = cidv0v1.NewBlockstore(bs)
if cfg.Datastore.HashOnRead { // TODO: review: this is how it was done originally, is there a reason we can't just pass this directly?
if hashOnRead { // TODO: review: this is how it was done originally, is there a reason we can't just pass this directly?
bs.HashOnRead(true)
}
@ -87,16 +81,23 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC
}
// GcBlockstoreCtor wraps the base blockstore with GC and Filestore layers
func GcBlockstoreCtor(repo repo.Repo, bb BaseBlocks, cfg *config.Config) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) {
func GcBlockstoreCtor(bb BaseBlocks) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore) {
gclocker = blockstore.NewGCLocker()
gcbs = blockstore.NewGCBlockstore(bb, gclocker)
if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled {
// hash security
fstore = filestore.NewFilestore(bb, repo.FileManager()) // TODO: mark optional
gcbs = blockstore.NewGCBlockstore(fstore, gclocker)
gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs}
}
bs = gcbs
return
}
// GcBlockstoreCtor wraps GcBlockstore and adds Filestore support
func FilestoreBlockstoreCtor(repo repo.Repo, bb BaseBlocks) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) {
gclocker, gcbs, bs = GcBlockstoreCtor(bb)
// hash security
fstore = filestore.NewFilestore(bb, repo.FileManager())
gcbs = blockstore.NewGCBlockstore(fstore, gclocker)
gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs}
bs = gcbs
return
}

View File

@ -20,27 +20,29 @@ func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
}
// NewPinnedProvider returns provider supplying pinned keys
func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) KeyChanFunc {
return func(ctx context.Context) (<-chan cid.Cid, error) {
set, err := pinSet(ctx, pinning, dag, onlyRoots)
if err != nil {
return nil, err
}
outCh := make(chan cid.Cid)
go func() {
defer close(outCh)
for c := range set.New {
select {
case <-ctx.Done():
return
case outCh <- c:
}
func NewPinnedProvider(onlyRoots bool) func(pinning pin.Pinner, dag ipld.DAGService) KeyChanFunc {
return func(pinning pin.Pinner, dag ipld.DAGService) KeyChanFunc {
return func(ctx context.Context) (<-chan cid.Cid, error) {
set, err := pinSet(ctx, pinning, dag, onlyRoots)
if err != nil {
return nil, err
}
}()
outCh := make(chan cid.Cid)
go func() {
defer close(outCh)
for c := range set.New {
select {
case <-ctx.Done():
return
case outCh <- c:
}
}
return outCh, nil
}()
return outCh, nil
}
}
}