diff --git a/core/commands/dht.go b/core/commands/dht.go new file mode 100644 index 000000000..197c8ace3 --- /dev/null +++ b/core/commands/dht.go @@ -0,0 +1,336 @@ +package commands + +import ( + "bytes" + "errors" + "fmt" + "io" + "time" + + cmds "github.com/jbenet/go-ipfs/commands" + notif "github.com/jbenet/go-ipfs/notifications" + peer "github.com/jbenet/go-ipfs/p2p/peer" + ipdht "github.com/jbenet/go-ipfs/routing/dht" + u "github.com/jbenet/go-ipfs/util" +) + +var ErrNotDHT = errors.New("routing service is not a DHT") + +var DhtCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Issue commands directly through the DHT", + ShortDescription: ``, + }, + + Subcommands: map[string]*cmds.Command{ + "query": queryDhtCmd, + "findprovs": findProvidersDhtCmd, + "findpeer": findPeerDhtCmd, + }, +} + +var queryDhtCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Run a 'findClosestPeers' query through the DHT", + ShortDescription: ``, + }, + + Arguments: []cmds.Argument{ + cmds.StringArg("peerID", true, true, "The peerID to run the query against"), + }, + Options: []cmds.Option{ + cmds.BoolOption("verbose", "v", "Write extra information"), + }, + Run: func(req cmds.Request) (interface{}, error) { + n, err := req.Context().GetNode() + if err != nil { + return nil, err + } + + dht, ok := n.Routing.(*ipdht.IpfsDHT) + if !ok { + return nil, errors.New("Routing service was not a dht") + } + + events := make(chan *notif.QueryEvent) + ctx := notif.RegisterForQueryEvents(req.Context().Context, events) + + closestPeers, err := dht.GetClosestPeers(ctx, u.Key(req.Arguments()[0])) + + go func() { + defer close(events) + for p := range closestPeers { + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + ID: p, + Type: notif.FinalPeer, + }) + } + }() + + outChan := make(chan interface{}) + go func() { + defer close(outChan) + for e := range events { + outChan <- e + } + }() + return outChan, nil + }, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + outChan, ok := res.Output().(<-chan interface{}) + if !ok { + return nil, u.ErrCast() + } + + marshal := func(v interface{}) (io.Reader, error) { + obj, ok := v.(*notif.QueryEvent) + if !ok { + return nil, u.ErrCast() + } + + verbose, _, _ := res.Request().Option("v").Bool() + + buf := new(bytes.Buffer) + if verbose { + fmt.Fprintf(buf, "%s: ", time.Now().Format("15:04:05.000")) + } + switch obj.Type { + case notif.FinalPeer: + fmt.Fprintf(buf, "%s\n", obj.ID) + case notif.PeerResponse: + if verbose { + fmt.Fprintf(buf, "* %s says use ", obj.ID) + for _, p := range obj.Responses { + fmt.Fprintf(buf, "%s ", p.ID) + } + fmt.Fprintln(buf) + } + case notif.SendingQuery: + if verbose { + fmt.Fprintf(buf, "* querying %s\n", obj.ID) + } + case notif.QueryError: + fmt.Fprintf(buf, "error: %s\n", obj.Extra) + default: + fmt.Fprintf(buf, "unrecognized event type: %d\n", obj.Type) + } + return buf, nil + } + + return &cmds.ChannelMarshaler{ + Channel: outChan, + Marshaler: marshal, + }, nil + }, + }, + Type: notif.QueryEvent{}, +} + +var findProvidersDhtCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Run a 'FindProviders' query through the DHT", + ShortDescription: ` +FindProviders will return a list of peers who are able to provide the value requested. +`, + }, + + Arguments: []cmds.Argument{ + cmds.StringArg("key", true, true, "The key to find providers for"), + }, + Options: []cmds.Option{ + cmds.BoolOption("verbose", "v", "Write extra information"), + }, + Run: func(req cmds.Request) (interface{}, error) { + n, err := req.Context().GetNode() + if err != nil { + return nil, err + } + + dht, ok := n.Routing.(*ipdht.IpfsDHT) + if !ok { + return nil, ErrNotDHT + } + + numProviders := 20 + + outChan := make(chan interface{}) + events := make(chan *notif.QueryEvent) + ctx := notif.RegisterForQueryEvents(req.Context().Context, events) + + pchan := dht.FindProvidersAsync(ctx, u.B58KeyDecode(req.Arguments()[0]), numProviders) + go func() { + defer close(outChan) + for e := range events { + outChan <- e + } + }() + + go func() { + defer close(events) + for p := range pchan { + np := p + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.Provider, + Responses: []*peer.PeerInfo{&np}, + }) + } + }() + return outChan, nil + }, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + outChan, ok := res.Output().(<-chan interface{}) + if !ok { + return nil, u.ErrCast() + } + + verbose, _, _ := res.Request().Option("v").Bool() + + marshal := func(v interface{}) (io.Reader, error) { + obj, ok := v.(*notif.QueryEvent) + if !ok { + return nil, u.ErrCast() + } + + buf := new(bytes.Buffer) + if verbose { + fmt.Fprintf(buf, "%s: ", time.Now().Format("15:04:05.000")) + } + switch obj.Type { + case notif.FinalPeer: + if verbose { + fmt.Fprintf(buf, "* closest peer %s\n", obj.ID) + } + case notif.Provider: + fmt.Fprintf(buf, "%s\n", obj.ID.Pretty()) + case notif.PeerResponse: + if verbose { + fmt.Fprintf(buf, "* %s says use ", obj.ID) + for _, p := range obj.Responses { + fmt.Fprintf(buf, "%s ", p.ID) + } + fmt.Fprintln(buf) + } + case notif.SendingQuery: + if verbose { + fmt.Fprintf(buf, "* querying %s\n", obj.ID) + } + case notif.QueryError: + fmt.Fprintf(buf, "error: %s\n", obj.Extra) + default: + fmt.Fprintf(buf, "unrecognized event type: %d\n", obj.Type) + } + return buf, nil + } + + return &cmds.ChannelMarshaler{ + Channel: outChan, + Marshaler: marshal, + }, nil + }, + }, + Type: peer.PeerInfo{}, +} + +var findPeerDhtCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Run a 'FindPeer' query through the DHT", + ShortDescription: ``, + }, + + Arguments: []cmds.Argument{ + cmds.StringArg("peerID", true, true, "The peer to search for"), + }, + Run: func(req cmds.Request) (interface{}, error) { + n, err := req.Context().GetNode() + if err != nil { + return nil, err + } + + dht, ok := n.Routing.(*ipdht.IpfsDHT) + if !ok { + return nil, ErrNotDHT + } + + pid, err := peer.IDB58Decode(req.Arguments()[0]) + if err != nil { + return nil, err + } + + outChan := make(chan interface{}) + events := make(chan *notif.QueryEvent) + ctx := notif.RegisterForQueryEvents(req.Context().Context, events) + + go func() { + defer close(outChan) + for v := range events { + outChan <- v + } + }() + + go func() { + defer close(events) + pi, err := dht.FindPeer(ctx, pid) + if err != nil { + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.QueryError, + Extra: err.Error(), + }) + return + } + + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.FinalPeer, + Responses: []*peer.PeerInfo{&pi}, + }) + }() + + return outChan, nil + }, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + outChan, ok := res.Output().(<-chan interface{}) + if !ok { + return nil, u.ErrCast() + } + + marshal := func(v interface{}) (io.Reader, error) { + obj, ok := v.(*notif.QueryEvent) + if !ok { + return nil, u.ErrCast() + } + + buf := new(bytes.Buffer) + fmt.Fprintf(buf, "%s: ", time.Now().Format("15:04:05.000")) + switch obj.Type { + case notif.FinalPeer: + pi := obj.Responses[0] + fmt.Fprintf(buf, "%s\n", pi.ID) + for _, a := range pi.Addrs { + fmt.Fprintf(buf, "\t%s\n", a) + } + case notif.PeerResponse: + fmt.Fprintf(buf, "* %s says use ", obj.ID) + for _, p := range obj.Responses { + fmt.Fprintf(buf, "%s ", p.ID) + } + fmt.Fprintln(buf) + case notif.SendingQuery: + fmt.Fprintf(buf, "* querying %s\n", obj.ID) + case notif.QueryError: + fmt.Fprintf(buf, "error: %s\n", obj.Extra) + default: + fmt.Fprintf(buf, "unrecognized event type: %d\n", obj.Type) + } + return buf, nil + } + + return &cmds.ChannelMarshaler{ + Channel: outChan, + Marshaler: marshal, + }, nil + }, + }, + Type: notif.QueryEvent{}, +} diff --git a/core/commands/root.go b/core/commands/root.go index 60fd0a93f..961e7c6fe 100644 --- a/core/commands/root.go +++ b/core/commands/root.go @@ -80,6 +80,7 @@ var rootSubcommands = map[string]*cmds.Command{ "cat": CatCmd, "commands": CommandsDaemonCmd, "config": ConfigCmd, + "dht": DhtCmd, "diag": DiagCmd, "id": IDCmd, "log": LogCmd, diff --git a/notifications/query.go b/notifications/query.go new file mode 100644 index 000000000..77d6fcc58 --- /dev/null +++ b/notifications/query.go @@ -0,0 +1,81 @@ +package notifications + +import ( + "encoding/json" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + peer "github.com/jbenet/go-ipfs/p2p/peer" +) + +const RoutingQueryKey = "RoutingQueryEvent" + +type QueryEventType int + +const ( + SendingQuery QueryEventType = iota + PeerResponse + FinalPeer + QueryError + Provider +) + +type QueryEvent struct { + ID peer.ID + Type QueryEventType + Responses []*peer.PeerInfo + Extra string +} + +func RegisterForQueryEvents(ctx context.Context, ch chan<- *QueryEvent) context.Context { + return context.WithValue(ctx, RoutingQueryKey, ch) +} + +func PublishQueryEvent(ctx context.Context, ev *QueryEvent) { + ich := ctx.Value(RoutingQueryKey) + if ich == nil { + return + } + + ch, ok := ich.(chan<- *QueryEvent) + if !ok { + return + } + + select { + case ch <- ev: + case <-ctx.Done(): + } +} + +func (qe *QueryEvent) MarshalJSON() ([]byte, error) { + out := make(map[string]interface{}) + out["ID"] = peer.IDB58Encode(qe.ID) + out["Type"] = int(qe.Type) + out["Responses"] = qe.Responses + out["Extra"] = qe.Extra + return json.Marshal(out) +} + +func (qe *QueryEvent) UnmarshalJSON(b []byte) error { + temp := struct { + ID string + Type int + Responses []*peer.PeerInfo + Extra string + }{} + err := json.Unmarshal(b, &temp) + if err != nil { + return err + } + if len(temp.ID) > 0 { + pid, err := peer.IDB58Decode(temp.ID) + if err != nil { + return err + } + qe.ID = pid + } + qe.Type = QueryEventType(temp.Type) + qe.Responses = temp.Responses + qe.Extra = temp.Extra + return nil +} diff --git a/p2p/peer/peer.go b/p2p/peer/peer.go index c29e23283..124d6ba6c 100644 --- a/p2p/peer/peer.go +++ b/p2p/peer/peer.go @@ -3,6 +3,7 @@ package peer import ( "encoding/hex" + "encoding/json" "fmt" b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58" @@ -131,6 +132,35 @@ type PeerInfo struct { Addrs []ma.Multiaddr } +func (pi *PeerInfo) MarshalJSON() ([]byte, error) { + out := make(map[string]interface{}) + out["ID"] = IDB58Encode(pi.ID) + var addrs []string + for _, a := range pi.Addrs { + addrs = append(addrs, a.String()) + } + out["Addrs"] = addrs + return json.Marshal(out) +} + +func (pi *PeerInfo) UnmarshalJSON(b []byte) error { + var data map[string]interface{} + err := json.Unmarshal(b, &data) + if err != nil { + return err + } + pid, err := IDB58Decode(data["ID"].(string)) + if err != nil { + return err + } + pi.ID = pid + addrs := data["Addrs"].([]interface{}) + for _, a := range addrs { + pi.Addrs = append(pi.Addrs, ma.StringCast(a.(string))) + } + return nil +} + // IDSlice for sorting peers type IDSlice []ID diff --git a/routing/dht/lookup.go b/routing/dht/lookup.go new file mode 100644 index 000000000..ea1552ba7 --- /dev/null +++ b/routing/dht/lookup.go @@ -0,0 +1,108 @@ +package dht + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + + notif "github.com/jbenet/go-ipfs/notifications" + peer "github.com/jbenet/go-ipfs/p2p/peer" + kb "github.com/jbenet/go-ipfs/routing/kbucket" + u "github.com/jbenet/go-ipfs/util" + errors "github.com/jbenet/go-ipfs/util/debugerror" + pset "github.com/jbenet/go-ipfs/util/peerset" +) + +// Required in order for proper JSON marshaling +func pointerizePeerInfos(pis []peer.PeerInfo) []*peer.PeerInfo { + out := make([]*peer.PeerInfo, len(pis)) + for i, p := range pis { + np := p + out[i] = &np + } + return out +} + +// Kademlia 'node lookup' operation. Returns a channel of the K closest peers +// to the given key +func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key u.Key) (<-chan peer.ID, error) { + e := log.EventBegin(ctx, "getClosestPeers", &key) + tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) + if len(tablepeers) == 0 { + return nil, errors.Wrap(kb.ErrLookupFailure) + } + + out := make(chan peer.ID, KValue) + peerset := pset.NewLimited(KValue) + + for _, p := range tablepeers { + select { + case out <- p: + case <-ctx.Done(): + return nil, ctx.Err() + } + peerset.Add(p) + } + + query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { + // For DHT query command + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.SendingQuery, + ID: p, + }) + + closer, err := dht.closerPeersSingle(ctx, key, p) + if err != nil { + log.Errorf("error getting closer peers: %s", err) + return nil, err + } + + var filtered []peer.PeerInfo + for _, clp := range closer { + if kb.Closer(clp, dht.self, key) && peerset.TryAdd(clp) { + select { + case out <- clp: + case <-ctx.Done(): + return nil, ctx.Err() + } + filtered = append(filtered, dht.peerstore.PeerInfo(clp)) + } + } + + // For DHT query command + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.PeerResponse, + ID: p, + Responses: pointerizePeerInfos(filtered), + }) + + return &dhtQueryResult{closerPeers: filtered}, nil + }) + + go func() { + defer close(out) + defer e.Done() + // run it! + _, err := query.Run(ctx, tablepeers) + if err != nil { + log.Debugf("closestPeers query run error: %s", err) + } + }() + + return out, nil +} + +func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) ([]peer.ID, error) { + pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key)) + if err != nil { + return nil, err + } + + var out []peer.ID + for _, pbp := range pmes.GetCloserPeers() { + pid := peer.ID(pbp.GetId()) + if pid != dht.self { // dont add self + dht.peerstore.AddAddresses(pid, pbp.Addresses()) + out = append(out, pid) + } + } + return out, nil +} diff --git a/routing/dht/query.go b/routing/dht/query.go index 687d2621f..dfaecef98 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -3,6 +3,7 @@ package dht import ( "sync" + notif "github.com/jbenet/go-ipfs/notifications" peer "github.com/jbenet/go-ipfs/p2p/peer" queue "github.com/jbenet/go-ipfs/p2p/peer/queue" "github.com/jbenet/go-ipfs/routing" @@ -223,16 +224,26 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) { // make sure we're connected to the peer. if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 { log.Infof("not connected. dialing.") + // while we dial, we do not take up a rate limit. this is to allow + // forward progress during potentially very high latency dials. + r.rateLimit <- struct{}{} pi := peer.PeerInfo{ID: p} if err := r.query.dht.host.Connect(cg.Context(), pi); err != nil { log.Debugf("Error connecting: %s", err) + + notif.PublishQueryEvent(cg.Context(), ¬if.QueryEvent{ + Type: notif.QueryError, + Extra: err.Error(), + }) + r.Lock() r.errs = append(r.errs, err) r.Unlock() + <-r.rateLimit // need to grab it again, as we deferred. return } - + <-r.rateLimit // need to grab it again, as we deferred. log.Debugf("connected. dial success.") } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 2054e03fd..39002fe64 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -7,6 +7,7 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + notif "github.com/jbenet/go-ipfs/notifications" inet "github.com/jbenet/go-ipfs/p2p/net" peer "github.com/jbenet/go-ipfs/p2p/peer" "github.com/jbenet/go-ipfs/routing" @@ -48,7 +49,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error return err } - pchan, err := dht.getClosestPeers(ctx, key) + pchan, err := dht.GetClosestPeers(ctx, key) if err != nil { return err } @@ -134,7 +135,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { // add self locally dht.providers.AddProvider(key, dht.self) - peers, err := dht.getClosestPeers(ctx, key) + peers, err := dht.GetClosestPeers(ctx, key) if err != nil { return err } @@ -164,79 +165,6 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn return providers, nil } -// Kademlia 'node lookup' operation. Returns a channel of the K closest peers -// to the given key -func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer.ID, error) { - e := log.EventBegin(ctx, "getClosestPeers", &key) - tablepeers := dht.routingTable.ListPeers() - if len(tablepeers) == 0 { - return nil, errors.Wrap(kb.ErrLookupFailure) - } - - out := make(chan peer.ID, KValue) - peerset := pset.NewLimited(KValue) - - for _, p := range tablepeers { - select { - case out <- p: - case <-ctx.Done(): - return nil, ctx.Err() - } - peerset.Add(p) - } - - query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { - closer, err := dht.closerPeersSingle(ctx, key, p) - if err != nil { - log.Errorf("error getting closer peers: %s", err) - return nil, err - } - - var filtered []peer.PeerInfo - for _, p := range closer { - if kb.Closer(p, dht.self, key) && peerset.TryAdd(p) { - select { - case out <- p: - case <-ctx.Done(): - return nil, ctx.Err() - } - filtered = append(filtered, dht.peerstore.PeerInfo(p)) - } - } - - return &dhtQueryResult{closerPeers: filtered}, nil - }) - - go func() { - defer close(out) - defer e.Done() - // run it! - _, err := query.Run(ctx, tablepeers) - if err != nil { - log.Debugf("closestPeers query run error: %s", err) - } - }() - - return out, nil -} - -func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) ([]peer.ID, error) { - pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key)) - if err != nil { - return nil, err - } - - var out []peer.ID - for _, pbp := range pmes.GetCloserPeers() { - pid := peer.ID(pbp.GetId()) - if pid != dht.self { // dont add self - dht.peerstore.AddAddresses(pid, pbp.Addresses()) - out = append(out, pid) - } - } - return out, nil -} - // FindProvidersAsync is the same thing as FindProviders, but returns a channel. // Peers will be returned on the channel as soon as they are found, even before // the search query completes. @@ -315,6 +243,10 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co _, err := query.Run(ctx, peers) if err != nil { log.Errorf("Query error: %s", err) + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.QueryError, + Extra: err.Error(), + }) } } @@ -342,6 +274,10 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er // setup the Query query := dht.newQuery(u.Key(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.SendingQuery, + ID: p, + }) pmes, err := dht.findPeerSingle(ctx, p, id) if err != nil { @@ -361,6 +297,11 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er } } + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.PeerResponse, + Responses: pointerizePeerInfos(clpeerInfos), + }) + return &dhtQueryResult{closerPeers: clpeerInfos}, nil })