Merge pull request #6276 from ipfs/feat/core-cfg-handling

Invert constructor config handling
This commit is contained in:
Steven Allen 2019-04-29 16:09:16 -07:00 committed by GitHub
commit dd16b3ca1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 948 additions and 775 deletions

View File

@ -71,13 +71,13 @@ type IpfsNode struct {
// Local node
Pinning pin.Pinner // the pinning manager
Mounts Mounts `optional:"true"` // current mount state, if any.
PrivateKey ic.PrivKey // the local node's private Key
PrivateKey ic.PrivKey `optional:"true"` // the local node's private Key
PNetFingerprint libp2p.PNetFingerprint `optional:"true"` // fingerprint of private network
// Services
Peerstore pstore.Peerstore `optional:"true"` // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Filestore *filestore.Filestore // the filestore blockstore
Filestore *filestore.Filestore `optional:"true"` // the filestore blockstore
BaseBlocks node.BaseBlocks // the raw blockstore, no filestore wrapping
GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc
Blocks bserv.BlockService // the block service, get/add blocks.

View File

@ -84,10 +84,10 @@ func (cfg *BuildCfg) fillDefaults() error {
}
// options creates fx option group from this build config
func (cfg *BuildCfg) options(ctx context.Context) fx.Option {
func (cfg *BuildCfg) options(ctx context.Context) (fx.Option, *cfg.Config) {
err := cfg.fillDefaults()
if err != nil {
return fx.Error(err)
return fx.Error(err), nil
}
repoOption := fx.Provide(func(lc fx.Lifecycle) repo.Repo {
@ -112,12 +112,17 @@ func (cfg *BuildCfg) options(ctx context.Context) fx.Option {
return cfg.Routing
})
conf, err := cfg.Repo.Config()
if err != nil {
return fx.Error(err), nil
}
return fx.Options(
repoOption,
hostOption,
routingOption,
metricsCtx,
)
), conf
}
func defaultRepo(dstore repo.Datastore) (repo.Repo, error) {

View File

@ -2,71 +2,186 @@ package node
import (
"context"
"errors"
"fmt"
"time"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-config"
util "github.com/ipfs/go-ipfs-util"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/p2p"
"github.com/ipfs/go-ipfs/provider"
"github.com/ipfs/go-ipfs/reprovide"
offline "github.com/ipfs/go-ipfs-exchange-offline"
offroute "github.com/ipfs/go-ipfs-routing/offline"
"github.com/ipfs/go-path/resolver"
uio "github.com/ipfs/go-unixfs/io"
"go.uber.org/fx"
)
var BaseLibP2P = fx.Options(
fx.Provide(libp2p.AddrFilters),
fx.Provide(libp2p.BandwidthCounter),
fx.Provide(libp2p.PNet),
fx.Provide(libp2p.AddrsFactory),
fx.Provide(libp2p.ConnectionManager),
fx.Provide(libp2p.NatPortMap),
fx.Provide(libp2p.Relay),
fx.Provide(libp2p.AutoRealy),
fx.Provide(libp2p.DefaultTransports),
fx.Provide(libp2p.QUIC),
fx.Provide(libp2p.Host),
fx.Provide(libp2p.DiscoveryHandler),
fx.Invoke(libp2p.AutoNATService),
fx.Invoke(libp2p.PNetChecker),
fx.Invoke(libp2p.StartListening),
fx.Invoke(libp2p.SetupDiscovery),
)
func LibP2P(cfg *BuildCfg) fx.Option {
func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
// parse ConnMgr config
grace := config.DefaultConnMgrGracePeriod
low := config.DefaultConnMgrHighWater
high := config.DefaultConnMgrHighWater
connmgr := fx.Options()
if cfg.Swarm.ConnMgr.Type != "none" {
switch cfg.Swarm.ConnMgr.Type {
case "":
// 'default' value is the basic connection manager
break
case "basic":
var err error
grace, err = time.ParseDuration(cfg.Swarm.ConnMgr.GracePeriod)
if err != nil {
return fx.Error(fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err))
}
low = cfg.Swarm.ConnMgr.LowWater
high = cfg.Swarm.ConnMgr.HighWater
default:
return fx.Error(fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Swarm.ConnMgr.Type))
}
connmgr = fx.Provide(libp2p.ConnectionManager(low, high, grace))
}
// parse PubSub config
ps := fx.Options()
if bcfg.getOpt("pubsub") || bcfg.getOpt("ipnsps") {
var pubsubOptions []pubsub.Option
if cfg.Pubsub.DisableSigning {
pubsubOptions = append(pubsubOptions, pubsub.WithMessageSigning(false))
}
if cfg.Pubsub.StrictSignatureVerification {
pubsubOptions = append(pubsubOptions, pubsub.WithStrictSignatureVerification(true))
}
switch cfg.Pubsub.Router {
case "":
fallthrough
case "floodsub":
ps = fx.Provide(libp2p.FloodSub(pubsubOptions...))
case "gossipsub":
ps = fx.Provide(libp2p.GossipSub(pubsubOptions...))
default:
return fx.Error(fmt.Errorf("unknown pubsub router %s", cfg.Pubsub.Router))
}
}
// Gather all the options
opts := fx.Options(
BaseLibP2P,
fx.Provide(libp2p.Security(!cfg.DisableEncryptedConnections)),
maybeProvide(libp2p.Pubsub, cfg.getOpt("pubsub") || cfg.getOpt("ipnsps")),
fx.Provide(libp2p.AddrFilters(cfg.Swarm.AddrFilters)),
fx.Invoke(libp2p.SetupDiscovery(cfg.Discovery.MDNS.Enabled, cfg.Discovery.MDNS.Interval)),
fx.Provide(libp2p.AddrsFactory(cfg.Addresses.Announce, cfg.Addresses.NoAnnounce)),
fx.Provide(libp2p.SmuxTransport(bcfg.getOpt("mplex"))),
fx.Provide(libp2p.Relay(cfg.Swarm.DisableRelay, cfg.Swarm.EnableRelayHop)),
fx.Invoke(libp2p.StartListening(cfg.Addresses.Swarm)),
fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Experimental.PreferTLS)),
fx.Provide(libp2p.SmuxTransport(cfg.getOpt("mplex"))),
fx.Provide(libp2p.Routing),
fx.Provide(libp2p.BaseRouting),
maybeProvide(libp2p.PubsubRouter, cfg.getOpt("ipnsps")),
maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")),
maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics),
maybeProvide(libp2p.NatPortMap, !cfg.Swarm.DisableNatPortMap),
maybeProvide(libp2p.AutoRealy, cfg.Swarm.EnableAutoRelay),
maybeProvide(libp2p.QUIC, cfg.Experimental.QUIC),
maybeProvide(libp2p.AutoNATService(cfg.Experimental.QUIC), cfg.Swarm.EnableAutoNATService),
connmgr,
ps,
)
return opts
}
// Storage groups units which setup datastore based persistence and blockstore layers
func Storage(cfg *BuildCfg) fx.Option {
func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
cacheOpts := blockstore.DefaultCacheOpts()
cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize
if !bcfg.Permanent {
cacheOpts.HasBloomFilterSize = 0
}
finalBstore := fx.Provide(GcBlockstoreCtor)
if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled {
finalBstore = fx.Provide(FilestoreBlockstoreCtor)
}
return fx.Options(
fx.Provide(RepoConfig),
fx.Provide(Datastore),
fx.Provide(BaseBlockstoreCtor(cfg.Permanent, cfg.NilRepo)),
fx.Provide(GcBlockstoreCtor),
fx.Provide(BaseBlockstoreCtor(cacheOpts, bcfg.NilRepo, cfg.Datastore.HashOnRead)),
finalBstore,
)
}
// Identity groups units providing cryptographic identity
var Identity = fx.Options(
fx.Provide(PeerID),
fx.Provide(PrivateKey),
fx.Provide(libp2p.Peerstore),
)
func Identity(cfg *config.Config) fx.Option {
// PeerID
cid := cfg.Identity.PeerID
if cid == "" {
return fx.Error(errors.New("identity was not set in config (was 'ipfs init' run?)"))
}
if len(cid) == 0 {
return fx.Error(errors.New("no peer ID in config! (was 'ipfs init' run?)"))
}
id, err := peer.IDB58Decode(cid)
if err != nil {
return fx.Error(fmt.Errorf("peer ID invalid: %s", err))
}
// Private Key
if cfg.Identity.PrivKey == "" {
return fx.Options( // No PK (usually in tests)
fx.Provide(PeerID(id)),
fx.Provide(pstoremem.NewPeerstore),
)
}
sk, err := cfg.Identity.DecodePrivateKey("passphrase todo!")
if err != nil {
return fx.Error(err)
}
return fx.Options( // Full identity
fx.Provide(PeerID(id)),
fx.Provide(PrivateKey(sk)),
fx.Provide(pstoremem.NewPeerstore),
fx.Invoke(libp2p.PstoreAddSelfKeys),
)
}
// IPNS groups namesys related units
var IPNS = fx.Options(
@ -74,33 +189,97 @@ var IPNS = fx.Options(
)
// Providers groups units managing provider routing records
var Providers = fx.Options(
fx.Provide(ProviderQueue),
fx.Provide(ProviderCtor),
fx.Provide(ReproviderCtor),
func Providers(cfg *config.Config) fx.Option {
reproviderInterval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return fx.Error(err)
}
fx.Invoke(Reprovider),
)
reproviderInterval = dur
}
var keyProvider fx.Option
switch cfg.Reprovider.Strategy {
case "all":
fallthrough
case "":
keyProvider = fx.Provide(reprovide.NewBlockstoreProvider)
case "roots":
keyProvider = fx.Provide(reprovide.NewPinnedProvider(true))
case "pinned":
keyProvider = fx.Provide(reprovide.NewPinnedProvider(false))
default:
return fx.Error(fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy))
}
return fx.Options(
fx.Provide(ProviderQueue),
fx.Provide(ProviderCtor),
fx.Provide(ReproviderCtor(reproviderInterval)),
keyProvider,
fx.Invoke(Reprovider),
)
}
// Online groups online-only units
func Online(cfg *BuildCfg) fx.Option {
func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option {
// Namesys params
ipnsCacheSize := cfg.Ipns.ResolveCacheSize
if ipnsCacheSize == 0 {
ipnsCacheSize = DefaultIpnsCacheSize
}
if ipnsCacheSize < 0 {
return fx.Error(fmt.Errorf("cannot specify negative resolve cache size"))
}
// Republisher params
var repubPeriod, recordLifetime time.Duration
if cfg.Ipns.RepublishPeriod != "" {
d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
if err != nil {
return fx.Error(fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err))
}
if !util.Debug && (d < time.Minute || d > (time.Hour*24)) {
return fx.Error(fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d))
}
repubPeriod = d
}
if cfg.Ipns.RecordLifetime != "" {
d, err := time.ParseDuration(cfg.Ipns.RecordLifetime)
if err != nil {
return fx.Error(fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err))
}
recordLifetime = d
}
return fx.Options(
fx.Provide(OnlineExchange),
fx.Provide(OnlineNamesys),
fx.Provide(Namesys(ipnsCacheSize)),
fx.Invoke(IpnsRepublisher),
fx.Invoke(IpnsRepublisher(repubPeriod, recordLifetime)),
fx.Provide(p2p.New),
LibP2P(cfg),
Providers,
LibP2P(bcfg, cfg),
Providers(cfg),
)
}
// Offline groups offline alternatives to Online units
var Offline = fx.Options(
fx.Provide(offline.Exchange),
fx.Provide(OfflineNamesys),
fx.Provide(Namesys(0)),
fx.Provide(offroute.NewOfflineRouter),
fx.Provide(provider.NewOfflineProvider),
)
@ -114,29 +293,36 @@ var Core = fx.Options(
fx.Provide(Files),
)
func Networked(cfg *BuildCfg) fx.Option {
if cfg.Online {
return Online(cfg)
func Networked(bcfg *BuildCfg, cfg *config.Config) fx.Option {
if bcfg.Online {
return Online(bcfg, cfg)
}
return Offline
}
// IPFS builds a group of fx Options based on the passed BuildCfg
func IPFS(ctx context.Context, cfg *BuildCfg) fx.Option {
if cfg == nil {
cfg = new(BuildCfg)
func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option {
if bcfg == nil {
bcfg = new(BuildCfg)
}
bcfgOpts, cfg := bcfg.options(ctx)
if cfg == nil {
return bcfgOpts // error
}
// TEMP: setting global sharding switch here
uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled
return fx.Options(
cfg.options(ctx),
bcfgOpts,
fx.Provide(baseProcess),
fx.Invoke(setupSharding),
Storage(cfg),
Identity,
Storage(bcfg, cfg),
Identity(cfg),
IPNS,
Networked(cfg),
Networked(bcfg, cfg),
Core,
)

View File

@ -3,8 +3,6 @@ package node
import (
"context"
config "github.com/ipfs/go-ipfs-config"
uio "github.com/ipfs/go-unixfs/io"
"github.com/jbenet/goprocess"
"github.com/pkg/errors"
"go.uber.org/fx"
@ -45,11 +43,6 @@ func maybeProvide(opt interface{}, enable bool) fx.Option {
return fx.Options()
}
func setupSharding(cfg *config.Config) {
// TEMP: setting global sharding switch here
uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled
}
// baseProcess creates a goprocess which is closed when the lifecycle signals it to stop
func baseProcess(lc fx.Lifecycle) goprocess.Process {
p := goprocess.WithParent(goprocess.Background())

View File

@ -1,50 +1,29 @@
package node
import (
"errors"
"fmt"
"github.com/ipfs/go-ipfs-config"
"github.com/libp2p/go-libp2p-crypto"
"github.com/libp2p/go-libp2p-peer"
)
// PeerID loads peer identity form config
func PeerID(cfg *config.Config) (peer.ID, error) {
cid := cfg.Identity.PeerID
if cid == "" {
return "", errors.New("identity was not set in config (was 'ipfs init' run?)")
func PeerID(id peer.ID) func() peer.ID {
return func() peer.ID {
return id
}
if len(cid) == 0 {
return "", errors.New("no peer ID in config! (was 'ipfs init' run?)")
}
id, err := peer.IDB58Decode(cid)
if err != nil {
return "", fmt.Errorf("peer ID invalid: %s", err)
}
return id, nil
}
// PrivateKey loads the private key from config
func PrivateKey(cfg *config.Config, id peer.ID) (crypto.PrivKey, error) {
if cfg.Identity.PrivKey == "" {
return nil, nil
}
func PrivateKey(sk crypto.PrivKey) func(id peer.ID) (crypto.PrivKey, error) {
return func(id peer.ID) (crypto.PrivKey, error) {
id2, err := peer.IDFromPrivateKey(sk)
if err != nil {
return nil, err
}
sk, err := cfg.Identity.DecodePrivateKey("passphrase todo!")
if err != nil {
return nil, err
if id2 != id {
return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2)
}
return sk, nil
}
id2, err := peer.IDFromPrivateKey(sk)
if err != nil {
return nil, err
}
if id2 != id {
return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2)
}
return sk, nil
}

View File

@ -4,7 +4,6 @@ import (
"fmt"
"time"
"github.com/ipfs/go-ipfs-config"
"github.com/ipfs/go-ipfs-util"
"github.com/ipfs/go-ipns"
"github.com/libp2p/go-libp2p-crypto"
@ -27,49 +26,31 @@ func RecordValidator(ps peerstore.Peerstore) record.Validator {
}
}
// OfflineNamesys creates namesys setup for offline operation
func OfflineNamesys(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) {
return namesys.NewNameSystem(rt, repo.Datastore(), 0), nil
}
// OnlineNamesys createn new namesys setup for online operation
func OnlineNamesys(rt routing.IpfsRouting, repo repo.Repo, cfg *config.Config) (namesys.NameSystem, error) {
cs := cfg.Ipns.ResolveCacheSize
if cs == 0 {
cs = DefaultIpnsCacheSize
// Namesys creates new name system
func Namesys(cacheSize int) func(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) {
return func(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) {
return namesys.NewNameSystem(rt, repo.Datastore(), cacheSize), nil
}
if cs < 0 {
return nil, fmt.Errorf("cannot specify negative resolve cache size")
}
return namesys.NewNameSystem(rt, repo.Datastore(), cs), nil
}
// IpnsRepublisher runs new IPNS republisher service
func IpnsRepublisher(lc lcProcess, cfg *config.Config, namesys namesys.NameSystem, repo repo.Repo, privKey crypto.PrivKey) error {
repub := republisher.NewRepublisher(namesys, repo.Datastore(), privKey, repo.Keystore())
func IpnsRepublisher(repubPeriod time.Duration, recordLifetime time.Duration) func(lcProcess, namesys.NameSystem, repo.Repo, crypto.PrivKey) error {
return func(lc lcProcess, namesys namesys.NameSystem, repo repo.Repo, privKey crypto.PrivKey) error {
repub := republisher.NewRepublisher(namesys, repo.Datastore(), privKey, repo.Keystore())
if cfg.Ipns.RepublishPeriod != "" {
d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
if err != nil {
return fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err)
if repubPeriod != 0 {
if !util.Debug && (repubPeriod < time.Minute || repubPeriod > (time.Hour*24)) {
return fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", repubPeriod)
}
repub.Interval = repubPeriod
}
if !util.Debug && (d < time.Minute || d > (time.Hour*24)) {
return fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d)
if recordLifetime != 0 {
repub.RecordLifetime = recordLifetime
}
repub.Interval = d
lc.Append(repub.Run)
return nil
}
if cfg.Ipns.RecordLifetime != "" {
d, err := time.ParseDuration(cfg.Ipns.RecordLifetime)
if err != nil {
return fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err)
}
repub.RecordLifetime = d
}
lc.Append(repub.Run)
return nil
}

117
core/node/libp2p/addrs.go Normal file
View File

@ -0,0 +1,117 @@
package libp2p
import (
"fmt"
"github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-host"
p2pbhost "github.com/libp2p/go-libp2p/p2p/host/basic"
mafilter "github.com/libp2p/go-maddr-filter"
ma "github.com/multiformats/go-multiaddr"
mamask "github.com/whyrusleeping/multiaddr-filter"
)
func AddrFilters(filters []string) func() (opts Libp2pOpts, err error) {
return func() (opts Libp2pOpts, err error) {
for _, s := range filters {
f, err := mamask.NewMask(s)
if err != nil {
return opts, fmt.Errorf("incorrectly formatted address filter in config: %s", s)
}
opts.Opts = append(opts.Opts, libp2p.FilterAddresses(f))
}
return opts, nil
}
}
func makeAddrsFactory(announce []string, noAnnounce []string) (p2pbhost.AddrsFactory, error) {
var annAddrs []ma.Multiaddr
for _, addr := range announce {
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
annAddrs = append(annAddrs, maddr)
}
filters := mafilter.NewFilters()
noAnnAddrs := map[string]bool{}
for _, addr := range noAnnounce {
f, err := mamask.NewMask(addr)
if err == nil {
filters.AddDialFilter(f)
continue
}
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
noAnnAddrs[string(maddr.Bytes())] = true
}
return func(allAddrs []ma.Multiaddr) []ma.Multiaddr {
var addrs []ma.Multiaddr
if len(annAddrs) > 0 {
addrs = annAddrs
} else {
addrs = allAddrs
}
var out []ma.Multiaddr
for _, maddr := range addrs {
// check for exact matches
ok := noAnnAddrs[string(maddr.Bytes())]
// check for /ipcidr matches
if !ok && !filters.AddrBlocked(maddr) {
out = append(out, maddr)
}
}
return out
}, nil
}
func AddrsFactory(announce []string, noAnnounce []string) func() (opts Libp2pOpts, err error) {
return func() (opts Libp2pOpts, err error) {
addrsFactory, err := makeAddrsFactory(announce, noAnnounce)
if err != nil {
return opts, err
}
opts.Opts = append(opts.Opts, libp2p.AddrsFactory(addrsFactory))
return
}
}
func listenAddresses(addresses []string) ([]ma.Multiaddr, error) {
var listen []ma.Multiaddr
for _, addr := range addresses {
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", addresses)
}
listen = append(listen, maddr)
}
return listen, nil
}
func StartListening(addresses []string) func(host host.Host) error {
return func(host host.Host) error {
listenAddrs, err := listenAddresses(addresses)
if err != nil {
return err
}
// Actually start listening:
if err := host.Network().Listen(listenAddrs...); err != nil {
return err
}
// list out our addresses
addrs, err := host.Network().InterfaceListenAddresses()
if err != nil {
return err
}
log.Infof("Swarm listening at: %s", addrs)
return nil
}
}

View File

@ -4,12 +4,12 @@ import (
"context"
"time"
"github.com/ipfs/go-ipfs-config"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/libp2p/go-libp2p-host"
"github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p/p2p/discovery"
"go.uber.org/fx"
"github.com/ipfs/go-ipfs/core/node/helpers"
)
const discoveryConnTimeout = time.Second * 30
@ -35,18 +35,19 @@ func DiscoveryHandler(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host)
}
}
func SetupDiscovery(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, host host.Host, handler *discoveryHandler) error {
if cfg.Discovery.MDNS.Enabled {
mdns := cfg.Discovery.MDNS
if mdns.Interval == 0 {
mdns.Interval = 5
func SetupDiscovery(mdns bool, mdnsInterval int) func(helpers.MetricsCtx, fx.Lifecycle, host.Host, *discoveryHandler) error {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, handler *discoveryHandler) error {
if mdns {
if mdnsInterval == 0 {
mdnsInterval = 5
}
service, err := discovery.NewMdnsService(helpers.LifecycleCtx(mctx, lc), host, time.Duration(mdnsInterval)*time.Second, discovery.ServiceTag)
if err != nil {
log.Error("mdns error: ", err)
return nil
}
service.RegisterNotifee(handler)
}
service, err := discovery.NewMdnsService(helpers.LifecycleCtx(mctx, lc), host, time.Duration(mdns.Interval)*time.Second, discovery.ServiceTag)
if err != nil {
log.Error("mdns error: ", err)
return nil
}
service.RegisterNotifee(handler)
return nil
}
return nil
}

76
core/node/libp2p/host.go Normal file
View File

@ -0,0 +1,76 @@
package libp2p
import (
"context"
"github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
record "github.com/libp2p/go-libp2p-record"
routing "github.com/libp2p/go-libp2p-routing"
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
"go.uber.org/fx"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/repo"
)
type P2PHostIn struct {
fx.In
Repo repo.Repo
Validator record.Validator
HostOption HostOption
RoutingOption RoutingOption
ID peer.ID
Peerstore peerstore.Peerstore
Opts [][]libp2p.Option `group:"libp2p"`
}
type P2PHostOut struct {
fx.Out
Host host.Host
Routing BaseIpfsRouting
}
func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHostOut, err error) {
opts := []libp2p.Option{libp2p.NoListenAddrs}
for _, o := range params.Opts {
opts = append(opts, o...)
}
ctx := helpers.LifecycleCtx(mctx, lc)
opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
r, err := params.RoutingOption(ctx, h, params.Repo.Datastore(), params.Validator)
out.Routing = r
return r, err
}))
out.Host, err = params.HostOption(ctx, params.ID, params.Peerstore, opts...)
if err != nil {
return P2PHostOut{}, err
}
// this code is necessary just for tests: mock network constructions
// ignore the libp2p constructor options that actually construct the routing!
if out.Routing == nil {
r, err := params.RoutingOption(ctx, out.Host, params.Repo.Datastore(), params.Validator)
if err != nil {
return P2PHostOut{}, err
}
out.Routing = r
out.Host = routedhost.Wrap(out.Host, out.Routing)
}
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return out.Host.Close()
},
})
return out, err
}

View File

@ -0,0 +1,25 @@
package libp2p
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
)
type HostOption func(ctx context.Context, id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error)
var DefaultHostOption HostOption = constructPeerHost
// isolates the complex initialization steps
func constructPeerHost(ctx context.Context, id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error) {
pkey := ps.PrivKey(id)
if pkey == nil {
return nil, fmt.Errorf("missing private key for node ID: %s", id.Pretty())
}
options = append([]libp2p.Option{libp2p.Identity(pkey), libp2p.Peerstore(ps)}, options...)
return libp2p.New(ctx, options...)
}

View File

@ -1,604 +1,46 @@
package libp2p
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"os"
"sort"
"strings"
"time"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-ipfs-config"
nilrouting "github.com/ipfs/go-ipfs-routing/none"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-autonat-svc"
"github.com/libp2p/go-libp2p-circuit"
"github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-crypto"
"github.com/libp2p/go-libp2p-host"
"github.com/libp2p/go-libp2p-kad-dht"
dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
"github.com/libp2p/go-libp2p-metrics"
"github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
"github.com/libp2p/go-libp2p-pnet"
"github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p-pubsub-router"
"github.com/libp2p/go-libp2p-quic-transport"
"github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p-routing"
"github.com/libp2p/go-libp2p-routing-helpers"
secio "github.com/libp2p/go-libp2p-secio"
tls "github.com/libp2p/go-libp2p-tls"
p2pbhost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/routed"
mafilter "github.com/libp2p/go-maddr-filter"
smux "github.com/libp2p/go-stream-muxer"
ma "github.com/multiformats/go-multiaddr"
mplex "github.com/whyrusleeping/go-smux-multiplex"
yamux "github.com/whyrusleeping/go-smux-yamux"
mamask "github.com/whyrusleeping/multiaddr-filter"
"go.uber.org/fx"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/repo"
)
var log = logging.Logger("p2pnode")
type HostOption func(ctx context.Context, id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error)
type RoutingOption func(context.Context, host.Host, datastore.Batching, record.Validator) (routing.IpfsRouting, error)
var DefaultHostOption HostOption = constructPeerHost
// isolates the complex initialization steps
func constructPeerHost(ctx context.Context, id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error) {
pkey := ps.PrivKey(id)
if pkey == nil {
return nil, fmt.Errorf("missing private key for node ID: %s", id.Pretty())
}
options = append([]libp2p.Option{libp2p.Identity(pkey), libp2p.Peerstore(ps)}, options...)
return libp2p.New(ctx, options...)
}
func constructDHTRouting(ctx context.Context, host host.Host, dstore datastore.Batching, validator record.Validator) (routing.IpfsRouting, error) {
return dht.New(
ctx, host,
dhtopts.Datastore(dstore),
dhtopts.Validator(validator),
)
}
func constructClientDHTRouting(ctx context.Context, host host.Host, dstore datastore.Batching, validator record.Validator) (routing.IpfsRouting, error) {
return dht.New(
ctx, host,
dhtopts.Client(true),
dhtopts.Datastore(dstore),
dhtopts.Validator(validator),
)
}
var DHTOption RoutingOption = constructDHTRouting
var DHTClientOption RoutingOption = constructClientDHTRouting
var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting
func Peerstore(id peer.ID, sk crypto.PrivKey) (peerstore.Peerstore, error) {
ps := pstoremem.NewPeerstore()
if sk != nil {
if err := ps.AddPubKey(id, sk.GetPublic()); err != nil {
return nil, err
}
if err := ps.AddPrivKey(id, sk); err != nil {
return nil, err
}
}
return ps, nil
}
func AddrFilters(cfg *config.Config) (opts Libp2pOpts, err error) {
for _, s := range cfg.Swarm.AddrFilters {
f, err := mamask.NewMask(s)
if err != nil {
return opts, fmt.Errorf("incorrectly formatted address filter in config: %s", s)
}
opts.Opts = append(opts.Opts, libp2p.FilterAddresses(f))
}
return opts, nil
}
func BandwidthCounter(cfg *config.Config) (opts Libp2pOpts, reporter metrics.Reporter) {
reporter = metrics.NewBandwidthCounter()
if !cfg.Swarm.DisableBandwidthMetrics {
opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter))
}
return opts, reporter
}
type Libp2pOpts struct {
fx.Out
Opts []libp2p.Option `group:"libp2p"`
}
type PNetFingerprint []byte
// Misc options
func PNet(repo repo.Repo) (opts Libp2pOpts, fp PNetFingerprint, err error) {
swarmkey, err := repo.SwarmKey()
if err != nil || swarmkey == nil {
return opts, nil, err
}
protec, err := pnet.NewProtector(bytes.NewReader(swarmkey))
if err != nil {
return opts, nil, fmt.Errorf("failed to configure private network: %s", err)
}
fp = protec.Fingerprint()
opts.Opts = append(opts.Opts, libp2p.PrivateNetwork(protec))
return opts, fp, nil
}
func PNetChecker(repo repo.Repo, ph host.Host, lc fx.Lifecycle) error {
// TODO: better check?
swarmkey, err := repo.SwarmKey()
if err != nil || swarmkey == nil {
return err
}
done := make(chan struct{})
lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
go func() {
t := time.NewTicker(30 * time.Second)
defer t.Stop()
<-t.C // swallow one tick
for {
select {
case <-t.C:
if len(ph.Network().Peers()) == 0 {
log.Warning("We are in private network and have no peers.")
log.Warning("This might be configuration mistake.")
}
case <-done:
return
}
}
}()
return nil
},
OnStop: func(_ context.Context) error {
close(done)
return nil
},
})
return nil
}
func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
var annAddrs []ma.Multiaddr
for _, addr := range cfg.Announce {
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
annAddrs = append(annAddrs, maddr)
}
filters := mafilter.NewFilters()
noAnnAddrs := map[string]bool{}
for _, addr := range cfg.NoAnnounce {
f, err := mamask.NewMask(addr)
if err == nil {
filters.AddDialFilter(f)
continue
}
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
noAnnAddrs[string(maddr.Bytes())] = true
}
return func(allAddrs []ma.Multiaddr) []ma.Multiaddr {
var addrs []ma.Multiaddr
if len(annAddrs) > 0 {
addrs = annAddrs
} else {
addrs = allAddrs
}
var out []ma.Multiaddr
for _, maddr := range addrs {
// check for exact matches
ok := noAnnAddrs[string(maddr.Bytes())]
// check for /ipcidr matches
if !ok && !filters.AddrBlocked(maddr) {
out = append(out, maddr)
}
}
return out
}, nil
}
func AddrsFactory(cfg *config.Config) (opts Libp2pOpts, err error) {
addrsFactory, err := makeAddrsFactory(cfg.Addresses)
if err != nil {
return opts, err
}
opts.Opts = append(opts.Opts, libp2p.AddrsFactory(addrsFactory))
return
}
func ConnectionManager(cfg *config.Config) (opts Libp2pOpts, err error) {
grace := config.DefaultConnMgrGracePeriod
low := config.DefaultConnMgrHighWater
high := config.DefaultConnMgrHighWater
switch cfg.Swarm.ConnMgr.Type {
case "":
// 'default' value is the basic connection manager
return
case "none":
return opts, nil
case "basic":
grace, err = time.ParseDuration(cfg.Swarm.ConnMgr.GracePeriod)
if err != nil {
return opts, fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err)
}
low = cfg.Swarm.ConnMgr.LowWater
high = cfg.Swarm.ConnMgr.HighWater
default:
return opts, fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Swarm.ConnMgr.Type)
}
cm := connmgr.NewConnManager(low, high, grace)
opts.Opts = append(opts.Opts, libp2p.ConnectionManager(cm))
return
}
func makeSmuxTransportOption(mplexExp bool) libp2p.Option {
const yamuxID = "/yamux/1.0.0"
const mplexID = "/mplex/6.7.0"
ymxtpt := &yamux.Transport{
AcceptBacklog: 512,
ConnectionWriteTimeout: time.Second * 10,
KeepAliveInterval: time.Second * 30,
EnableKeepAlive: true,
MaxStreamWindowSize: uint32(16 * 1024 * 1024), // 16MiB
LogOutput: ioutil.Discard,
}
if os.Getenv("YAMUX_DEBUG") != "" {
ymxtpt.LogOutput = os.Stderr
}
muxers := map[string]smux.Transport{yamuxID: ymxtpt}
if mplexExp {
muxers[mplexID] = mplex.DefaultTransport
}
// Allow muxer preference order overriding
order := []string{yamuxID, mplexID}
if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" {
order = strings.Fields(prefs)
}
opts := make([]libp2p.Option, 0, len(order))
for _, id := range order {
tpt, ok := muxers[id]
if !ok {
log.Warning("unknown or duplicate muxer in LIBP2P_MUX_PREFS: %s", id)
continue
}
delete(muxers, id)
opts = append(opts, libp2p.Muxer(id, tpt))
}
return libp2p.ChainOptions(opts...)
}
func SmuxTransport(mplex bool) func() (opts Libp2pOpts, err error) {
func ConnectionManager(low, high int, grace time.Duration) func() (opts Libp2pOpts, err error) {
return func() (opts Libp2pOpts, err error) {
opts.Opts = append(opts.Opts, makeSmuxTransportOption(mplex))
cm := connmgr.NewConnManager(low, high, grace)
opts.Opts = append(opts.Opts, libp2p.ConnectionManager(cm))
return
}
}
func NatPortMap(cfg *config.Config) (opts Libp2pOpts, err error) {
if !cfg.Swarm.DisableNatPortMap {
opts.Opts = append(opts.Opts, libp2p.NATPortMap())
}
return
}
func Relay(cfg *config.Config) (opts Libp2pOpts, err error) {
if cfg.Swarm.DisableRelay {
// Enabled by default.
opts.Opts = append(opts.Opts, libp2p.DisableRelay())
} else {
relayOpts := []relay.RelayOpt{relay.OptDiscovery}
if cfg.Swarm.EnableRelayHop {
relayOpts = append(relayOpts, relay.OptHop)
}
opts.Opts = append(opts.Opts, libp2p.EnableRelay(relayOpts...))
}
return
}
func AutoRealy(cfg *config.Config) (opts Libp2pOpts, err error) {
// enable autorelay
if cfg.Swarm.EnableAutoRelay {
opts.Opts = append(opts.Opts, libp2p.EnableAutoRelay())
}
return
}
func DefaultTransports() (opts Libp2pOpts, err error) {
opts.Opts = append(opts.Opts, libp2p.DefaultTransports)
return
}
func QUIC(cfg *config.Config) (opts Libp2pOpts, err error) {
if cfg.Experimental.QUIC {
opts.Opts = append(opts.Opts, libp2p.Transport(libp2pquic.NewTransport))
}
return
}
func Security(enabled bool) interface{} {
if !enabled {
return func() (opts Libp2pOpts) {
// TODO: shouldn't this be Errorf to guarantee visibility?
log.Warningf(`Your IPFS node has been configured to run WITHOUT ENCRYPTED CONNECTIONS.
You will not be able to connect to any nodes configured to use encrypted connections`)
opts.Opts = append(opts.Opts, libp2p.NoSecurity)
return opts
}
}
return func(cfg *config.Config) (opts Libp2pOpts) {
if cfg.Experimental.PreferTLS {
opts.Opts = append(opts.Opts, libp2p.ChainOptions(libp2p.Security(tls.ID, tls.New), libp2p.Security(secio.ID, secio.New)))
} else {
opts.Opts = append(opts.Opts, libp2p.ChainOptions(libp2p.Security(secio.ID, secio.New), libp2p.Security(tls.ID, tls.New)))
}
return opts
}
}
type P2PHostIn struct {
fx.In
Repo repo.Repo
Validator record.Validator
HostOption HostOption
RoutingOption RoutingOption
ID peer.ID
Peerstore peerstore.Peerstore
Opts [][]libp2p.Option `group:"libp2p"`
}
type BaseIpfsRouting routing.IpfsRouting
type P2PHostOut struct {
fx.Out
Host host.Host
Routing BaseIpfsRouting
}
func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHostOut, err error) {
opts := []libp2p.Option{libp2p.NoListenAddrs}
for _, o := range params.Opts {
opts = append(opts, o...)
}
ctx := helpers.LifecycleCtx(mctx, lc)
opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
r, err := params.RoutingOption(ctx, h, params.Repo.Datastore(), params.Validator)
out.Routing = r
return r, err
}))
out.Host, err = params.HostOption(ctx, params.ID, params.Peerstore, opts...)
if err != nil {
return P2PHostOut{}, err
}
// this code is necessary just for tests: mock network constructions
// ignore the libp2p constructor options that actually construct the routing!
if out.Routing == nil {
r, err := params.RoutingOption(ctx, out.Host, params.Repo.Datastore(), params.Validator)
if err != nil {
return P2PHostOut{}, err
}
out.Routing = r
out.Host = routedhost.Wrap(out.Host, out.Routing)
}
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return out.Host.Close()
},
})
return out, err
}
type Router struct {
routing.IpfsRouting
Priority int // less = more important
}
type p2pRouterOut struct {
fx.Out
Router Router `group:"routers"`
}
func BaseRouting(lc fx.Lifecycle, in BaseIpfsRouting) (out p2pRouterOut, dr *dht.IpfsDHT) {
if dht, ok := in.(*dht.IpfsDHT); ok {
dr = dht
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return dr.Close()
},
})
}
return p2pRouterOut{
Router: Router{
Priority: 1000,
IpfsRouting: in,
},
}, dr
}
type p2pOnlineRoutingIn struct {
fx.In
Routers []Router `group:"routers"`
Validator record.Validator
}
func Routing(in p2pOnlineRoutingIn) routing.IpfsRouting {
routers := in.Routers
sort.SliceStable(routers, func(i, j int) bool {
return routers[i].Priority < routers[j].Priority
})
irouters := make([]routing.IpfsRouting, len(routers))
for i, v := range routers {
irouters[i] = v.IpfsRouting
}
return routinghelpers.Tiered{
Routers: irouters,
Validator: in.Validator,
}
}
type p2pPSRoutingIn struct {
fx.In
BaseRouting BaseIpfsRouting
Repo repo.Repo
Validator record.Validator
Host host.Host
PubSub *pubsub.PubSub `optional:"true"`
}
func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore) {
psRouter := namesys.NewPubsubValueStore(
helpers.LifecycleCtx(mctx, lc),
in.Host,
in.BaseRouting,
in.PubSub,
in.Validator,
)
return p2pRouterOut{
Router: Router{
IpfsRouting: &routinghelpers.Compose{
ValueStore: &routinghelpers.LimitedValueStore{
ValueStore: psRouter,
Namespaces: []string{"ipns"},
},
},
Priority: 100,
},
}, psRouter
}
func AutoNATService(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, host host.Host) error {
if !cfg.Swarm.EnableAutoNATService {
return nil
}
// collect private net option in case swarm.key is presented
opts, _, err := PNet(repo)
if err != nil {
// swarm key exists but was failed to decode
func PstoreAddSelfKeys(id peer.ID, sk crypto.PrivKey, ps peerstore.Peerstore) error {
if err := ps.AddPubKey(id, sk.GetPublic()); err != nil {
return err
}
if cfg.Experimental.QUIC {
opts.Opts = append(opts.Opts, libp2p.DefaultTransports, libp2p.Transport(libp2pquic.NewTransport))
}
_, err = autonat.NewAutoNATService(helpers.LifecycleCtx(mctx, lc), host, opts.Opts...)
return err
return ps.AddPrivKey(id, sk)
}
func Pubsub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, cfg *config.Config) (service *pubsub.PubSub, err error) {
var pubsubOptions []pubsub.Option
if cfg.Pubsub.DisableSigning {
pubsubOptions = append(pubsubOptions, pubsub.WithMessageSigning(false))
func simpleOpt(opt libp2p.Option) func() (opts Libp2pOpts, err error) {
return func() (opts Libp2pOpts, err error) {
opts.Opts = append(opts.Opts, opt)
return
}
if cfg.Pubsub.StrictSignatureVerification {
pubsubOptions = append(pubsubOptions, pubsub.WithStrictSignatureVerification(true))
}
switch cfg.Pubsub.Router {
case "":
fallthrough
case "floodsub":
service, err = pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
case "gossipsub":
service, err = pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
default:
err = fmt.Errorf("Unknown pubsub router %s", cfg.Pubsub.Router)
}
return service, err
}
func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
var listen []ma.Multiaddr
for _, addr := range cfg.Addresses.Swarm {
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", cfg.Addresses.Swarm)
}
listen = append(listen, maddr)
}
return listen, nil
}
func StartListening(host host.Host, cfg *config.Config) error {
listenAddrs, err := listenAddresses(cfg)
if err != nil {
return err
}
// Actually start listening:
if err := host.Network().Listen(listenAddrs...); err != nil {
return err
}
// list out our addresses
addrs, err := host.Network().InterfaceListenAddresses()
if err != nil {
return err
}
log.Infof("Swarm listening at: %s", addrs)
return nil
}

32
core/node/libp2p/nat.go Normal file
View File

@ -0,0 +1,32 @@
package libp2p
import (
"github.com/libp2p/go-libp2p"
autonat "github.com/libp2p/go-libp2p-autonat-svc"
host "github.com/libp2p/go-libp2p-host"
libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
"go.uber.org/fx"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/repo"
)
var NatPortMap = simpleOpt(libp2p.NATPortMap())
func AutoNATService(quic bool) func(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) error {
return func(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) error {
// collect private net option in case swarm.key is presented
opts, _, err := PNet(repo)
if err != nil {
// swarm key exists but was failed to decode
return err
}
if quic {
opts.Opts = append(opts.Opts, libp2p.DefaultTransports, libp2p.Transport(libp2pquic.NewTransport))
}
_, err = autonat.NewAutoNATService(helpers.LifecycleCtx(mctx, lc), host, opts.Opts...)
return err
}
}

70
core/node/libp2p/pnet.go Normal file
View File

@ -0,0 +1,70 @@
package libp2p
import (
"bytes"
"context"
"fmt"
"time"
"github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-host"
pnet "github.com/libp2p/go-libp2p-pnet"
"go.uber.org/fx"
"github.com/ipfs/go-ipfs/repo"
)
type PNetFingerprint []byte
func PNet(repo repo.Repo) (opts Libp2pOpts, fp PNetFingerprint, err error) {
swarmkey, err := repo.SwarmKey()
if err != nil || swarmkey == nil {
return opts, nil, err
}
protec, err := pnet.NewProtector(bytes.NewReader(swarmkey))
if err != nil {
return opts, nil, fmt.Errorf("failed to configure private network: %s", err)
}
fp = protec.Fingerprint()
opts.Opts = append(opts.Opts, libp2p.PrivateNetwork(protec))
return opts, fp, nil
}
func PNetChecker(repo repo.Repo, ph host.Host, lc fx.Lifecycle) error {
// TODO: better check?
swarmkey, err := repo.SwarmKey()
if err != nil || swarmkey == nil {
return err
}
done := make(chan struct{})
lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
go func() {
t := time.NewTicker(30 * time.Second)
defer t.Stop()
<-t.C // swallow one tick
for {
select {
case <-t.C:
if len(ph.Network().Peers()) == 0 {
log.Warning("We are in private network and have no peers.")
log.Warning("This might be configuration mistake.")
}
case <-done:
return
}
}
}()
return nil
},
OnStop: func(_ context.Context) error {
close(done)
return nil
},
})
return nil
}

View File

@ -0,0 +1,21 @@
package libp2p
import (
host "github.com/libp2p/go-libp2p-host"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"
"github.com/ipfs/go-ipfs/core/node/helpers"
)
func FloodSub(pubsubOptions ...pubsub.Option) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) {
return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
}
}
func GossipSub(pubsubOptions ...pubsub.Option) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) {
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
}
}

24
core/node/libp2p/relay.go Normal file
View File

@ -0,0 +1,24 @@
package libp2p
import (
"github.com/libp2p/go-libp2p"
relay "github.com/libp2p/go-libp2p-circuit"
)
func Relay(disable, enableHop bool) func() (opts Libp2pOpts, err error) {
return func() (opts Libp2pOpts, err error) {
if disable {
// Enabled by default.
opts.Opts = append(opts.Opts, libp2p.DisableRelay())
} else {
relayOpts := []relay.RelayOpt{relay.OptDiscovery}
if enableHop {
relayOpts = append(relayOpts, relay.OptHop)
}
opts.Opts = append(opts.Opts, libp2p.EnableRelay(relayOpts...))
}
return
}
}
var AutoRealy = simpleOpt(libp2p.EnableAutoRelay())

108
core/node/libp2p/routing.go Normal file
View File

@ -0,0 +1,108 @@
package libp2p
import (
"context"
"sort"
host "github.com/libp2p/go-libp2p-host"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-pubsub"
namesys "github.com/libp2p/go-libp2p-pubsub-router"
record "github.com/libp2p/go-libp2p-record"
routing "github.com/libp2p/go-libp2p-routing"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"go.uber.org/fx"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/repo"
)
type BaseIpfsRouting routing.IpfsRouting
type Router struct {
routing.IpfsRouting
Priority int // less = more important
}
type p2pRouterOut struct {
fx.Out
Router Router `group:"routers"`
}
func BaseRouting(lc fx.Lifecycle, in BaseIpfsRouting) (out p2pRouterOut, dr *dht.IpfsDHT) {
if dht, ok := in.(*dht.IpfsDHT); ok {
dr = dht
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return dr.Close()
},
})
}
return p2pRouterOut{
Router: Router{
Priority: 1000,
IpfsRouting: in,
},
}, dr
}
type p2pOnlineRoutingIn struct {
fx.In
Routers []Router `group:"routers"`
Validator record.Validator
}
func Routing(in p2pOnlineRoutingIn) routing.IpfsRouting {
routers := in.Routers
sort.SliceStable(routers, func(i, j int) bool {
return routers[i].Priority < routers[j].Priority
})
irouters := make([]routing.IpfsRouting, len(routers))
for i, v := range routers {
irouters[i] = v.IpfsRouting
}
return routinghelpers.Tiered{
Routers: irouters,
Validator: in.Validator,
}
}
type p2pPSRoutingIn struct {
fx.In
BaseRouting BaseIpfsRouting
Repo repo.Repo
Validator record.Validator
Host host.Host
PubSub *pubsub.PubSub `optional:"true"`
}
func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore) {
psRouter := namesys.NewPubsubValueStore(
helpers.LifecycleCtx(mctx, lc),
in.Host,
in.BaseRouting,
in.PubSub,
in.Validator,
)
return p2pRouterOut{
Router: Router{
IpfsRouting: &routinghelpers.Compose{
ValueStore: &routinghelpers.LimitedValueStore{
ValueStore: psRouter,
Namespaces: []string{"ipns"},
},
},
Priority: 100,
},
}, psRouter
}

View File

@ -0,0 +1,36 @@
package libp2p
import (
"context"
"github.com/ipfs/go-datastore"
nilrouting "github.com/ipfs/go-ipfs-routing/none"
host "github.com/libp2p/go-libp2p-host"
dht "github.com/libp2p/go-libp2p-kad-dht"
dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
record "github.com/libp2p/go-libp2p-record"
routing "github.com/libp2p/go-libp2p-routing"
)
type RoutingOption func(context.Context, host.Host, datastore.Batching, record.Validator) (routing.IpfsRouting, error)
func constructDHTRouting(ctx context.Context, host host.Host, dstore datastore.Batching, validator record.Validator) (routing.IpfsRouting, error) {
return dht.New(
ctx, host,
dhtopts.Datastore(dstore),
dhtopts.Validator(validator),
)
}
func constructClientDHTRouting(ctx context.Context, host host.Host, dstore datastore.Batching, validator record.Validator) (routing.IpfsRouting, error) {
return dht.New(
ctx, host,
dhtopts.Client(true),
dhtopts.Datastore(dstore),
dhtopts.Validator(validator),
)
}
var DHTOption RoutingOption = constructDHTRouting
var DHTClientOption RoutingOption = constructClientDHTRouting
var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting

62
core/node/libp2p/smux.go Normal file
View File

@ -0,0 +1,62 @@
package libp2p
import (
"io/ioutil"
"os"
"strings"
"time"
"github.com/libp2p/go-libp2p"
smux "github.com/libp2p/go-stream-muxer"
mplex "github.com/whyrusleeping/go-smux-multiplex"
yamux "github.com/whyrusleeping/go-smux-yamux"
)
func makeSmuxTransportOption(mplexExp bool) libp2p.Option {
const yamuxID = "/yamux/1.0.0"
const mplexID = "/mplex/6.7.0"
ymxtpt := &yamux.Transport{
AcceptBacklog: 512,
ConnectionWriteTimeout: time.Second * 10,
KeepAliveInterval: time.Second * 30,
EnableKeepAlive: true,
MaxStreamWindowSize: uint32(16 * 1024 * 1024), // 16MiB
LogOutput: ioutil.Discard,
}
if os.Getenv("YAMUX_DEBUG") != "" {
ymxtpt.LogOutput = os.Stderr
}
muxers := map[string]smux.Transport{yamuxID: ymxtpt}
if mplexExp {
muxers[mplexID] = mplex.DefaultTransport
}
// Allow muxer preference order overriding
order := []string{yamuxID, mplexID}
if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" {
order = strings.Fields(prefs)
}
opts := make([]libp2p.Option, 0, len(order))
for _, id := range order {
tpt, ok := muxers[id]
if !ok {
log.Warning("unknown or duplicate muxer in LIBP2P_MUX_PREFS: %s", id)
continue
}
delete(muxers, id)
opts = append(opts, libp2p.Muxer(id, tpt))
}
return libp2p.ChainOptions(opts...)
}
func SmuxTransport(mplex bool) func() (opts Libp2pOpts, err error) {
return func() (opts Libp2pOpts, err error) {
opts.Opts = append(opts.Opts, makeSmuxTransportOption(mplex))
return
}
}

View File

@ -0,0 +1,38 @@
package libp2p
import (
"github.com/libp2p/go-libp2p"
metrics "github.com/libp2p/go-libp2p-metrics"
libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
secio "github.com/libp2p/go-libp2p-secio"
tls "github.com/libp2p/go-libp2p-tls"
)
var DefaultTransports = simpleOpt(libp2p.DefaultTransports)
var QUIC = simpleOpt(libp2p.Transport(libp2pquic.NewTransport))
func Security(enabled, preferTLS bool) interface{} {
if !enabled {
return func() (opts Libp2pOpts) {
// TODO: shouldn't this be Errorf to guarantee visibility?
log.Warningf(`Your IPFS node has been configured to run WITHOUT ENCRYPTED CONNECTIONS.
You will not be able to connect to any nodes configured to use encrypted connections`)
opts.Opts = append(opts.Opts, libp2p.NoSecurity)
return opts
}
}
return func() (opts Libp2pOpts) {
if preferTLS {
opts.Opts = append(opts.Opts, libp2p.ChainOptions(libp2p.Security(tls.ID, tls.New), libp2p.Security(secio.ID, secio.New)))
} else {
opts.Opts = append(opts.Opts, libp2p.ChainOptions(libp2p.Security(secio.ID, secio.New), libp2p.Security(tls.ID, tls.New)))
}
return opts
}
}
func BandwidthCounter() (opts Libp2pOpts, reporter metrics.Reporter) {
reporter = metrics.NewBandwidthCounter()
opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter))
return opts, reporter
}

View File

@ -2,16 +2,12 @@ package node
import (
"context"
"fmt"
"time"
"github.com/ipfs/go-ipfs-config"
"github.com/ipfs/go-ipld-format"
"github.com/libp2p/go-libp2p-routing"
"go.uber.org/fx"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/provider"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/reprovide"
@ -42,32 +38,10 @@ func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queu
}
// ReproviderCtor creates new reprovider
func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*reprovide.Reprovider, error) {
var keyProvider reprovide.KeyChanFunc
reproviderInterval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return nil, err
}
reproviderInterval = dur
func ReproviderCtor(reproviderInterval time.Duration) func(helpers.MetricsCtx, fx.Lifecycle, routing.IpfsRouting, reprovide.KeyChanFunc) (*reprovide.Reprovider, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt routing.IpfsRouting, keyProvider reprovide.KeyChanFunc) (*reprovide.Reprovider, error) {
return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil
}
switch cfg.Reprovider.Strategy {
case "all":
fallthrough
case "":
keyProvider = reprovide.NewBlockstoreProvider(bs)
case "roots":
keyProvider = reprovide.NewPinnedProvider(pinning, ds, true)
case "pinned":
keyProvider = reprovide.NewPinnedProvider(pinning, ds, false)
default:
return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
}
return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil
}
// Reprovider runs the reprovider service

View File

@ -42,8 +42,8 @@ func Datastore(repo repo.Repo) datastore.Datastore {
type BaseBlocks blockstore.Blockstore
// BaseBlockstoreCtor creates cached blockstore backed by the provided datastore
func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsCtx, repo repo.Repo, cfg *config.Config, lc fx.Lifecycle) (bs BaseBlocks, err error) {
return func(mctx helpers.MetricsCtx, repo repo.Repo, cfg *config.Config, lc fx.Lifecycle) (bs BaseBlocks, err error) {
func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, nilRepo bool, hashOnRead bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
return func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
rds := &retrystore.Datastore{
Batching: repo.Datastore(),
Delay: time.Millisecond * 200,
@ -54,12 +54,6 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC
bs = blockstore.NewBlockstore(rds)
bs = &verifbs.VerifBS{Blockstore: bs}
opts := blockstore.DefaultCacheOpts()
opts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize
if !permanent {
opts.HasBloomFilterSize = 0
}
if !nilRepo {
ctx, cancel := context.WithCancel(mctx)
@ -69,7 +63,7 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC
return nil
},
})
bs, err = blockstore.CachedBlockstore(ctx, bs, opts)
bs, err = blockstore.CachedBlockstore(ctx, bs, cacheOpts)
if err != nil {
return nil, err
}
@ -78,7 +72,7 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC
bs = blockstore.NewIdStore(bs)
bs = cidv0v1.NewBlockstore(bs)
if cfg.Datastore.HashOnRead { // TODO: review: this is how it was done originally, is there a reason we can't just pass this directly?
if hashOnRead { // TODO: review: this is how it was done originally, is there a reason we can't just pass this directly?
bs.HashOnRead(true)
}
@ -87,16 +81,23 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC
}
// GcBlockstoreCtor wraps the base blockstore with GC and Filestore layers
func GcBlockstoreCtor(repo repo.Repo, bb BaseBlocks, cfg *config.Config) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) {
func GcBlockstoreCtor(bb BaseBlocks) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore) {
gclocker = blockstore.NewGCLocker()
gcbs = blockstore.NewGCBlockstore(bb, gclocker)
if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled {
// hash security
fstore = filestore.NewFilestore(bb, repo.FileManager()) // TODO: mark optional
gcbs = blockstore.NewGCBlockstore(fstore, gclocker)
gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs}
}
bs = gcbs
return
}
// GcBlockstoreCtor wraps GcBlockstore and adds Filestore support
func FilestoreBlockstoreCtor(repo repo.Repo, bb BaseBlocks) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) {
gclocker, gcbs, bs = GcBlockstoreCtor(bb)
// hash security
fstore = filestore.NewFilestore(bb, repo.FileManager())
gcbs = blockstore.NewGCBlockstore(fstore, gclocker)
gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs}
bs = gcbs
return
}

View File

@ -20,27 +20,29 @@ func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
}
// NewPinnedProvider returns provider supplying pinned keys
func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) KeyChanFunc {
return func(ctx context.Context) (<-chan cid.Cid, error) {
set, err := pinSet(ctx, pinning, dag, onlyRoots)
if err != nil {
return nil, err
}
outCh := make(chan cid.Cid)
go func() {
defer close(outCh)
for c := range set.New {
select {
case <-ctx.Done():
return
case outCh <- c:
}
func NewPinnedProvider(onlyRoots bool) func(pinning pin.Pinner, dag ipld.DAGService) KeyChanFunc {
return func(pinning pin.Pinner, dag ipld.DAGService) KeyChanFunc {
return func(ctx context.Context) (<-chan cid.Cid, error) {
set, err := pinSet(ctx, pinning, dag, onlyRoots)
if err != nil {
return nil, err
}
}()
outCh := make(chan cid.Cid)
go func() {
defer close(outCh)
for c := range set.New {
select {
case <-ctx.Done():
return
case outCh <- c:
}
}
return outCh, nil
}()
return outCh, nil
}
}
}