diff --git a/core/commands/pubsub.go b/core/commands/pubsub.go index aaf01672b..10bddcd57 100644 --- a/core/commands/pubsub.go +++ b/core/commands/pubsub.go @@ -74,7 +74,7 @@ This command outputs data in the following encodings: cmds.StringArg("topic", true, false, "String name of topic to subscribe to."), }, Options: []cmds.Option{ - cmds.BoolOption(pubsubDiscoverOptionName, "try to discover other peers subscribed to the same topic"), + cmds.BoolOption(pubsubDiscoverOptionName, "Deprecated option to instruct pubsub to discovery peers for the topic. Discovery is now built into pubsub."), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { api, err := cmdenv.GetApi(env, req) @@ -83,9 +83,7 @@ This command outputs data in the following encodings: } topic := req.Arguments[0] - discover, _ := req.Options[pubsubDiscoverOptionName].(bool) - - sub, err := api.PubSub().Subscribe(req.Context, topic, options.PubSub.Discover(discover)) + sub, err := api.PubSub().Subscribe(req.Context, topic) if err != nil { return err } diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index 3235b7d39..5b638826b 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -26,7 +26,6 @@ import ( "github.com/ipfs/go-ipfs-provider" offlineroute "github.com/ipfs/go-ipfs-routing/offline" ipld "github.com/ipfs/go-ipld-format" - logging "github.com/ipfs/go-log" dag "github.com/ipfs/go-merkledag" coreiface "github.com/ipfs/interface-go-ipfs-core" "github.com/ipfs/interface-go-ipfs-core/options" @@ -44,8 +43,6 @@ import ( "github.com/ipfs/go-ipfs/repo" ) -var log = logging.Logger("core/coreapi") - type CoreAPI struct { nctx context.Context diff --git a/core/coreapi/pubsub.go b/core/coreapi/pubsub.go index de2ce4672..e84d15034 100644 --- a/core/coreapi/pubsub.go +++ b/core/coreapi/pubsub.go @@ -3,14 +3,9 @@ package coreapi import ( "context" "errors" - "strings" - "sync" - "time" - cid "github.com/ipfs/go-cid" coreiface "github.com/ipfs/interface-go-ipfs-core" caopts "github.com/ipfs/interface-go-ipfs-core/options" - p2phost "github.com/libp2p/go-libp2p-core/host" peer "github.com/libp2p/go-libp2p-core/peer" routing "github.com/libp2p/go-libp2p-core/routing" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -19,7 +14,6 @@ import ( type PubSubAPI CoreAPI type pubSubSubscription struct { - cancel context.CancelFunc subscription *pubsub.Subscription } @@ -61,12 +55,16 @@ func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) er } func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) { - options, err := caopts.PubSubSubscribeOptions(opts...) + // Parse the options to avoid introducing silent failures for invalid + // options. However, we don't currently have any use for them. The only + // subscription option, discovery, is now a no-op as it's handled by + // pubsub itself. + _, err := caopts.PubSubSubscribeOptions(opts...) if err != nil { return nil, err } - r, err := api.checkNode() + _, err = api.checkNode() if err != nil { return nil, err } @@ -77,45 +75,7 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt return nil, err } - pubctx, cancel := context.WithCancel(api.nctx) - - if options.Discover { - go func() { - blk, err := api.core().Block().Put(pubctx, strings.NewReader("floodsub:"+topic)) - if err != nil { - log.Error("pubsub discovery: ", err) - return - } - - connectToPubSubPeers(pubctx, r, api.peerHost, blk.Path().Cid()) - }() - } - - return &pubSubSubscription{cancel, sub}, nil -} - -func connectToPubSubPeers(ctx context.Context, r routing.Routing, ph p2phost.Host, cid cid.Cid) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - provs := r.FindProvidersAsync(ctx, cid, 10) - var wg sync.WaitGroup - for p := range provs { - wg.Add(1) - go func(pi peer.AddrInfo) { - defer wg.Done() - ctx, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - err := ph.Connect(ctx, pi) - if err != nil { - log.Info("pubsub discover: ", err) - return - } - log.Info("connected to pubsub peer:", pi.ID) - }(p) - } - - wg.Wait() + return &pubSubSubscription{sub}, nil } func (api *PubSubAPI) checkNode() (routing.Routing, error) { @@ -132,7 +92,6 @@ func (api *PubSubAPI) checkNode() (routing.Routing, error) { } func (sub *pubSubSubscription) Close() error { - sub.cancel() sub.subscription.Cancel() return nil } @@ -161,7 +120,3 @@ func (msg *pubSubMessage) Seq() []byte { func (msg *pubSubMessage) Topics() []string { return msg.msg.TopicIDs } - -func (api *PubSubAPI) core() coreiface.CoreAPI { - return (*CoreAPI)(api) -} diff --git a/core/node/libp2p/pubsub.go b/core/node/libp2p/pubsub.go index 001a787da..e8b3e57fb 100644 --- a/core/node/libp2p/pubsub.go +++ b/core/node/libp2p/pubsub.go @@ -17,6 +17,10 @@ func FloodSub(pubsubOptions ...pubsub.Option) interface{} { func GossipSub(pubsubOptions ...pubsub.Option) interface{} { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) { - return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, append(pubsubOptions, pubsub.WithDiscovery(disc))...) + return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, append( + pubsubOptions, + pubsub.WithDiscovery(disc), + pubsub.WithFloodPublish(true))..., + ) } }