mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 10:27:46 +08:00
166 lines
3.7 KiB
Go
166 lines
3.7 KiB
Go
package coreapi
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
core "github.com/ipfs/go-ipfs/core"
|
|
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
|
|
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
|
|
|
|
cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
|
|
pstore "gx/ipfs/QmUymf8fJtideyv3z727BcZUifGBjMZMpCJqu3Gxk5aRUk/go-libp2p-peerstore"
|
|
peer "gx/ipfs/QmcqU6QUDSXprb1518vYDGczrTJTyGwLG9eUa5iNX4xUtS/go-libp2p-peer"
|
|
pubsub "gx/ipfs/QmdQmRSSAGmZvBcbETygeTbsqLLn4k69ELvTxVbEiZxGmA/go-libp2p-pubsub"
|
|
)
|
|
|
|
// PubSubAPI carries the pubsub methods of the core API. It is declared on
// CoreAPI's underlying type, which is what lets the core method convert a
// *PubSubAPI back into a *CoreAPI.
type PubSubAPI CoreAPI
|
|
|
|
type pubSubSubscription struct {
|
|
cancel context.CancelFunc
|
|
subscription *pubsub.Subscription
|
|
}
|
|
|
|
type pubSubMessage struct {
|
|
msg *pubsub.Message
|
|
}
|
|
|
|
func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) {
|
|
if err := api.checkNode(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return api.node.PubSub.GetTopics(), nil
|
|
}
|
|
|
|
func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) {
|
|
if err := api.checkNode(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
settings, err := caopts.PubSubPeersOptions(opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
peers := api.node.PubSub.ListPeers(settings.Topic)
|
|
out := make([]peer.ID, len(peers))
|
|
|
|
for i, peer := range peers {
|
|
out[i] = peer
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) error {
|
|
if err := api.checkNode(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return api.node.PubSub.Publish(topic, data)
|
|
}
|
|
|
|
func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) {
|
|
options, err := caopts.PubSubSubscribeOptions(opts...)
|
|
|
|
if err := api.checkNode(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sub, err := api.node.PubSub.Subscribe(topic)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pubctx, cancel := context.WithCancel(api.node.Context())
|
|
|
|
if options.Discover {
|
|
go func() {
|
|
blk, err := api.core().Block().Put(pubctx, strings.NewReader("floodsub:"+topic))
|
|
if err != nil {
|
|
log.Error("pubsub discovery: ", err)
|
|
return
|
|
}
|
|
|
|
connectToPubSubPeers(pubctx, api.node, blk.Path().Cid())
|
|
}()
|
|
}
|
|
|
|
return &pubSubSubscription{cancel, sub}, nil
|
|
}
|
|
|
|
func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
|
|
var wg sync.WaitGroup
|
|
for p := range provs {
|
|
wg.Add(1)
|
|
go func(pi pstore.PeerInfo) {
|
|
defer wg.Done()
|
|
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
|
|
defer cancel()
|
|
err := n.PeerHost.Connect(ctx, pi)
|
|
if err != nil {
|
|
log.Info("pubsub discover: ", err)
|
|
return
|
|
}
|
|
log.Info("connected to pubsub peer:", pi.ID)
|
|
}(p)
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func (api *PubSubAPI) checkNode() error {
|
|
if !api.node.OnlineMode() {
|
|
return coreiface.ErrOffline
|
|
}
|
|
|
|
if api.node.PubSub == nil {
|
|
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (sub *pubSubSubscription) Close() error {
|
|
sub.cancel()
|
|
sub.subscription.Cancel()
|
|
return nil
|
|
}
|
|
|
|
func (sub *pubSubSubscription) Next(ctx context.Context) (coreiface.PubSubMessage, error) {
|
|
msg, err := sub.subscription.Next(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &pubSubMessage{msg}, nil
|
|
}
|
|
|
|
func (msg *pubSubMessage) From() peer.ID {
|
|
return peer.ID(msg.msg.From)
|
|
}
|
|
|
|
func (msg *pubSubMessage) Data() []byte {
|
|
return msg.msg.Data
|
|
}
|
|
|
|
func (msg *pubSubMessage) Seq() []byte {
|
|
return msg.msg.Seqno
|
|
}
|
|
|
|
func (msg *pubSubMessage) Topics() []string {
|
|
return msg.msg.TopicIDs
|
|
}
|
|
|
|
func (api *PubSubAPI) core() coreiface.CoreAPI {
|
|
return (*CoreAPI)(api)
|
|
}
|