integrate experimental AcceleratedDHTClient

The experimental AcceleratedDHTClient can be enabled from the config

When enabled it modifies the output of the `ipfs stats dht` command.
This commit is contained in:
Adin Schmahmann 2021-04-05 12:48:32 -04:00
parent afa98998da
commit 2fd55d198c
9 changed files with 191 additions and 49 deletions

View File

@ -45,6 +45,12 @@ const (
dhtVerboseOptionName = "verbose"
)
// kademlia extends the routing interface with a command to get the peers closest to the target
type kademlia interface {
routing.Routing
GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error)
}
var queryDhtCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Find the closest Peer IDs to a given Peer ID by querying the DHT.",
@ -63,7 +69,7 @@ var queryDhtCmd = &cmds.Command{
return err
}
if nd.DHT == nil {
if nd.DHTClient == nil {
return ErrNotDHT
}
@ -73,40 +79,46 @@ var queryDhtCmd = &cmds.Command{
}
ctx, cancel := context.WithCancel(req.Context)
defer cancel()
ctx, events := routing.RegisterForQueryEvents(ctx)
dht := nd.DHT.WAN
if !nd.DHT.WANActive() {
dht = nd.DHT.LAN
client := nd.DHTClient
if client == nd.DHT {
client = nd.DHT.WAN
if !nd.DHT.WANActive() {
client = nd.DHT.LAN
}
}
errCh := make(chan error, 1)
go func() {
defer close(errCh)
defer cancel()
closestPeers, err := dht.GetClosestPeers(ctx, string(id))
if closestPeers != nil {
for p := range closestPeers {
if d, ok := client.(kademlia); !ok {
return fmt.Errorf("dht client does not support GetClosestPeers")
} else {
errCh := make(chan error, 1)
go func() {
defer close(errCh)
defer cancel()
closestPeers, err := d.GetClosestPeers(ctx, string(id))
for _, p := range closestPeers {
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
ID: p,
Type: routing.FinalPeer,
})
}
if err != nil {
errCh <- err
return
}
}()
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}
if err != nil {
errCh <- err
return
}
}()
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
return <-errCh
}
return <-errCh
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error {

View File

@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
pstore "github.com/libp2p/go-libp2p-core/peerstore"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
kbucket "github.com/libp2p/go-libp2p-kbucket"
)
@ -43,7 +44,8 @@ This interface is not stable and may change from release to release.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("dht", false, true, "The DHT whose table should be listed (wan or lan). Defaults to both."),
cmds.StringArg("dht", false, true, "The DHT whose table should be listed (wanserver, lanserver, wan, lan). "+
"wan and lan refer to client routing tables. When using the experimental DHT client only WAN is supported. Defaults to wan and lan."),
},
Options: []cmds.Option{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
@ -67,12 +69,62 @@ This interface is not stable and may change from release to release.
dhts = []string{"wan", "lan"}
}
dhttypeloop:
for _, name := range dhts {
var dht *dht.IpfsDHT
var separateClient bool
if nd.DHTClient != nd.DHT {
separateClient = true
}
switch name {
case "wan":
if separateClient {
client, ok := nd.DHTClient.(*fullrt.FullRT)
if !ok {
return cmds.Errorf(cmds.ErrClient, "could not generate stats for the WAN DHT client type")
}
peerMap := client.Stat()
buckets := make([]dhtBucket, 1)
b := &dhtBucket{}
for _, p := range peerMap {
info := dhtPeerInfo{ID: p.String()}
if ver, err := nd.Peerstore.Get(p, "AgentVersion"); err == nil {
info.AgentVersion, _ = ver.(string)
} else if err == pstore.ErrNotFound {
// ignore
} else {
// this is a bug, usually.
log.Errorw(
"failed to get agent version from peerstore",
"error", err,
)
}
info.Connected = nd.PeerHost.Network().Connectedness(p) == network.Connected
b.Peers = append(b.Peers, info)
}
buckets[0] = *b
if err := res.Emit(dhtStat{
Name: name,
Buckets: buckets,
}); err != nil {
return err
}
continue dhttypeloop
}
fallthrough
case "wanserver":
dht = nd.DHT.WAN
case "lan":
if separateClient {
return cmds.Errorf(cmds.ErrClient, "no LAN client found")
}
fallthrough
case "lanserver":
dht = nd.DHT.LAN
default:
return cmds.Errorf(cmds.ErrClient, "unknown dht type: %s", name)

View File

@ -98,8 +98,11 @@ type IpfsNode struct {
PubSub *pubsub.PubSub `optional:"true"`
PSRouter *psrouter.PubsubValueStore `optional:"true"`
DHT *ddht.DHT `optional:"true"`
P2P *p2p.P2P `optional:"true"`
DHT *ddht.DHT `optional:"true"`
DHTClient routing.Routing `name:"dhtc" optional:"true"`
P2P *p2p.P2P `optional:"true"`
Process goprocess.Process
ctx context.Context

View File

@ -139,7 +139,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Swarm.Transports)),
fx.Provide(libp2p.Routing),
fx.Provide(libp2p.BaseRouting),
fx.Provide(libp2p.BaseRouting(cfg.Experimental.AcceleratedDHTClient)),
maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")),
maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics),

View File

@ -34,7 +34,7 @@ type P2PHostOut struct {
fx.Out
Host host.Host
Routing BaseIpfsRouting
Routing routing.Routing `name:"initialrouting"`
}
func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHostOut, err error) {

View File

@ -7,9 +7,12 @@ import (
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/repo"
host "github.com/libp2p/go-libp2p-core/host"
routing "github.com/libp2p/go-libp2p-core/routing"
dht "github.com/libp2p/go-libp2p-kad-dht"
ddht "github.com/libp2p/go-libp2p-kad-dht/dual"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
"github.com/libp2p/go-libp2p-pubsub"
namesys "github.com/libp2p/go-libp2p-pubsub-router"
record "github.com/libp2p/go-libp2p-record"
@ -32,23 +35,89 @@ type p2pRouterOut struct {
Router Router `group:"routers"`
}
func BaseRouting(lc fx.Lifecycle, in BaseIpfsRouting) (out p2pRouterOut, dr *ddht.DHT) {
if dht, ok := in.(*ddht.DHT); ok {
dr = dht
type processInitialRoutingIn struct {
fx.In
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return dr.Close()
Router routing.Routing `name:"initialrouting"`
// For setting up experimental DHT client
Host host.Host
Repo repo.Repo
Validator record.Validator
}
type processInitialRoutingOut struct {
fx.Out
Router Router `group:"routers"`
DHT *ddht.DHT
DHTClient routing.Routing `name:"dhtc"`
BaseRT BaseIpfsRouting
}
func BaseRouting(experimentalDHTClient bool) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, in processInitialRoutingIn) (out processInitialRoutingOut, err error) {
var dr *ddht.DHT
if dht, ok := in.Router.(*ddht.DHT); ok {
dr = dht
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return dr.Close()
},
})
}
if dr != nil && experimentalDHTClient {
cfg, err := in.Repo.Config()
if err != nil {
return out, err
}
bspeers, err := cfg.BootstrapPeers()
if err != nil {
return out, err
}
expClient, err := fullrt.NewFullRT(in.Host,
dht.DefaultPrefix,
fullrt.DHTOption(
dht.Validator(in.Validator),
dht.Datastore(in.Repo.Datastore()),
dht.BootstrapPeers(bspeers...),
dht.BucketSize(20),
),
)
if err != nil {
return out, err
}
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return expClient.Close()
},
})
return processInitialRoutingOut{
Router: Router{
Routing: expClient,
Priority: 1000,
},
DHT: dr,
DHTClient: expClient,
BaseRT: expClient,
}, nil
}
return processInitialRoutingOut{
Router: Router{
Priority: 1000,
Routing: in.Router,
},
})
DHT: dr,
DHTClient: dr,
BaseRT: in.Router,
}, nil
}
return p2pRouterOut{
Router: Router{
Priority: 1000,
Routing: in,
},
}, dr
}
type p2pOnlineRoutingIn struct {

View File

@ -2,7 +2,6 @@ package libp2p
import (
"context"
"github.com/ipfs/go-datastore"
host "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"

4
go.mod
View File

@ -29,7 +29,7 @@ require (
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-cmds v0.6.0
github.com/ipfs/go-ipfs-config v0.13.0
github.com/ipfs/go-ipfs-config v0.14.0
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.8
@ -65,7 +65,7 @@ require (
github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-libp2p-discovery v0.5.0
github.com/libp2p/go-libp2p-http v0.2.0
github.com/libp2p/go-libp2p-kad-dht v0.11.1
github.com/libp2p/go-libp2p-kad-dht v0.12.0
github.com/libp2p/go-libp2p-kbucket v0.4.7
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-mplex v0.4.1

15
go.sum
View File

@ -271,8 +271,9 @@ github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hf
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY=
github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
@ -411,8 +412,8 @@ github.com/ipfs/go-ipfs-chunker v0.0.5 h1:ojCf7HV/m+uS2vhUGWcogIIxiO5ubl5O57Q7Na
github.com/ipfs/go-ipfs-chunker v0.0.5/go.mod h1:jhgdF8vxRHycr00k13FM8Y0E+6BoalYeobXmUyTreP8=
github.com/ipfs/go-ipfs-cmds v0.6.0 h1:yAxdowQZzoFKjcLI08sXVNnqVj3jnABbf9smrPQmBsw=
github.com/ipfs/go-ipfs-cmds v0.6.0/go.mod h1:ZgYiWVnCk43ChwoH8hAmI1IRbuVtq3GSTHwtRB/Kqhk=
github.com/ipfs/go-ipfs-config v0.13.0 h1:ZH3dTmkVR9TTFBIbfWnFNC1JdwHbj8F0ryiaIFo7U/o=
github.com/ipfs/go-ipfs-config v0.13.0/go.mod h1:Ei/FLgHGTdPyqCPK0oPCwGTe8VSnsjJjx7HZqUb6Ry0=
github.com/ipfs/go-ipfs-config v0.14.0 h1:KijwGU788UycqPWv4GxzyfyN6EtfJjjDRzd/wSA86VU=
github.com/ipfs/go-ipfs-config v0.14.0/go.mod h1:Ei/FLgHGTdPyqCPK0oPCwGTe8VSnsjJjx7HZqUb6Ry0=
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ=
github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
@ -677,8 +678,10 @@ github.com/libp2p/go-libp2p-interface-connmgr v0.0.1/go.mod h1:GarlRLH0LdeWcLnYM
github.com/libp2p/go-libp2p-interface-connmgr v0.0.4/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.5/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
github.com/libp2p/go-libp2p-interface-pnet v0.0.1/go.mod h1:el9jHpQAXK5dnTpKA4yfCNBZXvrzdOU75zz+C6ryp3k=
github.com/libp2p/go-libp2p-kad-dht v0.11.1 h1:FsriVQhOUZpCotWIjyFSjEDNJmUzuMma/RyyTDZanwc=
github.com/libp2p/go-libp2p-kad-dht v0.11.1/go.mod h1:5ojtR2acDPqh/jXf5orWy8YGb8bHQDS+qeDcoscL/PI=
github.com/libp2p/go-libp2p-kad-dht v0.12.0 h1:R5vvp8kuXjsyDE/HEMKgM8XIwlRsP5BdAZexM+tJxdU=
github.com/libp2p/go-libp2p-kad-dht v0.12.0/go.mod h1:zdQYru1c7dnluMpZls4i9Fj2TwYXS7YyDkJ1Yahv0w0=
github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
github.com/libp2p/go-libp2p-kbucket v0.4.7 h1:spZAcgxifvFZHBD8tErvppbnNiKA5uokDu3CV7axu70=
github.com/libp2p/go-libp2p-kbucket v0.4.7/go.mod h1:XyVo99AfQH0foSf176k4jY1xUJ2+jUJIZCSDm7r2YKk=
github.com/libp2p/go-libp2p-loggables v0.0.1/go.mod h1:lDipDlBNYbpyqyPX/KcoO+eq0sJYEVR2JgOexcivchg=
@ -777,6 +780,8 @@ github.com/libp2p/go-libp2p-transport-upgrader v0.3.0/go.mod h1:i+SKzbRnvXdVbU3D
github.com/libp2p/go-libp2p-transport-upgrader v0.4.0/go.mod h1:J4ko0ObtZSmgn5BX5AmegP+dK3CSnU2lMCKsSq/EY0s=
github.com/libp2p/go-libp2p-transport-upgrader v0.4.2 h1:4JsnbfJzgZeRS9AWN7B9dPqn/LY/HoQTlO9gtdJTIYM=
github.com/libp2p/go-libp2p-transport-upgrader v0.4.2/go.mod h1:NR8ne1VwfreD5VIWIU62Agt/J18ekORFU/j1i2y8zvk=
github.com/libp2p/go-libp2p-xor v0.0.0-20200501025846-71e284145d58 h1:GcTNu27BMpOTtMnQqun03+kbtHA1qTxJ/J8cZRRYu2k=
github.com/libp2p/go-libp2p-xor v0.0.0-20200501025846-71e284145d58/go.mod h1:AYjOiqJIdcmI4SXE2ouKQuFrUbE5myv8txWaB2pl4TI=
github.com/libp2p/go-libp2p-yamux v0.1.2/go.mod h1:xUoV/RmYkg6BW/qGxA9XJyg+HzXFYkeXbnhjmnYzKp8=
github.com/libp2p/go-libp2p-yamux v0.1.3/go.mod h1:VGSQVrqkh6y4nm0189qqxMtvyBft44MOYYPpYKXiVt4=
github.com/libp2p/go-libp2p-yamux v0.2.0/go.mod h1:Db2gU+XfLpm6E4rG5uGCFX6uXA8MEXOxFcRoXUODaK8=
@ -1208,6 +1213,7 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE=
github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a h1:G++j5e0OC488te356JvdhaM8YS6nMsjLAYF7JxCv07w=
@ -1312,6 +1318,7 @@ golang.org/x/crypto v0.0.0-20200602180216-279210d13fed/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf h1:B2n+Zi5QeYRDAEodEu72OS36gmTWjgpXr2+cWcBW90o=
golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=