diff --git a/core/commands/pubsub.go b/core/commands/pubsub.go index 413e9656f..a7f9771f9 100644 --- a/core/commands/pubsub.go +++ b/core/commands/pubsub.go @@ -2,13 +2,21 @@ package commands import ( "bytes" + "context" "encoding/binary" "io" + "sync" + "time" + blocks "github.com/ipfs/go-ipfs/blocks" cmds "github.com/ipfs/go-ipfs/commands" + core "github.com/ipfs/go-ipfs/core" - floodsub "gx/ipfs/QmSWp1Yx7Z5pbpeCbUy6tfFj2DrHUe7tGQqyYC2vspbXH1/floodsub" + floodsub "gx/ipfs/QmQtsU1T46uxjFMd5r5PfyaY1HdV5jcxZbvvHbAVRL52hc/floodsub" u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util" + key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key" + pstore "gx/ipfs/QmdMfSLMDBDYhtc4oF3NYGCZr5dy4wQb6Ji26N4D4mdxa2/go-libp2p-peerstore" + cid "gx/ipfs/QmfSc2xehWmWLnwwYR91Y8QF4xdASypTFVknutoKQS3GHp/go-cid" ) var PubsubCmd = &cmds.Command{ @@ -41,6 +49,9 @@ to be used in a production environment. Arguments: []cmds.Argument{ cmds.StringArg("topic", true, false, "String name of topic to subscribe to."), }, + Options: []cmds.Option{ + cmds.BoolOption("discover", "try to discover other peers subscribed to the same topic"), + }, Run: func(req cmds.Request, res cmds.Response) { n, err := req.InvocContext().GetNode() if err != nil { @@ -79,6 +90,18 @@ to be used in a production environment. } } }() + + discover, _, _ := req.Option("discover").Bool() + if discover { + blk := blocks.NewBlock([]byte("floodsub:" + topic)) + cid, err := n.Blocks.AddObject(blk) + if err != nil { + log.Error("pubsub discovery: ", err) + return + } + + connectToPubSubPeers(req.Context(), n, cid) + } }, Marshalers: cmds.MarshalerMap{ cmds.Text: getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) { @@ -97,6 +120,30 @@ to be used in a production environment. Type: floodsub.Message{}, } +func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid *cid.Cid) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + provs := n.Routing.FindProvidersAsync(ctx, key.Key(cid.Hash()), 10) + wg := &sync.WaitGroup{} + for p := range provs { + wg.Add(1) + go func(pi pstore.PeerInfo) { + defer wg.Done() + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + err := n.PeerHost.Connect(ctx, pi) + if err != nil { + log.Info("pubsub discover: ", err) + return + } + log.Info("connected to pubsub peer:", pi.ID) + }(p) + } + + wg.Wait() +} + func getPsMsgMarshaler(f func(m *floodsub.Message) (io.Reader, error)) func(cmds.Response) (io.Reader, error) { return func(res cmds.Response) (io.Reader, error) { outChan, ok := res.Output().(<-chan interface{}) diff --git a/core/core.go b/core/core.go index 77970da4f..a2709b8df 100644 --- a/core/core.go +++ b/core/core.go @@ -17,9 +17,9 @@ import ( "time" diag "github.com/ipfs/go-ipfs/diagnostics" + floodsub "gx/ipfs/QmQtsU1T46uxjFMd5r5PfyaY1HdV5jcxZbvvHbAVRL52hc/floodsub" goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter" - floodsub "gx/ipfs/QmSWp1Yx7Z5pbpeCbUy6tfFj2DrHUe7tGQqyYC2vspbXH1/floodsub" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" b58 "gx/ipfs/QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf/go-base58" discovery "gx/ipfs/QmUuwQUJmtvC6ReYcu7xaYKEUM3pD46H18dFn3LBhVt2Di/go-libp2p/p2p/discovery" diff --git a/package.json b/package.json index e77f35db3..187feca31 100644 --- a/package.json +++ b/package.json @@ -242,9 +242,9 @@ }, { "author": "whyrusleeping", - "hash": "QmSWp1Yx7Z5pbpeCbUy6tfFj2DrHUe7tGQqyYC2vspbXH1", + "hash": "QmQtsU1T46uxjFMd5r5PfyaY1HdV5jcxZbvvHbAVRL52hc", "name": "floodsub", - "version": "0.4.1" + "version": "0.4.2" } ], "gxVersion": "0.4.0",