From cf45f1a8621b965eb89bebaca5574bb8cb53ca43 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 23:56:19 -0400 Subject: [PATCH] added support for an experimental batched provider system The batched provider system is enabled when the experimental AcceleratedDHTClient is enabled There is also an `ipfs stats provide` command which gives stats about the providing/reproviding system when the batched provider system is enabled --- core/commands/commands_test.go | 1 + core/commands/stat.go | 1 + core/commands/stat_provide.go | 51 +++++++++++++++++++++++++++++ core/node/groups.go | 4 +-- core/node/provider.go | 60 +++++++++++++++++++++++++++++++--- go.mod | 2 +- go.sum | 4 +-- 7 files changed, 114 insertions(+), 9 deletions(-) create mode 100644 core/commands/stat_provide.go diff --git a/core/commands/commands_test.go b/core/commands/commands_test.go index ef106acb3..81f07c01b 100644 --- a/core/commands/commands_test.go +++ b/core/commands/commands_test.go @@ -210,6 +210,7 @@ func TestCommands(t *testing.T) { "/stats/bitswap", "/stats/bw", "/stats/dht", + "/stats/provide", "/stats/repo", "/swarm", "/swarm/addrs", diff --git a/core/commands/stat.go b/core/commands/stat.go index cc51d7123..e38b5b31b 100644 --- a/core/commands/stat.go +++ b/core/commands/stat.go @@ -30,6 +30,7 @@ for your IPFS node.`, "repo": repoStatCmd, "bitswap": bitswapStatCmd, "dht": statDhtCmd, + "provide": statProvideCmd, }, } diff --git a/core/commands/stat_provide.go b/core/commands/stat_provide.go new file mode 100644 index 000000000..ac02c344c --- /dev/null +++ b/core/commands/stat_provide.go @@ -0,0 +1,51 @@ +package commands + +import ( + "fmt" + + cmds "github.com/ipfs/go-ipfs-cmds" + "github.com/ipfs/go-ipfs/core/commands/cmdenv" + + "github.com/ipfs/go-ipfs-provider/batched" +) + +var statProvideCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Returns statistics about the node's (re)provider system.", + ShortDescription: ` +Returns statistics about the content the node is advertising. + +This interface is not stable and may change from release to release. +`, + }, + Arguments: []cmds.Argument{}, + Options: []cmds.Option{}, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + nd, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + if !nd.IsOnline { + return ErrNotOnline + } + + sys, ok := nd.Provider.(*batched.BatchProvidingSystem) + if !ok { + return fmt.Errorf("can only return stats if Experimental.AcceleratedDHTClient is enabled") + } + + stats, err := sys.Stat(req.Context) + if err != nil { + return err + } + + if err := res.Emit(stats); err != nil { + return err + } + + return nil + }, + Encoders: cmds.EncoderMap{}, + Type: batched.BatchedProviderStats{}, +} diff --git a/core/node/groups.go b/core/node/groups.go index f55b052c3..8d37f84ce 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -275,7 +275,7 @@ func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option { fx.Provide(p2p.New), LibP2P(bcfg, cfg), - OnlineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval), + OnlineProviders(cfg.Experimental.StrategicProviding, cfg.Experimental.AcceleratedDHTClient, cfg.Reprovider.Strategy, cfg.Reprovider.Interval), ) } @@ -286,7 +286,7 @@ func Offline(cfg *config.Config) fx.Option { fx.Provide(DNSResolver), fx.Provide(Namesys(0)), fx.Provide(offroute.NewOfflineRouter), - OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval), + OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Experimental.AcceleratedDHTClient, cfg.Reprovider.Strategy, cfg.Reprovider.Interval), ) } diff --git a/core/node/provider.go b/core/node/provider.go index 52d48036e..e865d2b5f 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -7,13 +7,16 @@ import ( "github.com/ipfs/go-ipfs-pinner" "github.com/ipfs/go-ipfs-provider" + "github.com/ipfs/go-ipfs-provider/batched" q "github.com/ipfs/go-ipfs-provider/queue" "github.com/ipfs/go-ipfs-provider/simple" ipld "github.com/ipfs/go-ipld-format" "github.com/libp2p/go-libp2p-core/routing" + "github.com/multiformats/go-multihash" "go.uber.org/fx" "github.com/ipfs/go-ipfs/core/node/helpers" + "github.com/ipfs/go-ipfs/core/node/libp2p" "github.com/ipfs/go-ipfs/repo" ) @@ -59,29 +62,78 @@ func SimpleProviderSys(isOnline bool) interface{} { } } +type provideMany interface { + ProvideMany(ctx context.Context, keys []multihash.Multihash) error + Ready() bool +} + +// BatchedProviderSys creates new provider system +func BatchedProviderSys(isOnline bool, reprovideInterval string) interface{} { + return func(lc fx.Lifecycle, cr libp2p.BaseIpfsRouting, q *q.Queue, keyProvider simple.KeyChanFunc, repo repo.Repo) (provider.System, error) { + r, ok := (cr).(provideMany) + if !ok { + return nil, fmt.Errorf("BatchedProviderSys requires a content router that supports provideMany") + } + + reprovideIntervalDuration := kReprovideFrequency + if reprovideInterval != "" { + dur, err := time.ParseDuration(reprovideInterval) + if err != nil { + return nil, err + } + + reprovideIntervalDuration = dur + } + + sys, err := batched.New(r, q, + batched.ReproviderInterval(reprovideIntervalDuration), + batched.Datastore(repo.Datastore()), + batched.KeyProvider(keyProvider)) + if err != nil { + return nil, err + } + + if isOnline { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + sys.Run() + return nil + }, + OnStop: func(ctx context.Context) error { + return sys.Close() + }, + }) + } + + return sys, nil + } +} + // ONLINE/OFFLINE // OnlineProviders groups units managing provider routing records online -func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option { +func OnlineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option { if useStrategicProviding { return fx.Provide(provider.NewOfflineProvider) } return fx.Options( SimpleProviders(reprovideStrategy, reprovideInterval), - fx.Provide(SimpleProviderSys(true)), + maybeProvide(SimpleProviderSys(true), !useBatchedProviding), + maybeProvide(BatchedProviderSys(true, reprovideInterval), useBatchedProviding), ) } // OfflineProviders groups units managing provider routing records offline -func OfflineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option { +func OfflineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option { if useStrategicProviding { return fx.Provide(provider.NewOfflineProvider) } return fx.Options( SimpleProviders(reprovideStrategy, reprovideInterval), - fx.Provide(SimpleProviderSys(false)), + maybeProvide(SimpleProviderSys(false), true), + //maybeProvide(BatchedProviderSys(false, reprovideInterval), useBatchedProviding), ) } diff --git a/go.mod b/go.mod index 213887753..99c391981 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/ipfs/go-ipfs-keystore v0.0.2 github.com/ipfs/go-ipfs-pinner v0.1.1 github.com/ipfs/go-ipfs-posinfo v0.0.1 - github.com/ipfs/go-ipfs-provider v0.4.3 + github.com/ipfs/go-ipfs-provider v0.4.4-0.20210513014626-1c19caa05024 github.com/ipfs/go-ipfs-routing v0.1.0 github.com/ipfs/go-ipfs-util v0.0.2 github.com/ipfs/go-ipld-cbor v0.0.5 diff --git a/go.sum b/go.sum index b9772aeee..49d9cc81d 100644 --- a/go.sum +++ b/go.sum @@ -436,8 +436,8 @@ github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqt github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY= github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY= github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY= -github.com/ipfs/go-ipfs-provider v0.4.3 h1:k54OHXZcFBkhL6l3GnPS9PfpaLeLqZjVASG1bgfBdfQ= -github.com/ipfs/go-ipfs-provider v0.4.3/go.mod h1:rcQBVqfblDQRk5LaCtf2uxuKxMJxvKmF5pLS0pO4au4= +github.com/ipfs/go-ipfs-provider v0.4.4-0.20210513014626-1c19caa05024 h1:eYfdZ27ogtwfnwKdfphOwcQ7PEOjKqXlWzVOakK0a60= +github.com/ipfs/go-ipfs-provider v0.4.4-0.20210513014626-1c19caa05024/go.mod h1:kUMTf1R8c+KgWUWKTGSZiXCDZWMCkxCX3wyepk0cYEA= github.com/ipfs/go-ipfs-routing v0.0.1/go.mod h1:k76lf20iKFxQTjcJokbPM9iBXVXVZhcOwc360N4nuKs= github.com/ipfs/go-ipfs-routing v0.1.0 h1:gAJTT1cEeeLj6/DlLX6t+NxD9fQe2ymTO6qWRDI/HQQ= github.com/ipfs/go-ipfs-routing v0.1.0/go.mod h1:hYoUkJLyAUKhF58tysKpids8RNDPO42BVMgK5dNsoqY=