mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 18:37:45 +08:00
No behaviour changes. Currently we are using ProvideManyRouter for Bitswap, which is only meant to use ContentDiscovery. This makes things more clear in that there is a designated ContentDiscovery instance.
266 lines
10 KiB
Go
266 lines
10 KiB
Go
package node
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/ipfs/boxo/bitswap"
|
|
"github.com/ipfs/boxo/bitswap/client"
|
|
"github.com/ipfs/boxo/bitswap/network"
|
|
bsnet "github.com/ipfs/boxo/bitswap/network/bsnet"
|
|
"github.com/ipfs/boxo/bitswap/network/httpnet"
|
|
blockstore "github.com/ipfs/boxo/blockstore"
|
|
exchange "github.com/ipfs/boxo/exchange"
|
|
"github.com/ipfs/boxo/exchange/providing"
|
|
provider "github.com/ipfs/boxo/provider"
|
|
rpqm "github.com/ipfs/boxo/routing/providerquerymanager"
|
|
"github.com/ipfs/go-cid"
|
|
ipld "github.com/ipfs/go-ipld-format"
|
|
version "github.com/ipfs/kubo"
|
|
"github.com/ipfs/kubo/config"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
peer "github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/libp2p/go-libp2p/core/routing"
|
|
"go.uber.org/fx"
|
|
|
|
blocks "github.com/ipfs/go-block-format"
|
|
"github.com/ipfs/kubo/core/node/helpers"
|
|
)
|
|
|
|
// Docs: https://github.com/ipfs/kubo/blob/master/docs/config.md#internalbitswap
|
|
const (
|
|
DefaultEngineBlockstoreWorkerCount = 128
|
|
DefaultTaskWorkerCount = 8
|
|
DefaultEngineTaskWorkerCount = 8
|
|
DefaultMaxOutstandingBytesPerPeer = 1 << 20
|
|
DefaultProviderSearchDelay = 1000 * time.Millisecond
|
|
DefaultMaxProviders = 10 // matching BitswapClientDefaultMaxProviders from https://github.com/ipfs/boxo/blob/v0.29.1/bitswap/internal/defaults/defaults.go#L15
|
|
DefaultWantHaveReplaceSize = 1024
|
|
)
|
|
|
|
type bitswapOptionsOut struct {
|
|
fx.Out
|
|
|
|
BitswapOpts []bitswap.Option `group:"bitswap-options,flatten"`
|
|
}
|
|
|
|
// BitswapOptions creates configuration options for Bitswap from the config file
|
|
// and whether to provide data.
|
|
func BitswapOptions(cfg *config.Config) interface{} {
|
|
return func() bitswapOptionsOut {
|
|
var internalBsCfg config.InternalBitswap
|
|
if cfg.Internal.Bitswap != nil {
|
|
internalBsCfg = *cfg.Internal.Bitswap
|
|
}
|
|
|
|
opts := []bitswap.Option{
|
|
bitswap.ProviderSearchDelay(internalBsCfg.ProviderSearchDelay.WithDefault(DefaultProviderSearchDelay)), // See https://github.com/ipfs/go-ipfs/issues/8807 for rationale
|
|
bitswap.EngineBlockstoreWorkerCount(int(internalBsCfg.EngineBlockstoreWorkerCount.WithDefault(DefaultEngineBlockstoreWorkerCount))),
|
|
bitswap.TaskWorkerCount(int(internalBsCfg.TaskWorkerCount.WithDefault(DefaultTaskWorkerCount))),
|
|
bitswap.EngineTaskWorkerCount(int(internalBsCfg.EngineTaskWorkerCount.WithDefault(DefaultEngineTaskWorkerCount))),
|
|
bitswap.MaxOutstandingBytesPerPeer(int(internalBsCfg.MaxOutstandingBytesPerPeer.WithDefault(DefaultMaxOutstandingBytesPerPeer))),
|
|
bitswap.WithWantHaveReplaceSize(int(internalBsCfg.WantHaveReplaceSize.WithDefault(DefaultWantHaveReplaceSize))),
|
|
}
|
|
|
|
return bitswapOptionsOut{BitswapOpts: opts}
|
|
}
|
|
}
|
|
|
|
type bitswapIn struct {
|
|
fx.In
|
|
|
|
Mctx helpers.MetricsCtx
|
|
Cfg *config.Config
|
|
Host host.Host
|
|
Discovery routing.ContentDiscovery
|
|
Bs blockstore.GCBlockstore
|
|
BitswapOpts []bitswap.Option `group:"bitswap-options"`
|
|
}
|
|
|
|
// Bitswap creates the BitSwap server/client instance.
|
|
// If Bitswap.ServerEnabled is false, the node will act only as a client
|
|
// using an empty blockstore to prevent serving blocks to other peers.
|
|
func Bitswap(serverEnabled, libp2pEnabled, httpEnabled bool) interface{} {
|
|
return func(in bitswapIn, lc fx.Lifecycle) (*bitswap.Bitswap, error) {
|
|
var bitswapNetworks, bitswapLibp2p network.BitSwapNetwork
|
|
var bitswapBlockstore blockstore.Blockstore = in.Bs
|
|
|
|
libp2pEnabled := in.Cfg.Bitswap.Libp2pEnabled.WithDefault(config.DefaultBitswapLibp2pEnabled)
|
|
if libp2pEnabled {
|
|
bitswapLibp2p = bsnet.NewFromIpfsHost(in.Host)
|
|
}
|
|
|
|
if httpEnabled {
|
|
httpCfg := in.Cfg.HTTPRetrieval
|
|
maxBlockSize, err := humanize.ParseBytes(httpCfg.MaxBlockSize.WithDefault(config.DefaultHTTPRetrievalMaxBlockSize))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
logger.Infof("HTTP Retrieval enabled: Allowlist: %t. Denylist: %t",
|
|
httpCfg.Allowlist != nil,
|
|
httpCfg.Denylist != nil,
|
|
)
|
|
|
|
bitswapHTTP := httpnet.New(in.Host,
|
|
httpnet.WithHTTPWorkers(int(httpCfg.NumWorkers.WithDefault(config.DefaultHTTPRetrievalNumWorkers))),
|
|
httpnet.WithAllowlist(httpCfg.Allowlist),
|
|
httpnet.WithDenylist(httpCfg.Denylist),
|
|
httpnet.WithInsecureSkipVerify(httpCfg.TLSInsecureSkipVerify.WithDefault(config.DefaultHTTPRetrievalTLSInsecureSkipVerify)),
|
|
httpnet.WithMaxBlockSize(int64(maxBlockSize)),
|
|
httpnet.WithUserAgent(version.GetUserAgentVersion()),
|
|
httpnet.WithMetricsLabelsForEndpoints(httpCfg.Allowlist),
|
|
)
|
|
bitswapNetworks = network.New(in.Host.Peerstore(), bitswapLibp2p, bitswapHTTP)
|
|
} else if libp2pEnabled {
|
|
bitswapNetworks = bitswapLibp2p
|
|
} else {
|
|
return nil, errors.New("invalid configuration: Bitswap.Libp2pEnabled and HTTPRetrieval.Enabled are both disabled, unable to initialize Bitswap")
|
|
}
|
|
|
|
// Kubo uses own, customized ProviderQueryManager
|
|
in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.WithDefaultProviderQueryManager(false)))
|
|
var maxProviders int = DefaultMaxProviders
|
|
|
|
var bcDisposition string
|
|
if in.Cfg.Internal.Bitswap != nil {
|
|
maxProviders = int(in.Cfg.Internal.Bitswap.ProviderSearchMaxResults.WithDefault(DefaultMaxProviders))
|
|
if in.Cfg.Internal.Bitswap.BroadcastControl != nil {
|
|
bcCfg := in.Cfg.Internal.Bitswap.BroadcastControl
|
|
bcEnable := bcCfg.Enable.WithDefault(config.DefaultBroadcastControlEnable)
|
|
in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlEnable(bcEnable)))
|
|
if bcEnable {
|
|
bcDisposition = "enabled"
|
|
bcMaxPeers := int(bcCfg.MaxPeers.WithDefault(config.DefaultBroadcastControlMaxPeers))
|
|
in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlMaxPeers(bcMaxPeers)))
|
|
|
|
bcLocalPeers := bcCfg.LocalPeers.WithDefault(config.DefaultBroadcastControlLocalPeers)
|
|
in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlLocalPeers(bcLocalPeers)))
|
|
|
|
bcPeeredPeers := bcCfg.PeeredPeers.WithDefault(config.DefaultBroadcastControlPeeredPeers)
|
|
in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlPeeredPeers(bcPeeredPeers)))
|
|
|
|
bcMaxRandomPeers := int(bcCfg.MaxRandomPeers.WithDefault(config.DefaultBroadcastControlMaxRandomPeers))
|
|
in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlMaxRandomPeers(bcMaxRandomPeers)))
|
|
|
|
bcSendToPendingPeers := bcCfg.SendToPendingPeers.WithDefault(config.DefaultBroadcastControlSendToPendingPeers)
|
|
in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlSendToPendingPeers(bcSendToPendingPeers)))
|
|
} else {
|
|
bcDisposition = "disabled"
|
|
}
|
|
}
|
|
}
|
|
|
|
// If broadcast control is not configured, then configure with defaults.
|
|
if bcDisposition == "" {
|
|
in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlEnable(config.DefaultBroadcastControlEnable)))
|
|
if config.DefaultBroadcastControlEnable {
|
|
bcDisposition = "enabled"
|
|
in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlMaxPeers(config.DefaultBroadcastControlMaxPeers)))
|
|
in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlLocalPeers(config.DefaultBroadcastControlLocalPeers)))
|
|
in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlPeeredPeers(config.DefaultBroadcastControlPeeredPeers)))
|
|
in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlMaxRandomPeers(config.DefaultBroadcastControlMaxRandomPeers)))
|
|
in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlSendToPendingPeers(config.DefaultBroadcastControlSendToPendingPeers)))
|
|
} else {
|
|
bcDisposition = "enabled"
|
|
}
|
|
}
|
|
logger.Infof("bitswap client broadcast control %s", bcDisposition)
|
|
|
|
ignoredPeerIDs := make([]peer.ID, 0, len(in.Cfg.Routing.IgnoreProviders))
|
|
for _, str := range in.Cfg.Routing.IgnoreProviders {
|
|
pid, err := peer.Decode(str)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ignoredPeerIDs = append(ignoredPeerIDs, pid)
|
|
}
|
|
providerQueryMgr, err := rpqm.New(bitswapNetworks,
|
|
in.Discovery,
|
|
rpqm.WithMaxProviders(maxProviders),
|
|
rpqm.WithIgnoreProviders(ignoredPeerIDs...),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Explicitly enable/disable server
|
|
in.BitswapOpts = append(in.BitswapOpts, bitswap.WithServerEnabled(serverEnabled))
|
|
|
|
bs := bitswap.New(helpers.LifecycleCtx(in.Mctx, lc), bitswapNetworks, providerQueryMgr, bitswapBlockstore, in.BitswapOpts...)
|
|
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
return bs.Close()
|
|
},
|
|
})
|
|
return bs, nil
|
|
}
|
|
}
|
|
|
|
// OnlineExchange creates new LibP2P backed block exchange.
|
|
// Returns a no-op exchange if Bitswap is disabled.
|
|
func OnlineExchange(isBitswapActive bool) interface{} {
|
|
return func(in *bitswap.Bitswap, lc fx.Lifecycle) exchange.Interface {
|
|
if !isBitswapActive {
|
|
return &noopExchange{closer: in}
|
|
}
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
return in.Close()
|
|
},
|
|
})
|
|
return in
|
|
}
|
|
}
|
|
|
|
type providingExchangeIn struct {
|
|
fx.In
|
|
|
|
BaseExch exchange.Interface
|
|
Provider provider.System
|
|
}
|
|
|
|
// ProvidingExchange creates a providing.Exchange with the existing exchange
|
|
// and the provider.System.
|
|
// We cannot do this in OnlineExchange because it causes cycles so this is for
|
|
// a decorator.
|
|
func ProvidingExchange(provide bool) interface{} {
|
|
return func(in providingExchangeIn, lc fx.Lifecycle) exchange.Interface {
|
|
exch := in.BaseExch
|
|
if provide {
|
|
exch = providing.New(in.BaseExch, in.Provider)
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
return exch.Close()
|
|
},
|
|
})
|
|
}
|
|
return exch
|
|
}
|
|
}
|
|
|
|
type noopExchange struct {
|
|
closer io.Closer
|
|
}
|
|
|
|
func (e *noopExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
|
|
return nil, ipld.ErrNotFound{Cid: c}
|
|
}
|
|
|
|
func (e *noopExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) {
|
|
ch := make(chan blocks.Block)
|
|
close(ch)
|
|
return ch, nil
|
|
}
|
|
|
|
func (e *noopExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
|
|
return nil
|
|
}
|
|
|
|
func (e *noopExchange) Close() error {
|
|
return e.closer.Close()
|
|
}
|