Merge pull request #7394 from ipfs/fix/pubsub

fix two pubsub issues.
This commit is contained in:
Steven Allen 2020-06-01 14:26:29 -07:00 committed by GitHub
commit cf68619b80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 14 additions and 60 deletions

View File

@ -74,7 +74,7 @@ This command outputs data in the following encodings:
cmds.StringArg("topic", true, false, "String name of topic to subscribe to."),
},
Options: []cmds.Option{
cmds.BoolOption(pubsubDiscoverOptionName, "try to discover other peers subscribed to the same topic"),
cmds.BoolOption(pubsubDiscoverOptionName, "Deprecated option to instruct pubsub to discovery peers for the topic. Discovery is now built into pubsub."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
api, err := cmdenv.GetApi(env, req)
@ -83,9 +83,7 @@ This command outputs data in the following encodings:
}
topic := req.Arguments[0]
discover, _ := req.Options[pubsubDiscoverOptionName].(bool)
sub, err := api.PubSub().Subscribe(req.Context, topic, options.PubSub.Discover(discover))
sub, err := api.PubSub().Subscribe(req.Context, topic)
if err != nil {
return err
}

View File

@ -26,7 +26,6 @@ import (
"github.com/ipfs/go-ipfs-provider"
offlineroute "github.com/ipfs/go-ipfs-routing/offline"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
dag "github.com/ipfs/go-merkledag"
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
@ -44,8 +43,6 @@ import (
"github.com/ipfs/go-ipfs/repo"
)
var log = logging.Logger("core/coreapi")
type CoreAPI struct {
nctx context.Context

View File

@ -3,14 +3,9 @@ package coreapi
import (
"context"
"errors"
"strings"
"sync"
"time"
cid "github.com/ipfs/go-cid"
coreiface "github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
p2phost "github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer"
routing "github.com/libp2p/go-libp2p-core/routing"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -19,7 +14,6 @@ import (
type PubSubAPI CoreAPI
type pubSubSubscription struct {
cancel context.CancelFunc
subscription *pubsub.Subscription
}
@ -61,12 +55,16 @@ func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) er
}
func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) {
options, err := caopts.PubSubSubscribeOptions(opts...)
// Parse the options to avoid introducing silent failures for invalid
// options. However, we don't currently have any use for them. The only
// subscription option, discovery, is now a no-op as it's handled by
// pubsub itself.
_, err := caopts.PubSubSubscribeOptions(opts...)
if err != nil {
return nil, err
}
r, err := api.checkNode()
_, err = api.checkNode()
if err != nil {
return nil, err
}
@ -77,45 +75,7 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt
return nil, err
}
pubctx, cancel := context.WithCancel(api.nctx)
if options.Discover {
go func() {
blk, err := api.core().Block().Put(pubctx, strings.NewReader("floodsub:"+topic))
if err != nil {
log.Error("pubsub discovery: ", err)
return
}
connectToPubSubPeers(pubctx, r, api.peerHost, blk.Path().Cid())
}()
}
return &pubSubSubscription{cancel, sub}, nil
}
func connectToPubSubPeers(ctx context.Context, r routing.Routing, ph p2phost.Host, cid cid.Cid) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
provs := r.FindProvidersAsync(ctx, cid, 10)
var wg sync.WaitGroup
for p := range provs {
wg.Add(1)
go func(pi peer.AddrInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
err := ph.Connect(ctx, pi)
if err != nil {
log.Info("pubsub discover: ", err)
return
}
log.Info("connected to pubsub peer:", pi.ID)
}(p)
}
wg.Wait()
return &pubSubSubscription{sub}, nil
}
func (api *PubSubAPI) checkNode() (routing.Routing, error) {
@ -132,7 +92,6 @@ func (api *PubSubAPI) checkNode() (routing.Routing, error) {
}
func (sub *pubSubSubscription) Close() error {
sub.cancel()
sub.subscription.Cancel()
return nil
}
@ -161,7 +120,3 @@ func (msg *pubSubMessage) Seq() []byte {
func (msg *pubSubMessage) Topics() []string {
return msg.msg.TopicIDs
}
func (api *PubSubAPI) core() coreiface.CoreAPI {
return (*CoreAPI)(api)
}

View File

@ -17,6 +17,10 @@ func FloodSub(pubsubOptions ...pubsub.Option) interface{} {
func GossipSub(pubsubOptions ...pubsub.Option) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) {
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, append(pubsubOptions, pubsub.WithDiscovery(disc))...)
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, append(
pubsubOptions,
pubsub.WithDiscovery(disc),
pubsub.WithFloodPublish(true))...,
)
}
}