From 0e55ca9377515ef2a78816ee78db33a0fa66bf3b Mon Sep 17 00:00:00 2001 From: Gus Eggert Date: Fri, 24 Mar 2023 21:09:35 -0400 Subject: [PATCH] feat: add experimental optimistic provide This adds the ability to enable "optimistic provide" to the default DHT client, which enables faster provides and reprovides. For more information about optimistic provide, see: https://protocollabs.notion.site/Optimistic-Provide-2c79745820fa45649d48de038516b814 Note that this feature only works when using non-custom router types. This does not include the ability to enable optimistic provide on custom routers for now, to minimize the footprint of this experimental feature. We intend on continuing to test this and improve the UX, which may or may not involve adding configuration for it to custom routers. We also plan on refactoring/redesigning custom routers more broadly so I don't want this to add more effort for maintainers and confusion for users. --- config/experiments.go | 18 +++-- core/node/libp2p/host.go | 21 +++-- core/node/libp2p/routingopt.go | 108 +++++++++---------------- docs/examples/kubo-as-a-library/go.mod | 3 +- docs/examples/kubo-as-a-library/go.sum | 6 +- go.mod | 3 +- go.sum | 6 +- test/cli/dht_opt_prov_test.go | 30 +++++++ test/cli/harness/node.go | 4 +- test/cli/testutils/random.go | 4 + 10 files changed, 111 insertions(+), 92 deletions(-) create mode 100644 test/cli/dht_opt_prov_test.go diff --git a/config/experiments.go b/config/experiments.go index 7ad87c853..072dcd0dd 100644 --- a/config/experiments.go +++ b/config/experiments.go @@ -1,12 +1,14 @@ package config type Experiments struct { - FilestoreEnabled bool - UrlstoreEnabled bool - ShardingEnabled bool `json:",omitempty"` // deprecated by autosharding: https://github.com/ipfs/kubo/pull/8527 - GraphsyncEnabled bool - Libp2pStreamMounting bool - P2pHttpProxy bool //nolint - StrategicProviding bool - AcceleratedDHTClient bool + FilestoreEnabled bool + UrlstoreEnabled bool + ShardingEnabled bool `json:",omitempty"` // deprecated by autosharding: https://github.com/ipfs/kubo/pull/8527 + GraphsyncEnabled bool + Libp2pStreamMounting bool + P2pHttpProxy bool //nolint + StrategicProviding bool + AcceleratedDHTClient bool + OptimisticProvide bool + OptimisticProvideJobsPoolSize int } diff --git a/core/node/libp2p/host.go b/core/node/libp2p/host.go index 15d815812..afbd2080c 100644 --- a/core/node/libp2p/host.go +++ b/core/node/libp2p/host.go @@ -53,13 +53,18 @@ func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHo return out, err } + routingOptArgs := RoutingOptionArgs{ + Ctx: ctx, + Datastore: params.Repo.Datastore(), + Validator: params.Validator, + BootstrapPeers: bootstrappers, + OptimisticProvide: cfg.Experimental.OptimisticProvide, + OptimisticProvideJobsPoolSize: cfg.Experimental.OptimisticProvideJobsPoolSize, + } opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { - r, err := params.RoutingOption( - ctx, h, - params.Repo.Datastore(), - params.Validator, - bootstrappers..., - ) + args := routingOptArgs + args.Host = h + r, err := params.RoutingOption(args) out.Routing = r return r, err })) @@ -69,10 +74,12 @@ func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHo return P2PHostOut{}, err } + routingOptArgs.Host = out.Host + // this code is necessary just for tests: mock network constructions // ignore the libp2p constructor options that actually construct the routing! if out.Routing == nil { - r, err := params.RoutingOption(ctx, out.Host, params.Repo.Datastore(), params.Validator, bootstrappers...) + r, err := params.RoutingOption(routingOptArgs) if err != nil { return P2PHostOut{}, err } diff --git a/core/node/libp2p/routingopt.go b/core/node/libp2p/routingopt.go index 8a69e181b..1d47ae273 100644 --- a/core/node/libp2p/routingopt.go +++ b/core/node/libp2p/routingopt.go @@ -18,13 +18,17 @@ import ( routing "github.com/libp2p/go-libp2p/core/routing" ) -type RoutingOption func( - context.Context, - host.Host, - datastore.Batching, - record.Validator, - ...peer.AddrInfo, -) (routing.Routing, error) +type RoutingOptionArgs struct { + Ctx context.Context + Host host.Host + Datastore datastore.Batching + Validator record.Validator + BootstrapPeers []peer.AddrInfo + OptimisticProvide bool + OptimisticProvideJobsPoolSize int +} + +type RoutingOption func(args RoutingOptionArgs) (routing.Routing, error) // Default HTTP routers used in parallel to DHT when Routing.Type = "auto" var defaultHTTPRouters = []string{ @@ -40,25 +44,13 @@ func init() { } // ConstructDefaultRouting returns routers used when Routing.Type is unset or set to "auto" -func ConstructDefaultRouting(peerID string, addrs []string, privKey string, routingOpt RoutingOption) func( - ctx context.Context, - host host.Host, - dstore datastore.Batching, - validator record.Validator, - bootstrapPeers ...peer.AddrInfo, -) (routing.Routing, error) { - return func( - ctx context.Context, - host host.Host, - dstore datastore.Batching, - validator record.Validator, - bootstrapPeers ...peer.AddrInfo, - ) (routing.Routing, error) { +func ConstructDefaultRouting(peerID string, addrs []string, privKey string, routingOpt RoutingOption) RoutingOption { + return func(args RoutingOptionArgs) (routing.Routing, error) { // Defined routers will be queried in parallel (optimizing for response speed) // Different trade-offs can be made by setting Routing.Type = "custom" with own Routing.Routers var routers []*routinghelpers.ParallelRouter - dhtRouting, err := routingOpt(ctx, host, dstore, validator, bootstrapPeers...) + dhtRouting, err := routingOpt(args) if err != nil { return nil, err } @@ -97,54 +89,38 @@ func ConstructDefaultRouting(peerID string, addrs []string, privKey string, rout } // constructDHTRouting is used when Routing.Type = "dht" -func constructDHTRouting(mode dht.ModeOpt) func( - ctx context.Context, - host host.Host, - dstore datastore.Batching, - validator record.Validator, - bootstrapPeers ...peer.AddrInfo, -) (routing.Routing, error) { - return func( - ctx context.Context, - host host.Host, - dstore datastore.Batching, - validator record.Validator, - bootstrapPeers ...peer.AddrInfo, - ) (routing.Routing, error) { +func constructDHTRouting(mode dht.ModeOpt) RoutingOption { + return func(args RoutingOptionArgs) (routing.Routing, error) { + dhtOpts := []dht.Option{ + dht.Concurrency(10), + dht.Mode(mode), + dht.Datastore(args.Datastore), + dht.Validator(args.Validator), + } + if args.OptimisticProvide { + dhtOpts = append(dhtOpts, dht.EnableOptimisticProvide()) + } + if args.OptimisticProvideJobsPoolSize != 0 { + dhtOpts = append(dhtOpts, dht.OptimisticProvideJobsPoolSize(args.OptimisticProvideJobsPoolSize)) + } return dual.New( - ctx, host, - dual.DHTOption( - dht.Concurrency(10), - dht.Mode(mode), - dht.Datastore(dstore), - dht.Validator(validator)), - dual.WanDHTOption(dht.BootstrapPeers(bootstrapPeers...)), + args.Ctx, args.Host, + dual.DHTOption(dhtOpts...), + dual.WanDHTOption(dht.BootstrapPeers(args.BootstrapPeers...)), ) } } // ConstructDelegatedRouting is used when Routing.Type = "custom" -func ConstructDelegatedRouting(routers config.Routers, methods config.Methods, peerID string, addrs []string, privKey string) func( - ctx context.Context, - host host.Host, - dstore datastore.Batching, - validator record.Validator, - bootstrapPeers ...peer.AddrInfo, -) (routing.Routing, error) { - return func( - ctx context.Context, - host host.Host, - dstore datastore.Batching, - validator record.Validator, - bootstrapPeers ...peer.AddrInfo, - ) (routing.Routing, error) { +func ConstructDelegatedRouting(routers config.Routers, methods config.Methods, peerID string, addrs []string, privKey string) RoutingOption { + return func(args RoutingOptionArgs) (routing.Routing, error) { return irouting.Parse(routers, methods, &irouting.ExtraDHTParams{ - BootstrapPeers: bootstrapPeers, - Host: host, - Validator: validator, - Datastore: dstore, - Context: ctx, + BootstrapPeers: args.BootstrapPeers, + Host: args.Host, + Validator: args.Validator, + Datastore: args.Datastore, + Context: args.Ctx, }, &irouting.ExtraHTTPParams{ PeerID: peerID, @@ -154,13 +130,7 @@ func ConstructDelegatedRouting(routers config.Routers, methods config.Methods, p } } -func constructNilRouting( - ctx context.Context, - host host.Host, - dstore datastore.Batching, - validator record.Validator, - bootstrapPeers ...peer.AddrInfo, -) (routing.Routing, error) { +func constructNilRouting(_ RoutingOptionArgs) (routing.Routing, error) { return routinghelpers.Null{}, nil } diff --git a/docs/examples/kubo-as-a-library/go.mod b/docs/examples/kubo-as-a-library/go.mod index 3f809f00b..3fa9b7c90 100644 --- a/docs/examples/kubo-as-a-library/go.mod +++ b/docs/examples/kubo-as-a-library/go.mod @@ -103,7 +103,7 @@ require ( github.com/libp2p/go-doh-resolver v0.4.0 // indirect github.com/libp2p/go-flow-metrics v0.1.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect - github.com/libp2p/go-libp2p-kad-dht v0.22.0 // indirect + github.com/libp2p/go-libp2p-kad-dht v0.23.0 // indirect github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect github.com/libp2p/go-libp2p-pubsub v0.9.3 // indirect github.com/libp2p/go-libp2p-pubsub-router v0.6.0 // indirect @@ -188,6 +188,7 @@ require ( golang.org/x/text v0.8.0 // indirect golang.org/x/tools v0.6.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect + gonum.org/v1/gonum v0.11.0 // indirect google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect google.golang.org/grpc v1.53.0 // indirect google.golang.org/protobuf v1.30.0 // indirect diff --git a/docs/examples/kubo-as-a-library/go.sum b/docs/examples/kubo-as-a-library/go.sum index 4a9480ed3..1403cb302 100644 --- a/docs/examples/kubo-as-a-library/go.sum +++ b/docs/examples/kubo-as-a-library/go.sum @@ -518,8 +518,8 @@ github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLE github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w= github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g= github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw= -github.com/libp2p/go-libp2p-kad-dht v0.22.0 h1:cW2nGgG0hztDM42tOPyC5cVflD7EzLaHM0/Kjol6Wio= -github.com/libp2p/go-libp2p-kad-dht v0.22.0/go.mod h1:hareSo3Z/GJ7nUWPMj7XhD/56a7+rRltYCWwCuy3FQk= +github.com/libp2p/go-libp2p-kad-dht v0.23.0 h1:sxE6LxLopp79eLeV695n7+c77V/Vn4AMF28AdM/XFqM= +github.com/libp2p/go-libp2p-kad-dht v0.23.0/go.mod h1:oO5N308VT2msnQI6qi5M61wzPmJYg7Tr9e16m5n7uDU= github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio= github.com/libp2p/go-libp2p-kbucket v0.5.0 h1:g/7tVm8ACHDxH29BGrpsQlnNeu+6OF1A9bno/4/U1oA= github.com/libp2p/go-libp2p-kbucket v0.5.0/go.mod h1:zGzGCpQd78b5BNTDGHNDLaTt9aDK/A02xeZp9QeFC4U= @@ -1191,6 +1191,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E= +gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA= google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y= diff --git a/go.mod b/go.mod index 0c1456a07..38f5a5a21 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( github.com/libp2p/go-doh-resolver v0.4.0 github.com/libp2p/go-libp2p v0.26.4 github.com/libp2p/go-libp2p-http v0.5.0 - github.com/libp2p/go-libp2p-kad-dht v0.22.0 + github.com/libp2p/go-libp2p-kad-dht v0.23.0 github.com/libp2p/go-libp2p-kbucket v0.5.0 github.com/libp2p/go-libp2p-pubsub v0.9.3 github.com/libp2p/go-libp2p-pubsub-router v0.6.0 @@ -222,6 +222,7 @@ require ( golang.org/x/text v0.8.0 // indirect golang.org/x/tools v0.6.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect + gonum.org/v1/gonum v0.11.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect google.golang.org/grpc v1.53.0 // indirect diff --git a/go.sum b/go.sum index 6ac7bbbec..bd48a883c 100644 --- a/go.sum +++ b/go.sum @@ -549,8 +549,8 @@ github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qk github.com/libp2p/go-libp2p-gostream v0.6.0/go.mod h1:Nywu0gYZwfj7Jc91PQvbGU8dIpqbQQkjWgDuOrFaRdA= github.com/libp2p/go-libp2p-http v0.5.0 h1:+x0AbLaUuLBArHubbbNRTsgWz0RjNTy6DJLOxQ3/QBc= github.com/libp2p/go-libp2p-http v0.5.0/go.mod h1:glh87nZ35XCQyFsdzZps6+F4HYI6DctVFY5u1fehwSg= -github.com/libp2p/go-libp2p-kad-dht v0.22.0 h1:cW2nGgG0hztDM42tOPyC5cVflD7EzLaHM0/Kjol6Wio= -github.com/libp2p/go-libp2p-kad-dht v0.22.0/go.mod h1:hareSo3Z/GJ7nUWPMj7XhD/56a7+rRltYCWwCuy3FQk= +github.com/libp2p/go-libp2p-kad-dht v0.23.0 h1:sxE6LxLopp79eLeV695n7+c77V/Vn4AMF28AdM/XFqM= +github.com/libp2p/go-libp2p-kad-dht v0.23.0/go.mod h1:oO5N308VT2msnQI6qi5M61wzPmJYg7Tr9e16m5n7uDU= github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio= github.com/libp2p/go-libp2p-kbucket v0.5.0 h1:g/7tVm8ACHDxH29BGrpsQlnNeu+6OF1A9bno/4/U1oA= github.com/libp2p/go-libp2p-kbucket v0.5.0/go.mod h1:zGzGCpQd78b5BNTDGHNDLaTt9aDK/A02xeZp9QeFC4U= @@ -1282,6 +1282,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E= +gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA= google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y= diff --git a/test/cli/dht_opt_prov_test.go b/test/cli/dht_opt_prov_test.go new file mode 100644 index 000000000..5481315af --- /dev/null +++ b/test/cli/dht_opt_prov_test.go @@ -0,0 +1,30 @@ +package cli + +import ( + "testing" + + "github.com/ipfs/kubo/config" + "github.com/ipfs/kubo/test/cli/harness" + "github.com/ipfs/kubo/test/cli/testutils" + "github.com/stretchr/testify/assert" +) + +func TestDHTOptimisticProvide(t *testing.T) { + t.Parallel() + + t.Run("optimistic provide smoke test", func(t *testing.T) { + nodes := harness.NewT(t).NewNodes(2).Init() + + nodes[0].UpdateConfig(func(cfg *config.Config) { + cfg.Experimental.OptimisticProvide = true + }) + + nodes.StartDaemons().Connect() + + hash := nodes[0].IPFSAddStr(testutils.RandomStr(100)) + nodes[0].IPFS("dht", "provide", hash) + + res := nodes[1].IPFS("routing", "findprovs", "--num-providers=1", hash) + assert.Equal(t, nodes[0].PeerID().String(), res.Stdout.Trimmed()) + }) +} diff --git a/test/cli/harness/node.go b/test/cli/harness/node.go index 625a01fb6..0516c2b45 100644 --- a/test/cli/harness/node.go +++ b/test/cli/harness/node.go @@ -218,8 +218,8 @@ func (n *Node) Init(ipfsArgs ...string) *Node { // // node.StartDaemonWithReq(harness.RunRequest{ // CmdOpts: []harness.CmdOpt{ -// harness.RunWithStderr(os.Stdout), -// harness.RunWithStdout(os.Stdout), +// harness.RunWithStderr(os.Stdout), +// harness.RunWithStdout(os.Stdout), // }, // }) func (n *Node) StartDaemonWithReq(req RunRequest) *Node { diff --git a/test/cli/testutils/random.go b/test/cli/testutils/random.go index 00bb9de49..6fa6528c3 100644 --- a/test/cli/testutils/random.go +++ b/test/cli/testutils/random.go @@ -10,3 +10,7 @@ func RandomBytes(n int) []byte { } return bytes } + +func RandomStr(n int) string { + return string(RandomBytes(n)) +}