From 4b096c4bba60ffac1789cd5b58a42d44eb597f3b Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 10 Sep 2016 06:48:08 -0700 Subject: [PATCH] floodsub: add api for pub/sub License: MIT Signed-off-by: Jeromy --- core/commands/pubsub.go | 162 ++++++++++++++++++++++++++++++++++++++++ core/commands/root.go | 1 + core/core.go | 7 +- package.json | 6 ++ 4 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 core/commands/pubsub.go diff --git a/core/commands/pubsub.go b/core/commands/pubsub.go new file mode 100644 index 000000000..16b937d13 --- /dev/null +++ b/core/commands/pubsub.go @@ -0,0 +1,162 @@ +package commands + +import ( + "bytes" + "encoding/binary" + "io" + + cmds "github.com/ipfs/go-ipfs/commands" + + floodsub "gx/ipfs/QmQriRMW5cCJyLrzDnXi7fZ5mVbetiEZjPjbqoJhuSL94m/floodsub" + u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util" +) + +var PubsubCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "An experimental publish-subscribe system on ipfs.", + ShortDescription: ` +ipfs pubsub allows you to publish messages to a given topic, and also to +subscribe to new messages on a given topic. + +This is an experimental feature. It is not intended in its current state +to be used in a production environment. +`, + }, + Subcommands: map[string]*cmds.Command{ + "pub": PubsubPubCmd, + "sub": PubsubSubCmd, + }, +} + +var PubsubSubCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Subscribe to messages on a given topic.", + ShortDescription: ` +ipfs pubsub sub subscribes to messages on a given topic. + +This is an experimental feature. It is not intended in its current state +to be used in a production environment. +`, + }, + Arguments: []cmds.Argument{ + cmds.StringArg("topic", true, false, "String name of topic to subscribe to."), + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := req.InvocContext().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + // Must be online! + if !n.OnlineMode() { + res.SetError(errNotOnline, cmds.ErrClient) + return + } + + topic := req.Arguments()[0] + msgs, err := n.Floodsub.Subscribe(topic) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + out := make(chan interface{}) + res.SetOutput((<-chan interface{})(out)) + + ctx := req.Context() + go func() { + defer close(out) + for { + select { + case msg, ok := <-msgs: + if !ok { + return + } + out <- msg + case <-ctx.Done(): + n.Floodsub.Unsub(topic) + } + } + }() + }, + Marshalers: cmds.MarshalerMap{ + cmds.Text: getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) { + log.Error("FROM: ", m.GetFrom()) + return bytes.NewReader(m.Data), nil + }), + "ndpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) { + m.Data = append(m.Data, '\n') + return bytes.NewReader(m.Data), nil + }), + "lenpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) { + buf := make([]byte, 8) + n := binary.PutUvarint(buf, uint64(len(m.Data))) + return io.MultiReader(bytes.NewReader(buf[:n]), bytes.NewReader(m.Data)), nil + }), + }, + Type: floodsub.Message{}, +} + +func getPsMsgMarshaler(f func(m *floodsub.Message) (io.Reader, error)) func(cmds.Response) (io.Reader, error) { + return func(res cmds.Response) (io.Reader, error) { + outChan, ok := res.Output().(<-chan interface{}) + if !ok { + return nil, u.ErrCast() + } + + marshal := func(v interface{}) (io.Reader, error) { + obj, ok := v.(*floodsub.Message) + if !ok { + return nil, u.ErrCast() + } + + return f(obj) + } + + return &cmds.ChannelMarshaler{ + Channel: outChan, + Marshaler: marshal, + Res: res, + }, nil + } +} + +var PubsubPubCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Publish a message to a given pubsub topic.", + ShortDescription: ` +ipfs pubsub pub publishes a message to a specified topic. + +This is an experimental feature. It is not intended in its current state +to be used in a production environment. +`, + }, + Arguments: []cmds.Argument{ + cmds.StringArg("topic", true, false, "Topic to publish to."), + cmds.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(), + }, + Options: []cmds.Option{}, + Run: func(req cmds.Request, res cmds.Response) { + n, err := req.InvocContext().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + // Must be online! + if !n.OnlineMode() { + res.SetError(errNotOnline, cmds.ErrClient) + return + } + + topic := req.Arguments()[0] + + for _, data := range req.Arguments()[1:] { + if err := n.Floodsub.Publish(topic, []byte(data)); err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + } + }, +} diff --git a/core/commands/root.go b/core/commands/root.go index 03a447d43..1826725ac 100644 --- a/core/commands/root.go +++ b/core/commands/root.go @@ -105,6 +105,7 @@ var rootSubcommands = map[string]*cmds.Command{ "stats": StatsCmd, "swarm": SwarmCmd, "tar": TarCmd, + "pubsub": PubsubCmd, "tour": tourCmd, "file": unixfs.UnixFSCmd, "update": ExternalBinary(), diff --git a/core/core.go b/core/core.go index 013ab0bcf..977e1068d 100644 --- a/core/core.go +++ b/core/core.go @@ -17,7 +17,8 @@ import ( "time" diag "github.com/ipfs/go-ipfs/diagnostics" - goprocess "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess" + floodsub "gx/ipfs/QmQriRMW5cCJyLrzDnXi7fZ5mVbetiEZjPjbqoJhuSL94m/floodsub" + goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" b58 "gx/ipfs/QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf/go-base58" @@ -112,6 +113,8 @@ type IpfsNode struct { Reprovider *rp.Reprovider // the value reprovider system IpnsRepub *ipnsrp.Republisher + Floodsub *floodsub.PubSub + proc goprocess.Process ctx context.Context @@ -184,6 +187,8 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin go n.Reprovider.ProvideEvery(ctx, interval) } + n.Floodsub = floodsub.NewFloodSub(ctx, peerhost) + // setup local discovery if do != nil { service, err := do(ctx, n.PeerHost) diff --git a/package.json b/package.json index e00c61e7c..0fa1c95ca 100644 --- a/package.json +++ b/package.json @@ -263,6 +263,12 @@ "hash": "QmdCL8M8DXJdSRnwhpDhukX5r8ydjxnzPJpaKrFudDA8yn", "name": "hang-fds", "version": "0.0.0" + }, + { + "author": "whyrusleeping", + "hash": "QmQriRMW5cCJyLrzDnXi7fZ5mVbetiEZjPjbqoJhuSL94m", + "name": "floodsub", + "version": "0.3.0" } ], "gxVersion": "0.4.0",