kubo/client/httpapi/pubsub.go
Łukasz Magiera 42273cab06 Don't use valid() pattern
This commit was moved from ipfs/go-ipfs-http-client@4d07c48f98
2019-02-14 19:05:17 +01:00

126 lines
2.5 KiB
Go

package httpapi
import (
"bytes"
"context"
"encoding/json"
"io"
"github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
"github.com/libp2p/go-libp2p-peer"
)
// PubsubAPI is HttpApi under a distinct named type. The pubsub methods in
// this file are declared on it, and core() converts it back to *HttpApi to
// build requests.
type PubsubAPI HttpApi
// Ls sends a "pubsub/ls" request and returns the Strings list decoded from
// the reply (presumably the names of the topics the node is subscribed to;
// confirm against the daemon API). A nil slice is returned together with any
// request or decoding error.
func (api *PubsubAPI) Ls(ctx context.Context) ([]string, error) {
	var reply struct {
		Strings []string
	}

	err := api.core().request("pubsub/ls").Exec(ctx, &reply)
	if err != nil {
		return nil, err
	}

	return reply.Strings, nil
}
// Peers sends a "pubsub/peers" request, passing the Topic resolved from opts
// as the request argument, and returns the reply's entries decoded as peer
// IDs.
//
// An error is returned if the options cannot be resolved, if the request
// fails, or if any entry in the reply is not a valid base58 peer ID. In the
// last case no partial list is returned.
func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) {
	settings, err := caopts.PubSubPeersOptions(opts...)
	if err != nil {
		return nil, err
	}

	var reply struct {
		Strings []string
	}
	req := api.core().request("pubsub/peers", settings.Topic)
	if err := req.Exec(ctx, &reply); err != nil {
		return nil, err
	}

	// One decoded ID per returned string, in the same order.
	ids := make([]peer.ID, 0, len(reply.Strings))
	for _, encoded := range reply.Strings {
		id, err := peer.IDB58Decode(encoded)
		if err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}

	return ids, nil
}
// Publish sends message as the file body of a "pubsub/pub" request whose
// argument is topic. No result is decoded from the reply (nil is passed to
// Exec); only the resulting error, if any, is returned.
func (api *PubsubAPI) Publish(ctx context.Context, topic string, message []byte) error {
	payload := bytes.NewReader(message)
	req := api.core().request("pubsub/pub", topic).FileBody(payload)
	return req.Exec(ctx, nil)
}
// pubsubSub is the subscription handle built by Subscribe. It embeds the
// Closer of the underlying response and reads messages from that response
// through a JSON decoder.
type pubsubSub struct {
// Closer is set by Subscribe to the response it received.
io.Closer
// dec decodes successive JSON messages from the response output.
dec *json.Decoder
}
// pubsubMessage is the JSON shape of one message read from a subscription.
// The exported J* fields are filled in by the JSON decoder; from is derived
// from JFrom afterwards by pubsubSub.Next.
type pubsubMessage struct {
// JFrom holds the sender's peer ID as raw bytes (JSON key "from").
JFrom []byte `json:"from,omitempty"`
// JData holds the message payload (JSON key "data").
JData []byte `json:"data,omitempty"`
// JSeqno holds the message sequence number bytes (JSON key "seqno").
JSeqno []byte `json:"seqno,omitempty"`
// JTopicIDs holds the topic names carried by the message (JSON key "topicIDs").
JTopicIDs []string `json:"topicIDs,omitempty"`
// from is the peer ID parsed from JFrom; unexported, so it is not decoded from JSON.
from peer.ID
}
// From returns the sender's peer ID, as parsed from the message's "from"
// bytes when the message was read.
func (msg *pubsubMessage) From() peer.ID {
return msg.from
}
// Data returns the message payload bytes. The returned slice is the
// message's own field, not a copy.
func (msg *pubsubMessage) Data() []byte {
return msg.JData
}
// Seq returns the message's sequence number bytes. The returned slice is the
// message's own field, not a copy.
func (msg *pubsubMessage) Seq() []byte {
return msg.JSeqno
}
// Topics returns the topic names carried by the message. The returned slice
// is the message's own field, not a copy.
func (msg *pubsubMessage) Topics() []string {
return msg.JTopicIDs
}
// Next decodes the next JSON message from the subscription stream and
// returns it with its sender peer ID parsed.
//
// ctx is currently not consulted: the call blocks in the decoder until a
// message arrives or the stream fails.
//
// On any error, whether from decoding the stream or from parsing the
// sender's peer ID, the returned message is nil, so callers never receive a
// half-populated message alongside a non-nil error.
func (s *pubsubSub) Next(ctx context.Context) (iface.PubSubMessage, error) {
	// TODO: handle ctx

	var msg pubsubMessage
	if err := s.dec.Decode(&msg); err != nil {
		return nil, err
	}

	from, err := peer.IDFromBytes(msg.JFrom)
	if err != nil {
		// Do not hand back a message whose From() would be the zero ID.
		return nil, err
	}
	msg.from = from

	return &msg, nil
}
// Subscribe sends a "pubsub/sub" request for topic, with the "discover"
// option taken from opts, and returns a subscription that reads messages from
// the response stream. Closing the returned subscription closes the
// underlying response.
//
// An error is returned if the options cannot be resolved, if sending the
// request fails, or if the response carries an API error. In the last case
// the response is closed before returning, because no subscription is handed
// to the caller that could close it later.
func (api *PubsubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (iface.PubSubSubscription, error) {
	options, err := caopts.PubSubSubscribeOptions(opts...)
	if err != nil {
		return nil, err
	}

	resp, err := api.core().request("pubsub/sub", topic).
		Option("discover", options.Discover).
		Send(ctx)
	if err != nil {
		return nil, err
	}

	if resp.Error != nil {
		// Release the response on the failure path. The close error is
		// deliberately dropped: the API error is the one worth reporting.
		_ = resp.Close()
		return nil, resp.Error
	}

	return &pubsubSub{
		Closer: resp,
		dec:    json.NewDecoder(resp.Output),
	}, nil
}
// core converts the receiver back to *HttpApi (PubsubAPI's underlying type)
// so that HttpApi's methods, such as request, can be called on it.
func (api *PubsubAPI) core() *HttpApi {
return (*HttpApi)(api)
}