floodsub: add api for pub/sub

License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
This commit is contained in:
Jeromy 2016-09-10 06:48:08 -07:00
parent dcb21bd20e
commit 4b096c4bba
4 changed files with 175 additions and 1 deletions

162
core/commands/pubsub.go Normal file
View File

@ -0,0 +1,162 @@
package commands
import (
"bytes"
"encoding/binary"
"io"
cmds "github.com/ipfs/go-ipfs/commands"
floodsub "gx/ipfs/QmQriRMW5cCJyLrzDnXi7fZ5mVbetiEZjPjbqoJhuSL94m/floodsub"
u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
)
var PubsubCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "An experimental publish-subscribe system on ipfs.",
ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.
This is an experimental feature. It is not intended in its current state
to be used in a production environment.
`,
},
Subcommands: map[string]*cmds.Command{
"pub": PubsubPubCmd,
"sub": PubsubSubCmd,
},
}
var PubsubSubCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Subscribe to messages on a given topic.",
ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.
This is an experimental feature. It is not intended in its current state
to be used in a production environment.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("topic", true, false, "String name of topic to subscribe to."),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
// Must be online!
if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
}
topic := req.Arguments()[0]
msgs, err := n.Floodsub.Subscribe(topic)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
out := make(chan interface{})
res.SetOutput((<-chan interface{})(out))
ctx := req.Context()
go func() {
defer close(out)
for {
select {
case msg, ok := <-msgs:
if !ok {
return
}
out <- msg
case <-ctx.Done():
n.Floodsub.Unsub(topic)
}
}
}()
},
Marshalers: cmds.MarshalerMap{
cmds.Text: getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) {
log.Error("FROM: ", m.GetFrom())
return bytes.NewReader(m.Data), nil
}),
"ndpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) {
m.Data = append(m.Data, '\n')
return bytes.NewReader(m.Data), nil
}),
"lenpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) {
buf := make([]byte, 8)
n := binary.PutUvarint(buf, uint64(len(m.Data)))
return io.MultiReader(bytes.NewReader(buf[:n]), bytes.NewReader(m.Data)), nil
}),
},
Type: floodsub.Message{},
}
func getPsMsgMarshaler(f func(m *floodsub.Message) (io.Reader, error)) func(cmds.Response) (io.Reader, error) {
return func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
if !ok {
return nil, u.ErrCast()
}
marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*floodsub.Message)
if !ok {
return nil, u.ErrCast()
}
return f(obj)
}
return &cmds.ChannelMarshaler{
Channel: outChan,
Marshaler: marshal,
Res: res,
}, nil
}
}
var PubsubPubCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Publish a message to a given pubsub topic.",
ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.
This is an experimental feature. It is not intended in its current state
to be used in a production environment.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("topic", true, false, "Topic to publish to."),
cmds.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
},
Options: []cmds.Option{},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
// Must be online!
if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
}
topic := req.Arguments()[0]
for _, data := range req.Arguments()[1:] {
if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
}
},
}

View File

@ -105,6 +105,7 @@ var rootSubcommands = map[string]*cmds.Command{
"stats": StatsCmd,
"swarm": SwarmCmd,
"tar": TarCmd,
"pubsub": PubsubCmd,
"tour": tourCmd,
"file": unixfs.UnixFSCmd,
"update": ExternalBinary(),

View File

@ -17,7 +17,8 @@ import (
"time"
diag "github.com/ipfs/go-ipfs/diagnostics"
goprocess "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
floodsub "gx/ipfs/QmQriRMW5cCJyLrzDnXi7fZ5mVbetiEZjPjbqoJhuSL94m/floodsub"
goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
b58 "gx/ipfs/QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf/go-base58"
@ -112,6 +113,8 @@ type IpfsNode struct {
Reprovider *rp.Reprovider // the value reprovider system
IpnsRepub *ipnsrp.Republisher
Floodsub *floodsub.PubSub
proc goprocess.Process
ctx context.Context
@ -184,6 +187,8 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
go n.Reprovider.ProvideEvery(ctx, interval)
}
n.Floodsub = floodsub.NewFloodSub(ctx, peerhost)
// setup local discovery
if do != nil {
service, err := do(ctx, n.PeerHost)

View File

@ -263,6 +263,12 @@
"hash": "QmdCL8M8DXJdSRnwhpDhukX5r8ydjxnzPJpaKrFudDA8yn",
"name": "hang-fds",
"version": "0.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmQriRMW5cCJyLrzDnXi7fZ5mVbetiEZjPjbqoJhuSL94m",
"name": "floodsub",
"version": "0.3.0"
}
],
"gxVersion": "0.4.0",