ipfs dht query emits lookupevents

This commit is contained in:
Adin Schmahmann 2020-12-07 11:24:43 -05:00
parent 582d6c79a0
commit c78cdfd51e

View File

@ -3,6 +3,7 @@ package commands
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
@ -18,6 +19,8 @@ import (
path "github.com/ipfs/go-path"
peer "github.com/libp2p/go-libp2p-core/peer"
routing "github.com/libp2p/go-libp2p-core/routing"
kaddht "github.com/libp2p/go-libp2p-kad-dht"
)
var ErrNotDHT = errors.New("routing service is not a DHT")
@ -45,6 +48,18 @@ const (
dhtVerboseOptionName = "verbose"
)
type dhtEvent struct {
Info *kaddht.LookupEvent `json:"info"`
Time int64 `json:"ts"` // nanoseconds since the unix epoch
}
func newDHTEvent(evt *kaddht.LookupEvent) *dhtEvent {
return &dhtEvent{
Info: evt,
Time: time.Now().UnixNano(),
}
}
var queryDhtCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Find the closest Peer IDs to a given Peer ID by querying the DHT.",
@ -73,7 +88,8 @@ var queryDhtCmd = &cmds.Command{
}
ctx, cancel := context.WithCancel(req.Context)
ctx, events := routing.RegisterForQueryEvents(ctx)
//ctx, events := routing.RegisterForQueryEvents(ctx)
ctx, events := kaddht.RegisterForLookupEvents(ctx)
dht := nd.DHT.WAN
if !nd.DHT.WANActive() {
@ -101,7 +117,7 @@ var queryDhtCmd = &cmds.Command{
}()
for e := range events {
if err := res.Emit(e); err != nil {
if err := res.Emit(newDHTEvent(e)); err != nil {
return err
}
}
@ -109,18 +125,16 @@ var queryDhtCmd = &cmds.Command{
return <-errCh
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error {
pfm := pfuncMap{
routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error {
fmt.Fprintf(out, "%s\n", obj.ID)
return nil
},
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *dhtEvent) error {
b, err := json.Marshal(out)
if err != nil {
return fmt.Errorf("could not marshall lookup event: %w", err)
}
verbose, _ := req.Options[dhtVerboseOptionName].(bool)
return printEvent(out, w, verbose, pfm)
_, err = fmt.Fprintf(w, "event: %s", string(b))
return err
}),
},
Type: routing.QueryEvent{},
Type: dhtEvent{},
}
const (