fix: use default HTTP routers when FullRT DHT client is used (#9841)

This commit is contained in:
Gus Eggert 2023-05-01 15:29:34 -04:00 committed by GitHub
parent e4908a0163
commit 4ca36c41b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 65 additions and 53 deletions

View File

@ -413,19 +413,9 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
case routingOptionSupernodeKwd:
return errors.New("supernode routing was never fully implemented and has been removed")
case routingOptionDefaultKwd, routingOptionAutoKwd:
ncfg.Routing = libp2p.ConstructDefaultRouting(
cfg.Identity.PeerID,
cfg.Addresses.Swarm,
cfg.Identity.PrivKey,
libp2p.DHTOption,
)
ncfg.Routing = libp2p.ConstructDefaultRouting(cfg, libp2p.DHTOption)
case routingOptionAutoClientKwd:
ncfg.Routing = libp2p.ConstructDefaultRouting(
cfg.Identity.PeerID,
cfg.Addresses.Swarm,
cfg.Identity.PrivKey,
libp2p.DHTClientOption,
)
ncfg.Routing = libp2p.ConstructDefaultRouting(cfg, libp2p.DHTClientOption)
case routingOptionDHTClientKwd:
ncfg.Routing = libp2p.DHTClientOption
case routingOptionDHTKwd:

View File

@ -165,7 +165,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part
fx.Provide(libp2p.Routing),
fx.Provide(libp2p.ContentRouting),
fx.Provide(libp2p.BaseRouting(cfg.Experimental.AcceleratedDHTClient)),
fx.Provide(libp2p.BaseRouting(cfg)),
maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")),
maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics),

View File

@ -63,35 +63,34 @@ type processInitialRoutingOut struct {
type AddrInfoChan chan peer.AddrInfo
func BaseRouting(experimentalDHTClient bool) interface{} {
func BaseRouting(cfg *config.Config) interface{} {
return func(lc fx.Lifecycle, in processInitialRoutingIn) (out processInitialRoutingOut, err error) {
var dr *ddht.DHT
var dualDHT *ddht.DHT
if dht, ok := in.Router.(*ddht.DHT); ok {
dr = dht
dualDHT = dht
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return dr.Close()
return dualDHT.Close()
},
})
}
if pr, ok := in.Router.(routinghelpers.ComposableRouter); ok {
for _, r := range pr.Routers() {
if cr, ok := in.Router.(routinghelpers.ComposableRouter); ok {
for _, r := range cr.Routers() {
if dht, ok := r.(*ddht.DHT); ok {
dr = dht
dualDHT = dht
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return dr.Close()
return dualDHT.Close()
},
})
break
}
}
}
if dr != nil && experimentalDHTClient {
if dualDHT != nil && cfg.Experimental.AcceleratedDHTClient {
cfg, err := in.Repo.Config()
if err != nil {
return out, err
@ -101,7 +100,7 @@ func BaseRouting(experimentalDHTClient bool) interface{} {
return out, err
}
expClient, err := fullrt.NewFullRT(in.Host,
fullRTClient, err := fullrt.NewFullRT(in.Host,
dht.DefaultPrefix,
fullrt.DHTOption(
dht.Validator(in.Validator),
@ -116,18 +115,30 @@ func BaseRouting(experimentalDHTClient bool) interface{} {
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return expClient.Close()
return fullRTClient.Close()
},
})
// we want to also use the default HTTP routers, so wrap the FullRT client
// in a parallel router that calls them in parallel
httpRouters, err := constructDefaultHTTPRouters(cfg)
if err != nil {
return out, err
}
routers := []*routinghelpers.ParallelRouter{
{Router: fullRTClient},
}
routers = append(routers, httpRouters...)
router := routinghelpers.NewComposableParallel(routers)
return processInitialRoutingOut{
Router: Router{
Routing: expClient,
Priority: 1000,
Routing: router,
},
DHT: dr,
DHTClient: expClient,
ContentRouter: expClient,
DHT: dualDHT,
DHTClient: fullRTClient,
ContentRouter: fullRTClient,
}, nil
}
@ -136,8 +147,8 @@ func BaseRouting(experimentalDHTClient bool) interface{} {
Priority: 1000,
Routing: in.Router,
},
DHT: dr,
DHTClient: dr,
DHT: dualDHT,
DHTClient: dualDHT,
ContentRouter: in.Router,
}, nil
}

View File

@ -43,8 +43,35 @@ func init() {
}
}
func constructDefaultHTTPRouters(cfg *config.Config) ([]*routinghelpers.ParallelRouter, error) {
var routers []*routinghelpers.ParallelRouter
// Append HTTP routers for additional speed
for _, endpoint := range defaultHTTPRouters {
httpRouter, err := irouting.ConstructHTTPRouter(endpoint, cfg.Identity.PeerID, cfg.Addresses.Swarm, cfg.Identity.PrivKey)
if err != nil {
return nil, err
}
r := &irouting.Composer{
GetValueRouter: routinghelpers.Null{},
PutValueRouter: routinghelpers.Null{},
ProvideRouter: routinghelpers.Null{}, // modify this when indexers supports provide
FindPeersRouter: routinghelpers.Null{},
FindProvidersRouter: httpRouter,
}
routers = append(routers, &routinghelpers.ParallelRouter{
Router: r,
IgnoreError: true, // https://github.com/ipfs/kubo/pull/9475#discussion_r1042507387
Timeout: 15 * time.Second, // 5x server value from https://github.com/ipfs/kubo/pull/9475#discussion_r1042428529
ExecuteAfter: 0,
})
}
return routers, nil
}
// ConstructDefaultRouting returns routers used when Routing.Type is unset or set to "auto"
func ConstructDefaultRouting(peerID string, addrs []string, privKey string, routingOpt RoutingOption) RoutingOption {
func ConstructDefaultRouting(cfg *config.Config, routingOpt RoutingOption) RoutingOption {
return func(args RoutingOptionArgs) (routing.Routing, error) {
// Defined routers will be queried in parallel (optimizing for response speed)
// Different trade-offs can be made by setting Routing.Type = "custom" with own Routing.Routers
@ -60,29 +87,13 @@ func ConstructDefaultRouting(peerID string, addrs []string, privKey string, rout
ExecuteAfter: 0,
})
// Append HTTP routers for additional speed
for _, endpoint := range defaultHTTPRouters {
httpRouter, err := irouting.ConstructHTTPRouter(endpoint, peerID, addrs, privKey)
if err != nil {
return nil, err
}
r := &irouting.Composer{
GetValueRouter: routinghelpers.Null{},
PutValueRouter: routinghelpers.Null{},
ProvideRouter: routinghelpers.Null{}, // modify this when indexers supports provide
FindPeersRouter: routinghelpers.Null{},
FindProvidersRouter: httpRouter,
}
routers = append(routers, &routinghelpers.ParallelRouter{
Router: r,
IgnoreError: true, // https://github.com/ipfs/kubo/pull/9475#discussion_r1042507387
Timeout: 15 * time.Second, // 5x server value from https://github.com/ipfs/kubo/pull/9475#discussion_r1042428529
ExecuteAfter: 0,
})
httpRouters, err := constructDefaultHTTPRouters(cfg)
if err != nil {
return nil, err
}
routers = append(routers, httpRouters...)
routing := routinghelpers.NewComposableParallel(routers)
return routing, nil
}