kubo/core/node/libp2p/routing.go
Marcin Rataj 6a008fc74c
refactor: apply go fix modernizers from Go 1.26 (#11190)
* chore: apply go fix modernizers from Go 1.26

automated refactoring: interface{} to any, slices.Contains,
and other idiomatic updates.

* feat(ci): add `go fix` check to Go analysis workflow

ensures Go 1.26 modernizers are applied, fails CI if `go fix ./...`
produces any changes (similar to existing `go fmt` enforcement)
2026-02-11 01:01:32 +01:00

347 lines
8.4 KiB
Go

package libp2p
import (
"context"
"fmt"
"runtime/debug"
"sort"
"time"
"github.com/cenkalti/backoff/v4"
offroute "github.com/ipfs/boxo/routing/offline"
ds "github.com/ipfs/go-datastore"
dht "github.com/libp2p/go-libp2p-kad-dht"
ddht "github.com/libp2p/go-libp2p-kad-dht/dual"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
pubsub "github.com/libp2p/go-libp2p-pubsub"
namesys "github.com/libp2p/go-libp2p-pubsub-router"
record "github.com/libp2p/go-libp2p-record"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"go.uber.org/fx"
config "github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/node/helpers"
"github.com/ipfs/kubo/repo"
irouting "github.com/ipfs/kubo/routing"
)
// Router pairs a routing.Routing implementation with the priority that
// Routing uses to order it relative to the other members of the "routers"
// group.
type Router struct {
routing.Routing
// Priority is the sort key used by Routing (ascending order).
Priority int // less = more important
}
// p2pRouterOut is an fx output struct that contributes a single Router to the
// "routers" value group.
type p2pRouterOut struct {
fx.Out
Router Router `group:"routers"`
}
// processInitialRoutingIn lists the dependencies injected into the
// constructor returned by BaseRouting.
type processInitialRoutingIn struct {
fx.In
// Router is the routing implementation registered under the name
// "initialrouting".
Router routing.Routing `name:"initialrouting"`
// For setting up experimental DHT client
Host host.Host
Repo repo.Repo
Validator record.Validator
}
// processInitialRoutingOut lists the values produced by the constructor
// returned by BaseRouting.
type processInitialRoutingOut struct {
fx.Out
// Router joins the "routers" group; BaseRouting always gives it priority 1000.
Router Router `group:"routers"`
// ContentRouter joins the "content-routers" group consumed by ContentRouting.
ContentRouter routing.ContentRouting `group:"content-routers"`
// DHT is the dual DHT found in the initial router, or nil when none was found.
DHT *ddht.DHT
// DHTClient is the FullRT client when BaseRouting creates one, otherwise
// the same (possibly nil) dual DHT pointer as DHT.
DHTClient routing.Routing `name:"dhtc"`
}
// AddrInfoChan is a channel carrying peer.AddrInfo values.
type AddrInfoChan chan peer.AddrInfo
// BaseRouting returns an fx constructor that post-processes the router
// registered under the name "initialrouting".
//
// The constructor looks for a dual DHT, either as the initial router itself
// or among the children of a composable router, and registers a lifecycle
// hook that closes it on stop. When a dual DHT is present and the accelerated
// DHT client is enabled in cfg, a FullRT client is created and wrapped,
// together with the default HTTP routers, in a parallel router. Otherwise the
// initial router is passed through unchanged. In both cases the resulting
// Router is registered with priority 1000.
func BaseRouting(cfg *config.Config) any {
	return func(lc fx.Lifecycle, in processInitialRoutingIn) (out processInitialRoutingOut, err error) {
		var dualDHT *ddht.DHT

		// The hook reads dualDHT when it fires, not when it is registered.
		closeDualDHTOnStop := func() {
			lc.Append(fx.Hook{
				OnStop: func(context.Context) error {
					return dualDHT.Close()
				},
			})
		}

		if d, ok := in.Router.(*ddht.DHT); ok {
			dualDHT = d
			closeDualDHTOnStop()
		}

		if composed, ok := in.Router.(routinghelpers.ComposableRouter); ok {
			// Only the first dual DHT among the children is used.
			for _, child := range composed.Routers() {
				d, isDual := child.(*ddht.DHT)
				if !isDual {
					continue
				}
				dualDHT = d
				closeDualDHTOnStop()
				break
			}
		}

		accelerated := dualDHT != nil &&
			cfg.Routing.AcceleratedDHTClient.WithDefault(config.DefaultAcceleratedDHTClient)
		if !accelerated {
			// dualDHT may be nil here; DHT and DHTClient then both carry a
			// nil *ddht.DHT.
			return processInitialRoutingOut{
				Router: Router{
					Routing:  in.Router,
					Priority: 1000,
				},
				DHT:           dualDHT,
				DHTClient:     dualDHT,
				ContentRouter: in.Router,
			}, nil
		}

		// From here on the configuration is read from the repo rather than
		// from the cfg argument.
		repoCfg, err := in.Repo.Config()
		if err != nil {
			return out, err
		}

		// Use auto-config resolution for actual connectivity
		bspeers, err := repoCfg.BootstrapPeersWithAutoConf()
		if err != nil {
			return out, err
		}

		fullRTClient, err := fullrt.NewFullRT(
			in.Host,
			dht.DefaultPrefix,
			fullrt.DHTOption(
				dht.Validator(in.Validator),
				dht.Datastore(in.Repo.Datastore()),
				dht.BootstrapPeers(bspeers...),
				dht.BucketSize(20),
			),
		)
		if err != nil {
			return out, err
		}
		lc.Append(fx.Hook{
			OnStop: func(context.Context) error {
				return fullRTClient.Close()
			},
		})

		// we want to also use the default HTTP routers, so wrap the FullRT client
		// in a parallel router that calls them in parallel
		httpRouters, err := constructDefaultHTTPRouters(repoCfg)
		if err != nil {
			return out, err
		}
		parallel := append(
			[]*routinghelpers.ParallelRouter{
				{Router: fullRTClient, DoNotWaitForSearchValue: true},
			},
			httpRouters...,
		)

		return processInitialRoutingOut{
			Router: Router{
				Routing:  routinghelpers.NewComposableParallel(parallel),
				Priority: 1000,
			},
			DHT:           dualDHT,
			DHTClient:     fullRTClient,
			ContentRouter: fullRTClient,
		}, nil
	}
}
// p2pOnlineContentRoutingIn is the fx input struct for ContentRouting; it
// collects every member of the "content-routers" value group.
type p2pOnlineContentRoutingIn struct {
fx.In
ContentRouter []routing.ContentRouting `group:"content-routers"`
}
// ContentRouting wraps every member of the "content-routers" group in a
// routinghelpers.Compose and combines the results into a single
// routinghelpers.Tiered router. It will be used for topic discovery.
func ContentRouting(in p2pOnlineContentRoutingIn) routing.ContentRouting {
	var tiers []routing.Routing
	for i := range in.ContentRouter {
		composed := &routinghelpers.Compose{ContentRouting: in.ContentRouter[i]}
		tiers = append(tiers, composed)
	}
	return routinghelpers.Tiered{Routers: tiers}
}
// ContentDiscovery exposes the given router through the narrower
// routing.ContentDiscovery interface, so that consumers can only use it for
// discovery.
func ContentDiscovery(in irouting.ProvideManyRouter) routing.ContentDiscovery {
	var discovery routing.ContentDiscovery = in
	return discovery
}
// p2pOnlineRoutingIn is the fx input struct for Routing; it collects every
// member of the "routers" value group.
type p2pOnlineRoutingIn struct {
fx.In
Routers []Router `group:"routers"`
Validator record.Validator
}
// Routing combines every Router from the "routers" group (delegated routers,
// pub-sub, and so on) into one composable parallel router. The routers are
// ordered by ascending Priority first, and each one is configured to ignore
// errors and not to wait for SearchValue.
func Routing(in p2pOnlineRoutingIn) irouting.ProvideManyRouter {
	ordered := in.Routers
	// A stable sort keeps the incoming order of routers that share a priority.
	sort.SliceStable(ordered, func(a, b int) bool {
		return ordered[a].Priority < ordered[b].Priority
	})

	var parallel []*routinghelpers.ParallelRouter
	for i := range ordered {
		parallel = append(parallel, &routinghelpers.ParallelRouter{
			Router:                  ordered[i].Routing,
			IgnoreError:             true,
			DoNotWaitForSearchValue: true,
		})
	}
	return routinghelpers.NewComposableParallel(parallel)
}
// OfflineRouting contributes an offline router, built from dstore and
// validator, to the routers list with priority 10000. It is used when we are
// creating an offline node.
func OfflineRouting(dstore ds.Datastore, validator record.Validator) p2pRouterOut {
	offline := offroute.NewOfflineRouter(dstore, validator)

	var out p2pRouterOut
	out.Router.Routing = offline
	out.Router.Priority = 10000
	return out
}
// p2pPSRoutingIn lists the dependencies injected into PubsubRouter.
type p2pPSRoutingIn struct {
fx.In
Validator record.Validator
Host host.Host
// PubSub is optional for fx; PubsubRouter passes it to the value store as-is.
PubSub *pubsub.PubSub `optional:"true"`
}
// PubsubRouter creates a pubsub-backed value store with a one-minute
// rebroadcast interval and contributes it to the routers list, restricted to
// the "ipns" namespace, with priority 100. The value store itself is returned
// as a second result. On failure the zero p2pRouterOut, a nil store and the
// error are returned.
func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore, error) {
	ctx := helpers.LifecycleCtx(mctx, lc)
	rebroadcast := namesys.WithRebroadcastInterval(time.Minute)

	store, err := namesys.NewPubsubValueStore(ctx, in.Host, in.PubSub, in.Validator, rebroadcast)
	if err != nil {
		return p2pRouterOut{}, nil, err
	}

	ipnsOnly := &routinghelpers.LimitedValueStore{
		ValueStore: store,
		Namespaces: []string{"ipns"},
	}
	out := p2pRouterOut{
		Router: Router{
			Routing:  &routinghelpers.Compose{ValueStore: ipnsOnly},
			Priority: 100,
		},
	}
	return out, store, nil
}
// autoRelayFeeder returns an fx option that starts a background goroutine
// pushing relay candidates into peerChan until the fx lifecycle stops.
//
// On every tick of an exponential backoff ticker (15s initial interval,
// multiplier 3, capped at 1h, never expiring) the goroutine sends, in order:
//   - the peers from cfgPeering.Peers that have at least one address,
//   - the peers returned by a closest-peers lookup on the WAN DHT, when a DHT
//     was injected and the lookup succeeds,
//   - every peer the host is currently connected to.
//
// Peers without addresses in the peerstore are skipped. Each send blocks
// until the receiver accepts the value or the feeder is cancelled. The OnStop
// hook cancels the feeder and waits for the goroutine to exit.
//
// A panic inside the feeding loop is recovered and logged; the feeder then
// stops instead of crashing the process.
func autoRelayFeeder(cfgPeering config.Peering, peerChan chan<- peer.AddrInfo) fx.Option {
	return fx.Invoke(func(lc fx.Lifecycle, h host.Host, dualDHT *ddht.DHT) {
		ctx, cancel := context.WithCancel(context.Background())
		done := make(chan struct{})

		// send blocks until ai is accepted or ctx is cancelled; it reports
		// false in the latter case so the caller can stop feeding.
		send := func(ai peer.AddrInfo) bool {
			select {
			case peerChan <- ai:
				return true
			case <-ctx.Done():
				return false
			}
		}

		// sendKnown feeds every peer in ids that has addresses in the
		// peerstore, reporting false once ctx is cancelled.
		sendKnown := func(ids []peer.ID) bool {
			for _, id := range ids {
				addrs := h.Peerstore().Addrs(id)
				if len(addrs) == 0 {
					continue
				}
				if !send(peer.AddrInfo{ID: id, Addrs: addrs}) {
					return false
				}
			}
			return true
		}

		go func() {
			defer close(done)
			// recover only has an effect in the goroutine that panics, so it
			// must be deferred here rather than in the enclosing Invoke
			// function. It is registered after close(done) so that it runs
			// first and OnStop is still unblocked after a panic.
			defer func() {
				if r := recover(); r != nil {
					fmt.Println("Recovering from unexpected error in AutoRelayFeeder:", r)
					debug.PrintStack()
				}
			}()

			// Feed peers more often right after the bootstrap, then backoff
			bo := backoff.NewExponentialBackOff()
			bo.InitialInterval = 15 * time.Second
			bo.Multiplier = 3
			bo.MaxInterval = 1 * time.Hour
			bo.MaxElapsedTime = 0 // never stop
			t := backoff.NewTicker(bo)
			defer t.Stop()

			for {
				select {
				case <-t.C:
				case <-ctx.Done():
					return
				}

				// Always feed trusted IDs (Peering.Peers in the config)
				for _, trustedPeer := range cfgPeering.Peers {
					if len(trustedPeer.Addrs) == 0 {
						continue
					}
					if !send(trustedPeer) {
						return
					}
				}

				// Additionally, feed closest peers discovered via DHT
				if dualDHT != nil {
					// NOTE(review): the lookup key is the string form of the
					// host's peer ID — confirm this is the intended key
					// encoding.
					closestPeers, err := dualDHT.WAN.GetClosestPeers(ctx, h.ID().String())
					// A failed lookup is skipped on purpose: this round still
					// feeds the connected swarm peers below.
					if err == nil && !sendKnown(closestPeers) {
						return
					}
				}

				// Additionally, feed all connected swarm peers as potential relay candidates.
				// This includes peers from HTTP routing, manual swarm connect, mDNS discovery, etc.
				// (fixes https://github.com/ipfs/kubo/issues/10899)
				if !sendKnown(h.Network().Peers()) {
					return
				}
			}
		}()

		lc.Append(fx.Hook{
			OnStop: func(_ context.Context) error {
				cancel()
				<-done
				return nil
			},
		})
	})
}