package libp2p

import (
	"context"
	"fmt"
	"runtime/debug"
	"sort"
	"time"

	"github.com/cenkalti/backoff/v4"
	offroute "github.com/ipfs/boxo/routing/offline"
	ds "github.com/ipfs/go-datastore"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	ddht "github.com/libp2p/go-libp2p-kad-dht/dual"
	"github.com/libp2p/go-libp2p-kad-dht/fullrt"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	namesys "github.com/libp2p/go-libp2p-pubsub-router"
	record "github.com/libp2p/go-libp2p-record"
	routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/routing"
	"go.uber.org/fx"

	config "github.com/ipfs/kubo/config"
	"github.com/ipfs/kubo/core/node/helpers"
	"github.com/ipfs/kubo/repo"
	irouting "github.com/ipfs/kubo/routing"
)
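
// Router pairs a routing.Routing implementation with a Priority used to
// order routers when they are combined into a single parallel router
// (see Routing below; a lower Priority sorts first).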
type Router struct {
	routing.Routing

	Priority int // less = more important
}
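
// p2pRouterOut is the fx output struct used to contribute a single Router to
// the "routers" value group consumed by Routing below. As an illustrative
// sketch (hypothetical helper, not part of this file), an extra router could
// be contributed to that group roughly like so:
//
//	func provideExtraRouter(r routing.Routing) p2pRouterOut {
//		return p2pRouterOut{Router: Router{Routing: r, Priority: 500}}
//	}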
type p2pRouterOut struct {
	fx.Out

	Router Router `group:"routers"`
}
type processInitialRoutingIn struct {
	fx.In

	Router routing.Routing `name:"initialrouting"`

	// For setting up experimental DHT client
	Host      host.Host
	Repo      repo.Repo
	Validator record.Validator
}

type processInitialRoutingOut struct {
	fx.Out

	Router        Router                 `group:"routers"`
	ContentRouter routing.ContentRouting `group:"content-routers"`

	DHT       *ddht.DHT
	DHTClient routing.Routing `name:"dhtc"`
}
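
// AddrInfoChan is a channel of peer address records (peer.AddrInfo), used to
// feed candidate peers between components (compare autoRelayFeeder below,
// which writes discovered peers into a chan<- peer.AddrInfo).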
type AddrInfoChan chan peer.AddrInfo
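
// BaseRouting returns an fx constructor that wraps the initial router built
// during node construction. If that router is (or composes) the dual DHT, the
// DHT is remembered and closed on shutdown. When the accelerated DHT client
// is enabled, a fullrt client is constructed over the bootstrap peers and
// combined with the default HTTP routers in a parallel router, and that
// combination is returned in place of the initial router.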
func BaseRouting(cfg *config.Config) any {
	return func(lc fx.Lifecycle, in processInitialRoutingIn) (out processInitialRoutingOut, err error) {
		var dualDHT *ddht.DHT
		if dht, ok := in.Router.(*ddht.DHT); ok {
			dualDHT = dht

			lc.Append(fx.Hook{
				OnStop: func(ctx context.Context) error {
					return dualDHT.Close()
				},
			})
		}

		if cr, ok := in.Router.(routinghelpers.ComposableRouter); ok {
			for _, r := range cr.Routers() {
				if dht, ok := r.(*ddht.DHT); ok {
					dualDHT = dht
					lc.Append(fx.Hook{
						OnStop: func(ctx context.Context) error {
							return dualDHT.Close()
						},
					})
					break
				}
			}
		}

		if dualDHT != nil && cfg.Routing.AcceleratedDHTClient.WithDefault(config.DefaultAcceleratedDHTClient) {
			cfg, err := in.Repo.Config()
			if err != nil {
				return out, err
			}
			// Use auto-config resolution for actual connectivity
			bspeers, err := cfg.BootstrapPeersWithAutoConf()
			if err != nil {
				return out, err
			}

			fullRTClient, err := fullrt.NewFullRT(in.Host,
				dht.DefaultPrefix,
				fullrt.DHTOption(
					dht.Validator(in.Validator),
					dht.Datastore(in.Repo.Datastore()),
					dht.BootstrapPeers(bspeers...),
					dht.BucketSize(20),
				),
			)
			if err != nil {
				return out, err
			}

			lc.Append(fx.Hook{
				OnStop: func(ctx context.Context) error {
					return fullRTClient.Close()
				},
			})

			// we want to also use the default HTTP routers, so wrap the FullRT client
			// in a parallel router that calls them in parallel
			httpRouters, err := constructDefaultHTTPRouters(cfg)
			if err != nil {
				return out, err
			}
			routers := []*routinghelpers.ParallelRouter{
				{Router: fullRTClient, DoNotWaitForSearchValue: true},
			}
			routers = append(routers, httpRouters...)
			router := routinghelpers.NewComposableParallel(routers)

			return processInitialRoutingOut{
				Router: Router{
					Priority: 1000,
					Routing:  router,
				},
				DHT:           dualDHT,
				DHTClient:     fullRTClient,
				ContentRouter: fullRTClient,
			}, nil
		}

		return processInitialRoutingOut{
			Router: Router{
				Priority: 1000,
				Routing:  in.Router,
			},
			DHT:           dualDHT,
			DHTClient:     dualDHT,
			ContentRouter: in.Router,
		}, nil
	}
}
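
// p2pOnlineContentRoutingIn collects every routing.ContentRouting registered
// in the "content-routers" group.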
type p2pOnlineContentRoutingIn struct {
	fx.In

	ContentRouter []routing.ContentRouting `group:"content-routers"`
}
// ContentRouting will get all routers that can do contentRouting and add them
// all together using a TieredRouter. It will be used for topic discovery.
func ContentRouting(in p2pOnlineContentRoutingIn) routing.ContentRouting {
	var routers []routing.Routing
	for _, cr := range in.ContentRouter {
		routers = append(routers,
			&routinghelpers.Compose{
				ContentRouting: cr,
			},
		)
	}

	return routinghelpers.Tiered{
		Routers: routers,
	}
}
// ContentDiscovery narrows down the given content routing facility so that it
// only does discovery.
func ContentDiscovery(in irouting.ProvideManyRouter) routing.ContentDiscovery {
	return in
}
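
// p2pOnlineRoutingIn collects all Routers contributed to the "routers" group,
// along with the record validator.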
type p2pOnlineRoutingIn struct {
	fx.In

	Routers   []Router `group:"routers"`
	Validator record.Validator
}
// Routing will get all routers obtained from different methods (delegated
// routers, pub-sub, and so on) and add them all together using a ParallelRouter.
func Routing(in p2pOnlineRoutingIn) irouting.ProvideManyRouter {
	routers := in.Routers

	sort.SliceStable(routers, func(i, j int) bool {
		return routers[i].Priority < routers[j].Priority
	})

	var cRouters []*routinghelpers.ParallelRouter
	for _, v := range routers {
		cRouters = append(cRouters, &routinghelpers.ParallelRouter{
			IgnoreError:             true,
			DoNotWaitForSearchValue: true,
			Router:                  v.Routing,
		})
	}

	return routinghelpers.NewComposableParallel(cRouters)
}
// OfflineRouting provides a special Router to the routers list when we are
// creating an offline node.
func OfflineRouting(dstore ds.Datastore, validator record.Validator) p2pRouterOut {
	return p2pRouterOut{
		Router: Router{
			Routing:  offroute.NewOfflineRouter(dstore, validator),
			Priority: 10000,
		},
	}
}
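
// p2pPSRoutingIn gathers the dependencies of the pubsub value-store router.
// PubSub is optional and is nil when pubsub has not been enabled.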
type p2pPSRoutingIn struct {
	fx.In

	Validator record.Validator
	Host      host.Host
	PubSub    *pubsub.PubSub `optional:"true"`
}
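
// PubsubRouter builds a pubsub-backed value store and exposes it as a Router
// limited to the "ipns" namespace, registered with Priority 100. The value
// store itself is also returned so other components (e.g. IPNS over pubsub)
// can use it directly.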
func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore, error) {
	psRouter, err := namesys.NewPubsubValueStore(
		helpers.LifecycleCtx(mctx, lc),
		in.Host,
		in.PubSub,
		in.Validator,
		namesys.WithRebroadcastInterval(time.Minute),
	)
	if err != nil {
		return p2pRouterOut{}, nil, err
	}

	return p2pRouterOut{
		Router: Router{
			Routing: &routinghelpers.Compose{
				ValueStore: &routinghelpers.LimitedValueStore{
					ValueStore: psRouter,
					Namespaces: []string{"ipns"},
				},
			},
			Priority: 100,
		},
	}, psRouter, nil
}
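
// autoRelayFeeder returns an fx option that starts a background loop feeding
// candidate relay peers into peerChan: trusted peers from Peering.Peers, the
// closest peers found via the WAN DHT, and all currently connected swarm
// peers. The loop runs on an exponential backoff ticker (15s initial interval,
// capped at 1h) and stops when the node shuts down.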
func autoRelayFeeder(cfgPeering config.Peering, peerChan chan<- peer.AddrInfo) fx.Option {
	return fx.Invoke(func(lc fx.Lifecycle, h host.Host, dht *ddht.DHT) {
		ctx, cancel := context.WithCancel(context.Background())
		done := make(chan struct{})

		defer func() {
			if r := recover(); r != nil {
				fmt.Println("Recovering from unexpected error in AutoRelayFeeder:", r)
				debug.PrintStack()
			}
		}()
		go func() {
			defer close(done)

			// Feed peers more often right after the bootstrap, then backoff
			bo := backoff.NewExponentialBackOff()
			bo.InitialInterval = 15 * time.Second
			bo.Multiplier = 3
			bo.MaxInterval = 1 * time.Hour
			bo.MaxElapsedTime = 0 // never stop
			t := backoff.NewTicker(bo)
			defer t.Stop()
			for {
				select {
				case <-t.C:
				case <-ctx.Done():
					return
				}

				// Always feed trusted IDs (Peering.Peers in the config)
				for _, trustedPeer := range cfgPeering.Peers {
					if len(trustedPeer.Addrs) == 0 {
						continue
					}
					select {
					case peerChan <- trustedPeer:
					case <-ctx.Done():
						return
					}
				}

				// Additionally, feed closest peers discovered via DHT
				if dht != nil {
					closestPeers, err := dht.WAN.GetClosestPeers(ctx, h.ID().String())
					if err == nil {
						for _, p := range closestPeers {
							addrs := h.Peerstore().Addrs(p)
							if len(addrs) == 0 {
								continue
							}
							dhtPeer := peer.AddrInfo{ID: p, Addrs: addrs}
							select {
							case peerChan <- dhtPeer:
							case <-ctx.Done():
								return
							}
						}
					}
				}

				// Additionally, feed all connected swarm peers as potential relay candidates.
				// This includes peers from HTTP routing, manual swarm connect, mDNS discovery, etc.
				// (fixes https://github.com/ipfs/kubo/issues/10899)
				connectedPeers := h.Network().Peers()
				for _, p := range connectedPeers {
					addrs := h.Peerstore().Addrs(p)
					if len(addrs) == 0 {
						continue
					}
					swarmPeer := peer.AddrInfo{ID: p, Addrs: addrs}
					select {
					case peerChan <- swarmPeer:
					case <-ctx.Done():
						return
					}
				}
			}
		}()

		lc.Append(fx.Hook{
			OnStop: func(_ context.Context) error {
				cancel()
				<-done
				return nil
			},
		})
	})
}