kubo/core/node/groups.go
Marcin Rataj 541bd657a6 feat(dns): add DNS.OverrideSystem config to apply resolvers globally
adds DNS.OverrideSystem flag (enabled by default) that extends
DNS.Resolvers to affect all DNS lookups in the daemon process,
not just DNSLink and Multiaddr resolution.

this ensures AutoTLS ACME DNS-01 challenge verification, HTTP retrieval,
and third-party library code all respect DNS.Resolvers config.

implementation creates a net.Resolver bridge that intercepts DNS wire
protocol queries, parses them with miekg/dns, calls madns.Resolver,
and returns properly formatted DNS responses.

note: this is an exploration of daemon-wide DNS configuration without
refactoring boxo/gateway. a cleaner future approach may create native
net.Resolver from config first, then convert to madns only for go-libp2p.
2026-01-12 21:26:29 +01:00

465 lines
16 KiB
Go

package node
import (
"context"
"errors"
"fmt"
"regexp"
"strings"
"time"
blockstore "github.com/ipfs/boxo/blockstore"
offline "github.com/ipfs/boxo/exchange/offline"
uio "github.com/ipfs/boxo/ipld/unixfs/io"
util "github.com/ipfs/boxo/util"
"github.com/ipfs/go-log/v2"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/node/libp2p"
"github.com/ipfs/kubo/p2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p-pubsub/timecache"
"github.com/libp2p/go-libp2p/core/peer"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"go.uber.org/fx"
)
var logger = log.Logger("core:constructor")
var BaseLibP2P = fx.Options(
fx.Provide(libp2p.PNet),
fx.Provide(libp2p.ConnectionManager),
fx.Provide(libp2p.Host),
fx.Provide(libp2p.MultiaddrResolver),
fx.Provide(libp2p.DiscoveryHandler),
fx.Invoke(libp2p.PNetChecker),
)
func LibP2P(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.PartialLimitConfig) fx.Option {
var connmgr fx.Option
// set connmgr based on Swarm.ConnMgr.Type
connMgrType := cfg.Swarm.ConnMgr.Type.WithDefault(config.DefaultConnMgrType)
switch connMgrType {
case "none":
connmgr = fx.Options() // noop
case "", "basic":
grace := cfg.Swarm.ConnMgr.GracePeriod.WithDefault(config.DefaultConnMgrGracePeriod)
low := int(cfg.Swarm.ConnMgr.LowWater.WithDefault(config.DefaultConnMgrLowWater))
high := int(cfg.Swarm.ConnMgr.HighWater.WithDefault(config.DefaultConnMgrHighWater))
silence := cfg.Swarm.ConnMgr.SilencePeriod.WithDefault(config.DefaultConnMgrSilencePeriod)
connmgr = fx.Provide(libp2p.ConnectionManager(low, high, grace, silence))
default:
return fx.Error(fmt.Errorf("unrecognized Swarm.ConnMgr.Type: %q", connMgrType))
}
// parse PubSub config
ps, disc := fx.Options(), fx.Options()
if bcfg.getOpt("pubsub") || bcfg.getOpt("ipnsps") {
disc = fx.Provide(libp2p.TopicDiscovery())
var pubsubOptions []pubsub.Option
pubsubOptions = append(
pubsubOptions,
pubsub.WithMessageSigning(!cfg.Pubsub.DisableSigning),
pubsub.WithSeenMessagesTTL(cfg.Pubsub.SeenMessagesTTL.WithDefault(pubsub.TimeCacheDuration)),
)
var seenMessagesStrategy timecache.Strategy
configSeenMessagesStrategy := cfg.Pubsub.SeenMessagesStrategy.WithDefault(config.DefaultSeenMessagesStrategy)
switch configSeenMessagesStrategy {
case config.LastSeenMessagesStrategy:
seenMessagesStrategy = timecache.Strategy_LastSeen
case config.FirstSeenMessagesStrategy:
seenMessagesStrategy = timecache.Strategy_FirstSeen
default:
return fx.Error(fmt.Errorf("unsupported Pubsub.SeenMessagesStrategy %q", configSeenMessagesStrategy))
}
pubsubOptions = append(pubsubOptions, pubsub.WithSeenMessagesStrategy(seenMessagesStrategy))
switch cfg.Pubsub.Router {
case "":
fallthrough
case "gossipsub":
ps = fx.Provide(libp2p.GossipSub(pubsubOptions...))
case "floodsub":
ps = fx.Provide(libp2p.FloodSub(pubsubOptions...))
default:
return fx.Error(fmt.Errorf("unknown pubsub router %s", cfg.Pubsub.Router))
}
}
autonat := fx.Options()
switch cfg.AutoNAT.ServiceMode {
default:
panic("BUG: unhandled autonat service mode")
case config.AutoNATServiceDisabled:
case config.AutoNATServiceUnset:
// TODO
//
// We're enabling the AutoNAT service by default on _all_ nodes
// for the moment.
//
// We should consider disabling it by default if the dht is set
// to dhtclient.
fallthrough
case config.AutoNATServiceEnabled:
autonat = fx.Provide(libp2p.AutoNATService(cfg.AutoNAT.Throttle, false))
case config.AutoNATServiceEnabledV1Only:
autonat = fx.Provide(libp2p.AutoNATService(cfg.AutoNAT.Throttle, true))
}
enableTCPTransport := cfg.Swarm.Transports.Network.TCP.WithDefault(true)
enableWebsocketTransport := cfg.Swarm.Transports.Network.Websocket.WithDefault(true)
enableRelayTransport := cfg.Swarm.Transports.Network.Relay.WithDefault(true) // nolint
enableRelayService := cfg.Swarm.RelayService.Enabled.WithDefault(enableRelayTransport)
enableRelayClient := cfg.Swarm.RelayClient.Enabled.WithDefault(enableRelayTransport)
enableAutoTLS := cfg.AutoTLS.Enabled.WithDefault(config.DefaultAutoTLSEnabled)
enableAutoWSS := cfg.AutoTLS.AutoWSS.WithDefault(config.DefaultAutoWSS)
atlsLog := log.Logger("autotls")
// Log error when relay subsystem could not be initialized due to missing dependency
if !enableRelayTransport {
if enableRelayService {
logger.Fatal("Failed to enable `Swarm.RelayService`, it requires `Swarm.Transports.Network.Relay` to be true.")
}
if enableRelayClient {
logger.Fatal("Failed to enable `Swarm.RelayClient`, it requires `Swarm.Transports.Network.Relay` to be true.")
}
}
switch {
case enableAutoTLS && enableTCPTransport && enableWebsocketTransport:
// AutoTLS for Secure WebSockets: ensure WSS listeners are in place (manual or automatic)
wssWildcard := fmt.Sprintf("/tls/sni/*.%s/ws", cfg.AutoTLS.DomainSuffix.WithDefault(config.DefaultDomainSuffix))
wssWildcardPresent := false
customWsPresent := false
customWsRegex := regexp.MustCompile(`/wss?$`)
tcpRegex := regexp.MustCompile(`/tcp/\d+$`)
// inspect listeners defined in config at Addresses.Swarm
var tcpListeners []string
for _, listener := range cfg.Addresses.Swarm {
// detect if user manually added /tls/sni/.../ws listener matching AutoTLS.DomainSuffix
if strings.Contains(listener, wssWildcard) {
atlsLog.Infof("found compatible wildcard listener in Addresses.Swarm. AutoTLS will be used on %s", listener)
wssWildcardPresent = true
break
}
// detect if user manually added own /ws or /wss listener that is
// not related to AutoTLS feature
if customWsRegex.MatchString(listener) {
atlsLog.Infof("found custom /ws listener set by user in Addresses.Swarm. AutoTLS will not be used on %s.", listener)
customWsPresent = true
break
}
// else, remember /tcp listeners that can be reused for /tls/sni/../ws
if tcpRegex.MatchString(listener) {
tcpListeners = append(tcpListeners, listener)
}
}
// Append AutoTLS's wildcard listener
// if no manual /ws listener was set by the user
if enableAutoWSS && !wssWildcardPresent && !customWsPresent {
if len(tcpListeners) == 0 {
logger.Error("Invalid configuration, AutoTLS will be disabled: AutoTLS.AutoWSS=true requires at least one /tcp listener present in Addresses.Swarm, see https://github.com/ipfs/kubo/blob/master/docs/config.md#autotls")
enableAutoTLS = false
}
for _, tcpListener := range tcpListeners {
wssListener := tcpListener + wssWildcard
cfg.Addresses.Swarm = append(cfg.Addresses.Swarm, wssListener)
atlsLog.Infof("appended AutoWSS listener: %s", wssListener)
}
}
if !wssWildcardPresent && !enableAutoWSS {
logger.Error(fmt.Sprintf("Invalid configuration, AutoTLS will be disabled: AutoTLS.Enabled=true requires a /tcp listener ending with %q to be present in Addresses.Swarm or AutoTLS.AutoWSS=true, see https://github.com/ipfs/kubo/blob/master/docs/config.md#autotls", wssWildcard))
enableAutoTLS = false
}
case enableAutoTLS && !enableTCPTransport:
logger.Error("Invalid configuration: AutoTLS.Enabled=true requires Swarm.Transports.Network.TCP to be true as well. AutoTLS will be disabled.")
enableAutoTLS = false
case enableAutoTLS && !enableWebsocketTransport:
logger.Error("Invalid configuration: AutoTLS.Enabled=true requires Swarm.Transports.Network.Websocket to be true as well. AutoTLS will be disabled.")
enableAutoTLS = false
}
// Gather all the options
opts := fx.Options(
BaseLibP2P,
// identify's AgentVersion (incl. optional agent-version-suffix)
fx.Provide(libp2p.UserAgent()),
// Services (resource management)
fx.Provide(libp2p.ResourceManager(bcfg.Repo.Path(), cfg.Swarm, userResourceOverrides)),
maybeProvide(libp2p.P2PForgeCertMgr(bcfg.Repo.Path(), cfg.AutoTLS, atlsLog), enableAutoTLS),
maybeInvoke(libp2p.StartP2PAutoTLS, enableAutoTLS),
fx.Provide(libp2p.AddrFilters(cfg.Swarm.AddrFilters)),
fx.Provide(libp2p.AddrsFactory(cfg.Addresses.Announce, cfg.Addresses.AppendAnnounce, cfg.Addresses.NoAnnounce)),
fx.Provide(libp2p.SmuxTransport(cfg.Swarm.Transports)),
fx.Provide(libp2p.RelayTransport(enableRelayTransport)),
fx.Provide(libp2p.RelayService(enableRelayService, cfg.Swarm.RelayService)),
fx.Provide(libp2p.Transports(cfg.Swarm.Transports)),
fx.Provide(libp2p.ListenOn(cfg.Addresses.Swarm)),
fx.Invoke(libp2p.SetupDiscovery(cfg.Discovery.MDNS.Enabled)),
fx.Provide(libp2p.ForceReachability(cfg.Internal.Libp2pForceReachability)),
fx.Provide(libp2p.HolePunching(cfg.Swarm.EnableHolePunching, enableRelayClient)),
fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Swarm.Transports)),
fx.Provide(libp2p.Routing),
fx.Provide(libp2p.ContentRouting),
fx.Provide(libp2p.ContentDiscovery),
fx.Provide(libp2p.BaseRouting(cfg)),
maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")),
maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics),
maybeProvide(libp2p.NatPortMap, !cfg.Swarm.DisableNatPortMap),
libp2p.MaybeAutoRelay(cfg.Swarm.RelayClient.StaticRelays, cfg.Peering, enableRelayClient),
autonat,
connmgr,
ps,
disc,
)
return opts
}
// Storage groups units which setup datastore based persistence and blockstore layers
func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
cacheOpts := blockstore.DefaultCacheOpts()
cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize
cacheOpts.HasTwoQueueCacheSize = int(cfg.Datastore.BlockKeyCacheSize.WithDefault(config.DefaultBlockKeyCacheSize))
if !bcfg.Permanent {
cacheOpts.HasBloomFilterSize = 0
}
finalBstore := fx.Provide(GcBlockstoreCtor)
if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled {
finalBstore = fx.Provide(FilestoreBlockstoreCtor)
}
return fx.Options(
fx.Provide(RepoConfig),
fx.Provide(Datastore),
fx.Provide(BaseBlockstoreCtor(
cacheOpts,
cfg.Datastore.HashOnRead,
cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough),
cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy),
)),
finalBstore,
)
}
// Identity groups units providing cryptographic identity
func Identity(cfg *config.Config) fx.Option {
// PeerID
cid := cfg.Identity.PeerID
if cid == "" {
return fx.Error(errors.New("identity was not set in config (was 'ipfs init' run?)"))
}
if len(cid) == 0 {
return fx.Error(errors.New("no peer ID in config! (was 'ipfs init' run?)"))
}
id, err := peer.Decode(cid)
if err != nil {
return fx.Error(fmt.Errorf("peer ID invalid: %s", err))
}
// Private Key
if cfg.Identity.PrivKey == "" {
return fx.Options( // No PK (usually in tests)
fx.Provide(PeerID(id)),
fx.Provide(libp2p.Peerstore),
)
}
sk, err := cfg.Identity.DecodePrivateKey("passphrase todo!")
if err != nil {
return fx.Error(err)
}
return fx.Options( // Full identity
fx.Provide(PeerID(id)),
fx.Provide(PrivateKey(sk)),
fx.Provide(libp2p.Peerstore),
fx.Invoke(libp2p.PstoreAddSelfKeys),
)
}
// IPNS groups namesys related units
var IPNS = fx.Options(
fx.Provide(RecordValidator),
)
// Online groups online-only units
func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.PartialLimitConfig) fx.Option {
// Namesys params
ipnsCacheSize := cfg.Ipns.ResolveCacheSize
if ipnsCacheSize == 0 {
ipnsCacheSize = DefaultIpnsCacheSize
}
if ipnsCacheSize < 0 {
return fx.Error(errors.New("cannot specify negative resolve cache size"))
}
// Republisher params
var repubPeriod, recordLifetime time.Duration
if cfg.Ipns.RepublishPeriod != "" {
d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
if err != nil {
return fx.Error(fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err))
}
if !util.Debug && (d < time.Minute || d > (time.Hour*24)) {
return fx.Error(fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d))
}
repubPeriod = d
}
if cfg.Ipns.RecordLifetime != "" {
d, err := time.ParseDuration(cfg.Ipns.RecordLifetime)
if err != nil {
return fx.Error(fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err))
}
recordLifetime = d
}
isBitswapLibp2pEnabled := cfg.Bitswap.Libp2pEnabled.WithDefault(config.DefaultBitswapLibp2pEnabled)
isBitswapServerEnabled := cfg.Bitswap.ServerEnabled.WithDefault(config.DefaultBitswapServerEnabled)
isHTTPRetrievalEnabled := cfg.HTTPRetrieval.Enabled.WithDefault(config.DefaultHTTPRetrievalEnabled)
// The Provide system handles both new CID announcements and periodic re-announcements.
// Disabling is controlled by Provide.Enabled=false or setting Interval to 0.
isProviderEnabled := cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled) && cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) != 0
return fx.Options(
fx.Provide(BitswapOptions(cfg)),
fx.Provide(Bitswap(isBitswapServerEnabled, isBitswapLibp2pEnabled, isHTTPRetrievalEnabled)),
fx.Provide(OnlineExchange(isBitswapLibp2pEnabled)),
fx.Provide(DNSResolver),
maybeOverrideDefaultResolver(cfg.DNS.OverrideSystem.WithDefault(true)),
fx.Provide(Namesys(ipnsCacheSize, cfg.Ipns.MaxCacheTTL.WithDefault(config.DefaultIpnsMaxCacheTTL))),
fx.Provide(Peering),
PeerWith(cfg.Peering.Peers...),
fx.Invoke(IpnsRepublisher(repubPeriod, recordLifetime)),
fx.Provide(p2p.New),
LibP2P(bcfg, cfg, userResourceOverrides),
OnlineProviders(isProviderEnabled, cfg),
)
}
// Offline groups offline alternatives to Online units
func Offline(cfg *config.Config) fx.Option {
return fx.Options(
fx.Provide(offline.Exchange),
fx.Provide(DNSResolver),
maybeOverrideDefaultResolver(cfg.DNS.OverrideSystem.WithDefault(true)),
fx.Provide(Namesys(0, 0)),
fx.Provide(libp2p.Routing),
fx.Provide(libp2p.ContentRouting),
fx.Provide(libp2p.OfflineRouting),
fx.Provide(libp2p.ContentDiscovery),
OfflineProviders(),
)
}
// Core groups basic IPFS services
var Core = fx.Options(
fx.Provide(Dag),
fx.Provide(FetcherConfig),
fx.Provide(PathResolverConfig),
)
func Networked(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.PartialLimitConfig) fx.Option {
if bcfg.Online {
return Online(bcfg, cfg, userResourceOverrides)
}
return Offline(cfg)
}
// IPFS builds a group of fx Options based on the passed BuildCfg
func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option {
if bcfg == nil {
bcfg = new(BuildCfg)
}
bcfgOpts, cfg := bcfg.options(ctx)
if cfg == nil {
return bcfgOpts // error
}
userResourceOverrides, err := bcfg.Repo.UserResourceOverrides()
if err != nil {
return fx.Error(err)
}
// Migrate users of deprecated Experimental.ShardingEnabled flag
if cfg.Experimental.ShardingEnabled {
logger.Fatal("The `Experimental.ShardingEnabled` field is no longer used, please remove it from the config. Use Import.UnixFSHAMTDirectorySizeThreshold instead.")
}
if !cfg.Internal.UnixFSShardingSizeThreshold.IsDefault() {
msg := "The `Internal.UnixFSShardingSizeThreshold` field was renamed to `Import.UnixFSHAMTDirectorySizeThreshold`. Please update your config.\n"
if !cfg.Import.UnixFSHAMTDirectorySizeThreshold.IsDefault() {
logger.Fatal(msg) // conflicting values, hard fail
}
logger.Error(msg)
// Migrate the old OptionalString value to the new OptionalBytes field.
// Since OptionalBytes embeds OptionalString, we can construct it directly
// with the old value, preserving the user's original string (e.g., "256KiB").
cfg.Import.UnixFSHAMTDirectorySizeThreshold = config.OptionalBytes{OptionalString: *cfg.Internal.UnixFSShardingSizeThreshold}
}
// Validate Import configuration
if err := config.ValidateImportConfig(&cfg.Import); err != nil {
return fx.Error(err)
}
// Validate Provide configuration
if err := config.ValidateProvideConfig(&cfg.Provide); err != nil {
return fx.Error(err)
}
// Auto-sharding settings
shardSingThresholdInt := cfg.Import.UnixFSHAMTDirectorySizeThreshold.WithDefault(config.DefaultUnixFSHAMTDirectorySizeThreshold)
shardMaxFanout := cfg.Import.UnixFSHAMTDirectoryMaxFanout.WithDefault(config.DefaultUnixFSHAMTDirectoryMaxFanout)
// TODO: avoid overriding this globally, see if we can extend Directory interface like Get/SetMaxLinks from https://github.com/ipfs/boxo/pull/906
uio.HAMTShardingSize = int(shardSingThresholdInt)
uio.DefaultShardWidth = int(shardMaxFanout)
providerStrategy := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy)
return fx.Options(
bcfgOpts,
Storage(bcfg, cfg),
Identity(cfg),
IPNS,
Networked(bcfg, cfg, userResourceOverrides),
fx.Provide(BlockService(cfg)),
fx.Provide(Pinning(providerStrategy)),
fx.Provide(Files(providerStrategy)),
Core,
)
}