feat: add experimental optimistic provide

This adds the ability to enable "optimistic provide" to the default
DHT client, which enables faster provides and reprovides.

For more information about optimistic provide, see:

https://protocollabs.notion.site/Optimistic-Provide-2c79745820fa45649d48de038516b814

Note that this feature only works when using non-custom router
types. This does not include the ability to enable optimistic provide
on custom routers for now, to minimize the footprint of this
experimental feature. We intend on continuing to test this and improve
the UX, which may or may not involve adding configuration for it to
custom routers. We also plan on refactoring/redesigning custom routers
more broadly so I don't want this to add more effort for maintainers
and confusion for users.
This commit is contained in:
Gus Eggert 2023-03-24 21:09:35 -04:00
parent 6f08cdedc0
commit 0e55ca9377
10 changed files with 111 additions and 92 deletions

View File

@ -1,12 +1,14 @@
package config
type Experiments struct {
FilestoreEnabled bool
UrlstoreEnabled bool
ShardingEnabled bool `json:",omitempty"` // deprecated by autosharding: https://github.com/ipfs/kubo/pull/8527
GraphsyncEnabled bool
Libp2pStreamMounting bool
P2pHttpProxy bool //nolint
StrategicProviding bool
AcceleratedDHTClient bool
FilestoreEnabled bool
UrlstoreEnabled bool
ShardingEnabled bool `json:",omitempty"` // deprecated by autosharding: https://github.com/ipfs/kubo/pull/8527
GraphsyncEnabled bool
Libp2pStreamMounting bool
P2pHttpProxy bool //nolint
StrategicProviding bool
AcceleratedDHTClient bool
OptimisticProvide bool
OptimisticProvideJobsPoolSize int
}

View File

@ -53,13 +53,18 @@ func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHo
return out, err
}
routingOptArgs := RoutingOptionArgs{
Ctx: ctx,
Datastore: params.Repo.Datastore(),
Validator: params.Validator,
BootstrapPeers: bootstrappers,
OptimisticProvide: cfg.Experimental.OptimisticProvide,
OptimisticProvideJobsPoolSize: cfg.Experimental.OptimisticProvideJobsPoolSize,
}
opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
r, err := params.RoutingOption(
ctx, h,
params.Repo.Datastore(),
params.Validator,
bootstrappers...,
)
args := routingOptArgs
args.Host = h
r, err := params.RoutingOption(args)
out.Routing = r
return r, err
}))
@ -69,10 +74,12 @@ func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHo
return P2PHostOut{}, err
}
routingOptArgs.Host = out.Host
// this code is necessary just for tests: mock network constructions
// ignore the libp2p constructor options that actually construct the routing!
if out.Routing == nil {
r, err := params.RoutingOption(ctx, out.Host, params.Repo.Datastore(), params.Validator, bootstrappers...)
r, err := params.RoutingOption(routingOptArgs)
if err != nil {
return P2PHostOut{}, err
}

View File

@ -18,13 +18,17 @@ import (
routing "github.com/libp2p/go-libp2p/core/routing"
)
type RoutingOption func(
context.Context,
host.Host,
datastore.Batching,
record.Validator,
...peer.AddrInfo,
) (routing.Routing, error)
type RoutingOptionArgs struct {
Ctx context.Context
Host host.Host
Datastore datastore.Batching
Validator record.Validator
BootstrapPeers []peer.AddrInfo
OptimisticProvide bool
OptimisticProvideJobsPoolSize int
}
type RoutingOption func(args RoutingOptionArgs) (routing.Routing, error)
// Default HTTP routers used in parallel to DHT when Routing.Type = "auto"
var defaultHTTPRouters = []string{
@ -40,25 +44,13 @@ func init() {
}
// ConstructDefaultRouting returns routers used when Routing.Type is unset or set to "auto"
func ConstructDefaultRouting(peerID string, addrs []string, privKey string, routingOpt RoutingOption) func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
return func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
func ConstructDefaultRouting(peerID string, addrs []string, privKey string, routingOpt RoutingOption) RoutingOption {
return func(args RoutingOptionArgs) (routing.Routing, error) {
// Defined routers will be queried in parallel (optimizing for response speed)
// Different trade-offs can be made by setting Routing.Type = "custom" with own Routing.Routers
var routers []*routinghelpers.ParallelRouter
dhtRouting, err := routingOpt(ctx, host, dstore, validator, bootstrapPeers...)
dhtRouting, err := routingOpt(args)
if err != nil {
return nil, err
}
@ -97,54 +89,38 @@ func ConstructDefaultRouting(peerID string, addrs []string, privKey string, rout
}
// constructDHTRouting is used when Routing.Type = "dht"
func constructDHTRouting(mode dht.ModeOpt) func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
return func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
func constructDHTRouting(mode dht.ModeOpt) RoutingOption {
return func(args RoutingOptionArgs) (routing.Routing, error) {
dhtOpts := []dht.Option{
dht.Concurrency(10),
dht.Mode(mode),
dht.Datastore(args.Datastore),
dht.Validator(args.Validator),
}
if args.OptimisticProvide {
dhtOpts = append(dhtOpts, dht.EnableOptimisticProvide())
}
if args.OptimisticProvideJobsPoolSize != 0 {
dhtOpts = append(dhtOpts, dht.OptimisticProvideJobsPoolSize(args.OptimisticProvideJobsPoolSize))
}
return dual.New(
ctx, host,
dual.DHTOption(
dht.Concurrency(10),
dht.Mode(mode),
dht.Datastore(dstore),
dht.Validator(validator)),
dual.WanDHTOption(dht.BootstrapPeers(bootstrapPeers...)),
args.Ctx, args.Host,
dual.DHTOption(dhtOpts...),
dual.WanDHTOption(dht.BootstrapPeers(args.BootstrapPeers...)),
)
}
}
// ConstructDelegatedRouting is used when Routing.Type = "custom"
func ConstructDelegatedRouting(routers config.Routers, methods config.Methods, peerID string, addrs []string, privKey string) func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
return func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
func ConstructDelegatedRouting(routers config.Routers, methods config.Methods, peerID string, addrs []string, privKey string) RoutingOption {
return func(args RoutingOptionArgs) (routing.Routing, error) {
return irouting.Parse(routers, methods,
&irouting.ExtraDHTParams{
BootstrapPeers: bootstrapPeers,
Host: host,
Validator: validator,
Datastore: dstore,
Context: ctx,
BootstrapPeers: args.BootstrapPeers,
Host: args.Host,
Validator: args.Validator,
Datastore: args.Datastore,
Context: args.Ctx,
},
&irouting.ExtraHTTPParams{
PeerID: peerID,
@ -154,13 +130,7 @@ func ConstructDelegatedRouting(routers config.Routers, methods config.Methods, p
}
}
func constructNilRouting(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
func constructNilRouting(_ RoutingOptionArgs) (routing.Routing, error) {
return routinghelpers.Null{}, nil
}

View File

@ -103,7 +103,7 @@ require (
github.com/libp2p/go-doh-resolver v0.4.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.22.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.23.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.9.3 // indirect
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 // indirect
@ -188,6 +188,7 @@ require (
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/grpc v1.53.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect

View File

@ -518,8 +518,8 @@ github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLE
github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w=
github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g=
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
github.com/libp2p/go-libp2p-kad-dht v0.22.0 h1:cW2nGgG0hztDM42tOPyC5cVflD7EzLaHM0/Kjol6Wio=
github.com/libp2p/go-libp2p-kad-dht v0.22.0/go.mod h1:hareSo3Z/GJ7nUWPMj7XhD/56a7+rRltYCWwCuy3FQk=
github.com/libp2p/go-libp2p-kad-dht v0.23.0 h1:sxE6LxLopp79eLeV695n7+c77V/Vn4AMF28AdM/XFqM=
github.com/libp2p/go-libp2p-kad-dht v0.23.0/go.mod h1:oO5N308VT2msnQI6qi5M61wzPmJYg7Tr9e16m5n7uDU=
github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
github.com/libp2p/go-libp2p-kbucket v0.5.0 h1:g/7tVm8ACHDxH29BGrpsQlnNeu+6OF1A9bno/4/U1oA=
github.com/libp2p/go-libp2p-kbucket v0.5.0/go.mod h1:zGzGCpQd78b5BNTDGHNDLaTt9aDK/A02xeZp9QeFC4U=
@ -1191,6 +1191,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y=

3
go.mod
View File

@ -47,7 +47,7 @@ require (
github.com/libp2p/go-doh-resolver v0.4.0
github.com/libp2p/go-libp2p v0.26.4
github.com/libp2p/go-libp2p-http v0.5.0
github.com/libp2p/go-libp2p-kad-dht v0.22.0
github.com/libp2p/go-libp2p-kad-dht v0.23.0
github.com/libp2p/go-libp2p-kbucket v0.5.0
github.com/libp2p/go-libp2p-pubsub v0.9.3
github.com/libp2p/go-libp2p-pubsub-router v0.6.0
@ -222,6 +222,7 @@ require (
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/grpc v1.53.0 // indirect

6
go.sum
View File

@ -549,8 +549,8 @@ github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qk
github.com/libp2p/go-libp2p-gostream v0.6.0/go.mod h1:Nywu0gYZwfj7Jc91PQvbGU8dIpqbQQkjWgDuOrFaRdA=
github.com/libp2p/go-libp2p-http v0.5.0 h1:+x0AbLaUuLBArHubbbNRTsgWz0RjNTy6DJLOxQ3/QBc=
github.com/libp2p/go-libp2p-http v0.5.0/go.mod h1:glh87nZ35XCQyFsdzZps6+F4HYI6DctVFY5u1fehwSg=
github.com/libp2p/go-libp2p-kad-dht v0.22.0 h1:cW2nGgG0hztDM42tOPyC5cVflD7EzLaHM0/Kjol6Wio=
github.com/libp2p/go-libp2p-kad-dht v0.22.0/go.mod h1:hareSo3Z/GJ7nUWPMj7XhD/56a7+rRltYCWwCuy3FQk=
github.com/libp2p/go-libp2p-kad-dht v0.23.0 h1:sxE6LxLopp79eLeV695n7+c77V/Vn4AMF28AdM/XFqM=
github.com/libp2p/go-libp2p-kad-dht v0.23.0/go.mod h1:oO5N308VT2msnQI6qi5M61wzPmJYg7Tr9e16m5n7uDU=
github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
github.com/libp2p/go-libp2p-kbucket v0.5.0 h1:g/7tVm8ACHDxH29BGrpsQlnNeu+6OF1A9bno/4/U1oA=
github.com/libp2p/go-libp2p-kbucket v0.5.0/go.mod h1:zGzGCpQd78b5BNTDGHNDLaTt9aDK/A02xeZp9QeFC4U=
@ -1282,6 +1282,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y=

View File

@ -0,0 +1,30 @@
package cli
import (
"testing"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/ipfs/kubo/test/cli/testutils"
"github.com/stretchr/testify/assert"
)
func TestDHTOptimisticProvide(t *testing.T) {
t.Parallel()
t.Run("optimistic provide smoke test", func(t *testing.T) {
nodes := harness.NewT(t).NewNodes(2).Init()
nodes[0].UpdateConfig(func(cfg *config.Config) {
cfg.Experimental.OptimisticProvide = true
})
nodes.StartDaemons().Connect()
hash := nodes[0].IPFSAddStr(testutils.RandomStr(100))
nodes[0].IPFS("dht", "provide", hash)
res := nodes[1].IPFS("routing", "findprovs", "--num-providers=1", hash)
assert.Equal(t, nodes[0].PeerID().String(), res.Stdout.Trimmed())
})
}

View File

@ -218,8 +218,8 @@ func (n *Node) Init(ipfsArgs ...string) *Node {
//
// node.StartDaemonWithReq(harness.RunRequest{
// CmdOpts: []harness.CmdOpt{
// harness.RunWithStderr(os.Stdout),
// harness.RunWithStdout(os.Stdout),
// harness.RunWithStderr(os.Stdout),
// harness.RunWithStdout(os.Stdout),
// },
// })
func (n *Node) StartDaemonWithReq(req RunRequest) *Node {

View File

@ -10,3 +10,7 @@ func RandomBytes(n int) []byte {
}
return bytes
}
func RandomStr(n int) string {
return string(RandomBytes(n))
}