added support for an experimental batched provider system

The batched provider system is enabled when the experimental AcceleratedDHTClient is enabled

There is also an `ipfs stats provide` command which gives stats about the providing/reproviding system when the batched provider system is enabled
This commit is contained in:
Adin Schmahmann 2021-05-12 23:56:19 -04:00
parent 2fd55d198c
commit cf45f1a862
7 changed files with 114 additions and 9 deletions

View File

@ -210,6 +210,7 @@ func TestCommands(t *testing.T) {
"/stats/bitswap",
"/stats/bw",
"/stats/dht",
"/stats/provide",
"/stats/repo",
"/swarm",
"/swarm/addrs",

View File

@ -30,6 +30,7 @@ for your IPFS node.`,
"repo": repoStatCmd,
"bitswap": bitswapStatCmd,
"dht": statDhtCmd,
"provide": statProvideCmd,
},
}

View File

@ -0,0 +1,51 @@
package commands
import (
"fmt"
cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/go-ipfs/core/commands/cmdenv"
"github.com/ipfs/go-ipfs-provider/batched"
)
var statProvideCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Returns statistics about the node's (re)provider system.",
ShortDescription: `
Returns statistics about the content the node is advertising.
This interface is not stable and may change from release to release.
`,
},
Arguments: []cmds.Argument{},
Options: []cmds.Option{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
nd, err := cmdenv.GetNode(env)
if err != nil {
return err
}
if !nd.IsOnline {
return ErrNotOnline
}
sys, ok := nd.Provider.(*batched.BatchProvidingSystem)
if !ok {
return fmt.Errorf("can only return stats if Experimental.AcceleratedDHTClient is enabled")
}
stats, err := sys.Stat(req.Context)
if err != nil {
return err
}
if err := res.Emit(stats); err != nil {
return err
}
return nil
},
Encoders: cmds.EncoderMap{},
Type: batched.BatchedProviderStats{},
}

View File

@ -275,7 +275,7 @@ func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option {
fx.Provide(p2p.New),
LibP2P(bcfg, cfg),
OnlineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
OnlineProviders(cfg.Experimental.StrategicProviding, cfg.Experimental.AcceleratedDHTClient, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
)
}
@ -286,7 +286,7 @@ func Offline(cfg *config.Config) fx.Option {
fx.Provide(DNSResolver),
fx.Provide(Namesys(0)),
fx.Provide(offroute.NewOfflineRouter),
OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Experimental.AcceleratedDHTClient, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
)
}

View File

@ -7,13 +7,16 @@ import (
"github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-provider"
"github.com/ipfs/go-ipfs-provider/batched"
q "github.com/ipfs/go-ipfs-provider/queue"
"github.com/ipfs/go-ipfs-provider/simple"
ipld "github.com/ipfs/go-ipld-format"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multihash"
"go.uber.org/fx"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/repo"
)
@ -59,29 +62,78 @@ func SimpleProviderSys(isOnline bool) interface{} {
}
}
type provideMany interface {
ProvideMany(ctx context.Context, keys []multihash.Multihash) error
Ready() bool
}
// BatchedProviderSys creates new provider system
func BatchedProviderSys(isOnline bool, reprovideInterval string) interface{} {
return func(lc fx.Lifecycle, cr libp2p.BaseIpfsRouting, q *q.Queue, keyProvider simple.KeyChanFunc, repo repo.Repo) (provider.System, error) {
r, ok := (cr).(provideMany)
if !ok {
return nil, fmt.Errorf("BatchedProviderSys requires a content router that supports provideMany")
}
reprovideIntervalDuration := kReprovideFrequency
if reprovideInterval != "" {
dur, err := time.ParseDuration(reprovideInterval)
if err != nil {
return nil, err
}
reprovideIntervalDuration = dur
}
sys, err := batched.New(r, q,
batched.ReproviderInterval(reprovideIntervalDuration),
batched.Datastore(repo.Datastore()),
batched.KeyProvider(keyProvider))
if err != nil {
return nil, err
}
if isOnline {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
sys.Run()
return nil
},
OnStop: func(ctx context.Context) error {
return sys.Close()
},
})
}
return sys, nil
}
}
// ONLINE/OFFLINE
// OnlineProviders groups units managing provider routing records online
func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option {
func OnlineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option {
if useStrategicProviding {
return fx.Provide(provider.NewOfflineProvider)
}
return fx.Options(
SimpleProviders(reprovideStrategy, reprovideInterval),
fx.Provide(SimpleProviderSys(true)),
maybeProvide(SimpleProviderSys(true), !useBatchedProviding),
maybeProvide(BatchedProviderSys(true, reprovideInterval), useBatchedProviding),
)
}
// OfflineProviders groups units managing provider routing records offline
func OfflineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option {
func OfflineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option {
if useStrategicProviding {
return fx.Provide(provider.NewOfflineProvider)
}
return fx.Options(
SimpleProviders(reprovideStrategy, reprovideInterval),
fx.Provide(SimpleProviderSys(false)),
maybeProvide(SimpleProviderSys(false), true),
//maybeProvide(BatchedProviderSys(false, reprovideInterval), useBatchedProviding),
)
}

2
go.mod
View File

@ -36,7 +36,7 @@ require (
github.com/ipfs/go-ipfs-keystore v0.0.2
github.com/ipfs/go-ipfs-pinner v0.1.1
github.com/ipfs/go-ipfs-posinfo v0.0.1
github.com/ipfs/go-ipfs-provider v0.4.3
github.com/ipfs/go-ipfs-provider v0.4.4-0.20210513014626-1c19caa05024
github.com/ipfs/go-ipfs-routing v0.1.0
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-ipld-cbor v0.0.5

4
go.sum
View File

@ -436,8 +436,8 @@ github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqt
github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-provider v0.4.3 h1:k54OHXZcFBkhL6l3GnPS9PfpaLeLqZjVASG1bgfBdfQ=
github.com/ipfs/go-ipfs-provider v0.4.3/go.mod h1:rcQBVqfblDQRk5LaCtf2uxuKxMJxvKmF5pLS0pO4au4=
github.com/ipfs/go-ipfs-provider v0.4.4-0.20210513014626-1c19caa05024 h1:eYfdZ27ogtwfnwKdfphOwcQ7PEOjKqXlWzVOakK0a60=
github.com/ipfs/go-ipfs-provider v0.4.4-0.20210513014626-1c19caa05024/go.mod h1:kUMTf1R8c+KgWUWKTGSZiXCDZWMCkxCX3wyepk0cYEA=
github.com/ipfs/go-ipfs-routing v0.0.1/go.mod h1:k76lf20iKFxQTjcJokbPM9iBXVXVZhcOwc360N4nuKs=
github.com/ipfs/go-ipfs-routing v0.1.0 h1:gAJTT1cEeeLj6/DlLX6t+NxD9fQe2ymTO6qWRDI/HQQ=
github.com/ipfs/go-ipfs-routing v0.1.0/go.mod h1:hYoUkJLyAUKhF58tysKpids8RNDPO42BVMgK5dNsoqY=