mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-25 12:27:43 +08:00
cmds: implement ipfs dht provide command
License: MIT Signed-off-by: Jeromy <why@ipfs.io>
This commit is contained in:
parent
7276fd84b0
commit
eaa433f27b
@ -31,6 +31,7 @@ var DhtCmd = &cmds.Command{
|
||||
"findpeer": findPeerDhtCmd,
|
||||
"get": getValueDhtCmd,
|
||||
"put": putValueDhtCmd,
|
||||
"provide": provideRefDhtCmd,
|
||||
},
|
||||
}
|
||||
|
||||
@ -227,6 +228,116 @@ var findProvidersDhtCmd = &cmds.Command{
|
||||
Type: notif.QueryEvent{},
|
||||
}
|
||||
|
||||
var provideRefDhtCmd = &cmds.Command{
|
||||
Helptext: cmds.HelpText{
|
||||
Tagline: "Announce to the network that you are providing given values.",
|
||||
},
|
||||
|
||||
Arguments: []cmds.Argument{
|
||||
cmds.StringArg("key", true, true, "The key to find providers for.").EnableStdin(),
|
||||
},
|
||||
Options: []cmds.Option{
|
||||
cmds.BoolOption("verbose", "v", "Print extra information.").Default(false),
|
||||
},
|
||||
Run: func(req cmds.Request, res cmds.Response) {
|
||||
n, err := req.InvocContext().GetNode()
|
||||
if err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
||||
dht, ok := n.Routing.(*ipdht.IpfsDHT)
|
||||
if !ok {
|
||||
res.SetError(ErrNotDHT, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
||||
var keys []key.Key
|
||||
for _, arg := range req.Arguments() {
|
||||
k := key.B58KeyDecode(arg)
|
||||
if k == "" {
|
||||
res.SetError(fmt.Errorf("incorrectly formatted key: ", arg), cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
||||
has, err := n.Blockstore.Has(k)
|
||||
if err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
||||
if !has {
|
||||
res.SetError(fmt.Errorf("block %s not found locally, cannot provide", k), cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
||||
keys = append(keys, k)
|
||||
}
|
||||
|
||||
outChan := make(chan interface{})
|
||||
res.SetOutput((<-chan interface{})(outChan))
|
||||
|
||||
events := make(chan *notif.QueryEvent)
|
||||
ctx := notif.RegisterForQueryEvents(req.Context(), events)
|
||||
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for e := range events {
|
||||
outChan <- e
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for _, k := range keys {
|
||||
err := dht.Provide(ctx, k)
|
||||
if err != nil {
|
||||
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
||||
Type: notif.QueryError,
|
||||
Extra: err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
},
|
||||
Marshalers: cmds.MarshalerMap{
|
||||
cmds.Text: func(res cmds.Response) (io.Reader, error) {
|
||||
outChan, ok := res.Output().(<-chan interface{})
|
||||
if !ok {
|
||||
return nil, u.ErrCast()
|
||||
}
|
||||
|
||||
verbose, _, _ := res.Request().Option("v").Bool()
|
||||
pfm := pfuncMap{
|
||||
notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
|
||||
if verbose {
|
||||
fmt.Fprintf(out, "sending provider record to peer %s\n", obj.ID)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
marshal := func(v interface{}) (io.Reader, error) {
|
||||
obj, ok := v.(*notif.QueryEvent)
|
||||
if !ok {
|
||||
return nil, u.ErrCast()
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
printEvent(obj, buf, verbose, pfm)
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
return &cmds.ChannelMarshaler{
|
||||
Channel: outChan,
|
||||
Marshaler: marshal,
|
||||
Res: res,
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
Type: notif.QueryEvent{},
|
||||
}
|
||||
|
||||
var findPeerDhtCmd = &cmds.Command{
|
||||
Helptext: cmds.HelpText{
|
||||
Tagline: "Query the DHT for all of the multiaddresses associated with a Peer ID.",
|
||||
|
||||
@ -263,6 +263,10 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error {
|
||||
go func(p peer.ID) {
|
||||
defer wg.Done()
|
||||
log.Debugf("putProvider(%s, %s)", key, p)
|
||||
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
||||
Type: notif.FinalPeer,
|
||||
ID: p,
|
||||
})
|
||||
err := dht.sendMessage(ctx, p, mes)
|
||||
if err != nil {
|
||||
log.Debug(err)
|
||||
@ -272,6 +276,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error {
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) makeProvRecord(skey key.Key) (*pb.Message, error) {
|
||||
pi := pstore.PeerInfo{
|
||||
ID: dht.self,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user