From 7e7a04e21ffbb03a356f85d7d53e881672741134 Mon Sep 17 00:00:00 2001 From: Overbool Date: Sat, 27 Oct 2018 10:57:32 +0800 Subject: [PATCH 1/2] commands/dht: use new cmds lib License: MIT Signed-off-by: Overbool --- core/commands/dht.go | 337 +++++++++++++++--------------------------- core/commands/root.go | 2 +- 2 files changed, 122 insertions(+), 217 deletions(-) diff --git a/core/commands/dht.go b/core/commands/dht.go index 015433702..43bafabdb 100644 --- a/core/commands/dht.go +++ b/core/commands/dht.go @@ -1,15 +1,13 @@ package commands import ( - "bytes" "context" "errors" "fmt" "io" "time" - cmds "github.com/ipfs/go-ipfs/commands" - e "github.com/ipfs/go-ipfs/core/commands/e" + cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" dag "gx/ipfs/QmSei8kFMfqdJq7Q68d2LMnHbTWKKg2daA29ezUYFAUNgc/go-merkledag" path "gx/ipfs/QmT3rzed1ppXefourpmoZ7tyVQfsGPQZ1pHDngLmCvXxd3/go-path" @@ -21,6 +19,7 @@ import ( routing "gx/ipfs/QmcQ81jSyWCp1jpkQ8CMbtpXT3jK7Wg6ZtYmoyWFgBoF9c/go-libp2p-routing" notif "gx/ipfs/QmcQ81jSyWCp1jpkQ8CMbtpXT3jK7Wg6ZtYmoyWFgBoF9c/go-libp2p-routing/notifications" "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit" + cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds" ) var ErrNotDHT = errors.New("routing service is not a DHT") @@ -60,32 +59,28 @@ var queryDhtCmd = &cmds.Command{ Options: []cmdkit.Option{ cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."), }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + nd, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - if n.DHT == nil { - res.SetError(ErrNotDHT, cmdkit.ErrNormal) - return + if nd.DHT == nil { + return ErrNotDHT } - id, err := peer.IDB58Decode(req.Arguments()[0]) + id, err := peer.IDB58Decode(req.Arguments[0]) if err != nil { - res.SetError(cmds.ClientError("invalid peer ID"), cmdkit.ErrClient) - return + return cmds.ClientError("invalid peer ID") } - ctx, cancel := context.WithCancel(req.Context()) + ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) - closestPeers, err := n.DHT.GetClosestPeers(ctx, string(id)) + closestPeers, err := nd.DHT.GetClosestPeers(ctx, string(id)) if err != nil { cancel() - res.SetError(err, cmdkit.ErrNormal) - return + return err } go func() { @@ -99,21 +94,23 @@ var queryDhtCmd = &cmds.Command{ }() outChan := make(chan interface{}) - res.SetOutput((<-chan interface{})(outChan)) go func() { defer close(outChan) for e := range events { select { case outChan <- e: - case <-req.Context().Done(): + case <-req.Context.Done(): return } } }() + + return res.Emit(outChan) + }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: func() cmds.Marshaler { + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { pfm := pfuncMap{ notif.PeerResponse: func(obj *notif.QueryEvent, out io.Writer, verbose bool) { for _, p := range obj.Responses { @@ -121,25 +118,10 @@ var queryDhtCmd = &cmds.Command{ } }, } - - return func(res cmds.Response) (io.Reader, error) { - v, err := unwrapOutput(res.Output()) - if err != nil { - return nil, err - } - - obj, ok := v.(*notif.QueryEvent) - if !ok { - return nil, e.TypeErr(obj, v) - } - - verbose, _, _ := res.Request().Option(dhtVerboseOptionName).Bool() - - buf := new(bytes.Buffer) - printEvent(obj, buf, verbose, pfm) - return buf, nil - } - }(), + verbose, _ := req.Options[dhtVerboseOptionName].(bool) + printEvent(out, w, verbose, pfm) + return nil + }), }, Type: notif.QueryEvent{}, } @@ -161,39 +143,33 @@ var findProvidersDhtCmd = &cmds.Command{ cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."), cmdkit.IntOption(numProvidersOptionName, "n", "The number of providers to find.").WithDefault(20), }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } if n.Routing == nil { - res.SetError(ErrNotOnline, cmdkit.ErrNormal) - return + return ErrNotOnline } - numProviders, _, err := res.Request().Option(numProvidersOptionName).Int() + numProviders, _ := req.Options[numProvidersOptionName].(int) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } if numProviders < 1 { - res.SetError(fmt.Errorf("number of providers must be greater than 0"), cmdkit.ErrNormal) - return + return fmt.Errorf("number of providers must be greater than 0") } - c, err := cid.Parse(req.Arguments()[0]) + c, err := cid.Parse(req.Arguments[0]) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } outChan := make(chan interface{}) - res.SetOutput((<-chan interface{})(outChan)) - ctx, cancel := context.WithCancel(req.Context()) + ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders) @@ -202,7 +178,7 @@ var findProvidersDhtCmd = &cmds.Command{ for e := range events { select { case outChan <- e: - case <-req.Context().Done(): + case <-req.Context.Done(): return } } @@ -218,9 +194,11 @@ var findProvidersDhtCmd = &cmds.Command{ }) } }() + + return res.Emit(outChan) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: func() func(cmds.Response) (io.Reader, error) { + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { pfm := pfuncMap{ notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) { if verbose { @@ -241,23 +219,11 @@ var findProvidersDhtCmd = &cmds.Command{ }, } - return func(res cmds.Response) (io.Reader, error) { - verbose, _, _ := res.Request().Option(dhtVerboseOptionName).Bool() - v, err := unwrapOutput(res.Output()) - if err != nil { - return nil, err - } + verbose, _ := req.Options[dhtVerboseOptionName].(bool) + printEvent(out, w, verbose, pfm) - obj, ok := v.(*notif.QueryEvent) - if !ok { - return nil, e.TypeErr(obj, v) - } - - buf := new(bytes.Buffer) - printEvent(obj, buf, verbose, pfm) - return buf, nil - } - }(), + return nil + }), }, Type: notif.QueryEvent{}, } @@ -278,51 +244,44 @@ var provideRefDhtCmd = &cmds.Command{ cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."), cmdkit.BoolOption(recursiveOptionName, "r", "Recursively provide entire graph."), }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + nd, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - if n.Routing == nil { - res.SetError(ErrNotOnline, cmdkit.ErrNormal) - return + if nd.Routing == nil { + return ErrNotOnline } - if len(n.PeerHost.Network().Conns()) == 0 { - res.SetError(errors.New("cannot provide, no connected peers"), cmdkit.ErrNormal) - return + if len(nd.PeerHost.Network().Conns()) == 0 { + return errors.New("cannot provide, no connected peers") } - rec, _, _ := req.Option(recursiveOptionName).Bool() + rec, _ := req.Options[recursiveOptionName].(bool) var cids []cid.Cid - for _, arg := range req.Arguments() { + for _, arg := range req.Arguments { c, err := cid.Decode(arg) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - has, err := n.Blockstore.Has(c) + has, err := nd.Blockstore.Has(c) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } if !has { - res.SetError(fmt.Errorf("block %s not found locally, cannot provide", c), cmdkit.ErrNormal) - return + return fmt.Errorf("block %s not found locally, cannot provide", c) } cids = append(cids, c) } outChan := make(chan interface{}) - res.SetOutput((<-chan interface{})(outChan)) - ctx, cancel := context.WithCancel(req.Context()) + ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) go func() { @@ -330,7 +289,7 @@ var provideRefDhtCmd = &cmds.Command{ for e := range events { select { case outChan <- e: - case <-req.Context().Done(): + case <-req.Context.Done(): return } } @@ -340,9 +299,9 @@ var provideRefDhtCmd = &cmds.Command{ defer cancel() var err error if rec { - err = provideKeysRec(ctx, n.Routing, n.DAG, cids) + err = provideKeysRec(ctx, nd.Routing, nd.DAG, cids) } else { - err = provideKeys(ctx, n.Routing, cids) + err = provideKeys(ctx, nd.Routing, cids) } if err != nil { notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ @@ -351,9 +310,11 @@ var provideRefDhtCmd = &cmds.Command{ }) } }() + + return res.Emit(outChan) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: func() func(res cmds.Response) (io.Reader, error) { + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { pfm := pfuncMap{ notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) { if verbose { @@ -362,22 +323,11 @@ var provideRefDhtCmd = &cmds.Command{ }, } - return func(res cmds.Response) (io.Reader, error) { - verbose, _, _ := res.Request().Option(dhtVerboseOptionName).Bool() - v, err := unwrapOutput(res.Output()) - if err != nil { - return nil, err - } - obj, ok := v.(*notif.QueryEvent) - if !ok { - return nil, e.TypeErr(obj, v) - } + verbose, _ := req.Options[dhtVerboseOptionName].(bool) + printEvent(out, w, verbose, pfm) - buf := new(bytes.Buffer) - printEvent(obj, buf, verbose, pfm) - return buf, nil - } - }(), + return nil + }), }, Type: notif.QueryEvent{}, } @@ -430,28 +380,24 @@ var findPeerDhtCmd = &cmds.Command{ Options: []cmdkit.Option{ cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."), }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + nd, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - if n.Routing == nil { - res.SetError(ErrNotOnline, cmdkit.ErrNormal) - return + if nd.Routing == nil { + return ErrNotOnline } - pid, err := peer.IDB58Decode(req.Arguments()[0]) + pid, err := peer.IDB58Decode(req.Arguments[0]) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } outChan := make(chan interface{}) - res.SetOutput((<-chan interface{})(outChan)) - ctx, cancel := context.WithCancel(req.Context()) + ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) go func() { @@ -459,7 +405,7 @@ var findPeerDhtCmd = &cmds.Command{ for v := range events { select { case outChan <- v: - case <-req.Context().Done(): + case <-req.Context.Done(): } } @@ -467,7 +413,7 @@ var findPeerDhtCmd = &cmds.Command{ go func() { defer cancel() - pi, err := n.Routing.FindPeer(ctx, pid) + pi, err := nd.Routing.FindPeer(ctx, pid) if err != nil { notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ Type: notif.QueryError, @@ -481,9 +427,11 @@ var findPeerDhtCmd = &cmds.Command{ Responses: []*pstore.PeerInfo{&pi}, }) }() + + return res.Emit(outChan) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: func() func(cmds.Response) (io.Reader, error) { + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { pfm := pfuncMap{ notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) { pi := obj.Responses[0] @@ -493,24 +441,10 @@ var findPeerDhtCmd = &cmds.Command{ }, } - return func(res cmds.Response) (io.Reader, error) { - verbose, _, _ := res.Request().Option(dhtVerboseOptionName).Bool() - v, err := unwrapOutput(res.Output()) - if err != nil { - return nil, err - } - - obj, ok := v.(*notif.QueryEvent) - if !ok { - return nil, e.TypeErr(obj, v) - } - - buf := new(bytes.Buffer) - printEvent(obj, buf, verbose, pfm) - - return buf, nil - } - }(), + verbose, _ := req.Options[dhtVerboseOptionName].(bool) + printEvent(out, w, verbose, pfm) + return nil + }), }, Type: notif.QueryEvent{}, } @@ -535,28 +469,24 @@ Different key types can specify other 'best' rules. Options: []cmdkit.Option{ cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."), }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + nd, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - if n.Routing == nil { - res.SetError(ErrNotOnline, cmdkit.ErrNormal) - return + if nd.Routing == nil { + return ErrNotOnline } - dhtkey, err := escapeDhtKey(req.Arguments()[0]) + dhtkey, err := escapeDhtKey(req.Arguments[0]) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } outChan := make(chan interface{}) - res.SetOutput((<-chan interface{})(outChan)) - ctx, cancel := context.WithCancel(req.Context()) + ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) go func() { @@ -564,14 +494,14 @@ Different key types can specify other 'best' rules. for e := range events { select { case outChan <- e: - case <-req.Context().Done(): + case <-req.Context.Done(): } } }() go func() { defer cancel() - val, err := n.Routing.GetValue(ctx, dhtkey) + val, err := nd.Routing.GetValue(ctx, dhtkey) if err != nil { notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ Type: notif.QueryError, @@ -584,9 +514,11 @@ Different key types can specify other 'best' rules. }) } }() + + return res.Emit(outChan) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: func() func(cmds.Response) (io.Reader, error) { + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { pfm := pfuncMap{ notif.Value: func(obj *notif.QueryEvent, out io.Writer, verbose bool) { if verbose { @@ -597,25 +529,11 @@ Different key types can specify other 'best' rules. }, } - return func(res cmds.Response) (io.Reader, error) { - verbose, _, _ := res.Request().Option(dhtVerboseOptionName).Bool() - v, err := unwrapOutput(res.Output()) - if err != nil { - return nil, err - } + verbose, _ := req.Options[dhtVerboseOptionName].(bool) + printEvent(out, w, verbose, pfm) - obj, ok := v.(*notif.QueryEvent) - if !ok { - return nil, e.TypeErr(obj, v) - } - - buf := new(bytes.Buffer) - - printEvent(obj, buf, verbose, pfm) - - return buf, nil - } - }(), + return nil + }), }, Type: notif.QueryEvent{}, } @@ -649,30 +567,26 @@ NOTE: A value may not exceed 2048 bytes. Options: []cmdkit.Option{ cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."), }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + nd, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - if n.Routing == nil { - res.SetError(ErrNotOnline, cmdkit.ErrNormal) - return + if nd.Routing == nil { + return ErrNotOnline } - key, err := escapeDhtKey(req.Arguments()[0]) + key, err := escapeDhtKey(req.Arguments[0]) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } outChan := make(chan interface{}) - res.SetOutput((<-chan interface{})(outChan)) - data := req.Arguments()[1] + data := req.Arguments[1] - ctx, cancel := context.WithCancel(req.Context()) + ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) go func() { @@ -680,7 +594,7 @@ NOTE: A value may not exceed 2048 bytes. for e := range events { select { case outChan <- e: - case <-req.Context().Done(): + case <-req.Context.Done(): return } } @@ -688,7 +602,7 @@ NOTE: A value may not exceed 2048 bytes. go func() { defer cancel() - err := n.Routing.PutValue(ctx, key, []byte(data)) + err := nd.Routing.PutValue(ctx, key, []byte(data)) if err != nil { notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ Type: notif.QueryError, @@ -696,9 +610,11 @@ NOTE: A value may not exceed 2048 bytes. }) } }() + + return res.Emit(outChan) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: func() func(cmds.Response) (io.Reader, error) { + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { pfm := pfuncMap{ notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) { if verbose { @@ -710,23 +626,12 @@ NOTE: A value may not exceed 2048 bytes. }, } - return func(res cmds.Response) (io.Reader, error) { - verbose, _, _ := res.Request().Option(dhtVerboseOptionName).Bool() - v, err := unwrapOutput(res.Output()) - if err != nil { - return nil, err - } - obj, ok := v.(*notif.QueryEvent) - if !ok { - return nil, e.TypeErr(obj, v) - } + verbose, _ := req.Options[dhtVerboseOptionName].(bool) - buf := new(bytes.Buffer) - printEvent(obj, buf, verbose, pfm) + printEvent(out, w, verbose, pfm) - return buf, nil - } - }(), + return nil + }), }, Type: notif.QueryEvent{}, } diff --git a/core/commands/root.go b/core/commands/root.go index ff15cf4ae..7afb3ea17 100644 --- a/core/commands/root.go +++ b/core/commands/root.go @@ -125,7 +125,7 @@ var rootSubcommands = map[string]*cmds.Command{ "bootstrap": lgc.NewCommand(BootstrapCmd), "config": lgc.NewCommand(ConfigCmd), "dag": dag.DagCmd, - "dht": lgc.NewCommand(DhtCmd), + "dht": DhtCmd, "diag": lgc.NewCommand(DiagCmd), "dns": DNSCmd, "id": IDCmd, From 4a2c3cb3cf86929f3219c43a5aaf708e4863980d Mon Sep 17 00:00:00 2001 From: Overbool Date: Sun, 28 Oct 2018 12:41:31 +0800 Subject: [PATCH 2/2] commands/dht: use res.Emit directly License: MIT Signed-off-by: Overbool --- core/commands/dht.go | 128 +++++++++++++++---------------------------- 1 file changed, 43 insertions(+), 85 deletions(-) diff --git a/core/commands/dht.go b/core/commands/dht.go index 43bafabdb..0b8916a5a 100644 --- a/core/commands/dht.go +++ b/core/commands/dht.go @@ -8,18 +8,18 @@ import ( "time" cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" - dag "gx/ipfs/QmSei8kFMfqdJq7Q68d2LMnHbTWKKg2daA29ezUYFAUNgc/go-merkledag" - path "gx/ipfs/QmT3rzed1ppXefourpmoZ7tyVQfsGPQZ1pHDngLmCvXxd3/go-path" cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" ipld "gx/ipfs/QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o/go-ipld-format" + cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds" + dag "gx/ipfs/QmSei8kFMfqdJq7Q68d2LMnHbTWKKg2daA29ezUYFAUNgc/go-merkledag" + path "gx/ipfs/QmT3rzed1ppXefourpmoZ7tyVQfsGPQZ1pHDngLmCvXxd3/go-path" peer "gx/ipfs/QmTRhk7cgjUf2gfQ3p2M9KPECNZEW9XUrmHcFCgog4cPgB/go-libp2p-peer" pstore "gx/ipfs/QmTTJcDL3gsnGDALjh2fDGg1onGRUdVgNL2hU2WEZcVrMX/go-libp2p-peerstore" b58 "gx/ipfs/QmWFAMPqsEyUX7gDUsRVmMWz59FxSpJ1b2v6bJ1yYzo7jY/go-base58-fast/base58" routing "gx/ipfs/QmcQ81jSyWCp1jpkQ8CMbtpXT3jK7Wg6ZtYmoyWFgBoF9c/go-libp2p-routing" notif "gx/ipfs/QmcQ81jSyWCp1jpkQ8CMbtpXT3jK7Wg6ZtYmoyWFgBoF9c/go-libp2p-routing/notifications" - "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit" - cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds" + cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit" ) var ErrNotDHT = errors.New("routing service is not a DHT") @@ -93,21 +93,13 @@ var queryDhtCmd = &cmds.Command{ } }() - outChan := make(chan interface{}) - - go func() { - defer close(outChan) - for e := range events { - select { - case outChan <- e: - case <-req.Context.Done(): - return - } + for e := range events { + if err := res.Emit(e); err != nil { + return err } - }() - - return res.Emit(outChan) + } + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { @@ -167,22 +159,10 @@ var findProvidersDhtCmd = &cmds.Command{ return err } - outChan := make(chan interface{}) - ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders) - go func() { - defer close(outChan) - for e := range events { - select { - case outChan <- e: - case <-req.Context.Done(): - return - } - } - }() go func() { defer cancel() @@ -194,8 +174,13 @@ var findProvidersDhtCmd = &cmds.Command{ }) } }() + for e := range events { + if err := res.Emit(e); err != nil { + return err + } + } - return res.Emit(outChan) + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { @@ -279,22 +264,9 @@ var provideRefDhtCmd = &cmds.Command{ cids = append(cids, c) } - outChan := make(chan interface{}) - ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) - go func() { - defer close(outChan) - for e := range events { - select { - case outChan <- e: - case <-req.Context.Done(): - return - } - } - }() - go func() { defer cancel() var err error @@ -311,7 +283,13 @@ var provideRefDhtCmd = &cmds.Command{ } }() - return res.Emit(outChan) + for e := range events { + if err := res.Emit(e); err != nil { + return err + } + } + + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { @@ -395,22 +373,9 @@ var findPeerDhtCmd = &cmds.Command{ return err } - outChan := make(chan interface{}) - ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) - go func() { - defer close(outChan) - for v := range events { - select { - case outChan <- v: - case <-req.Context.Done(): - } - - } - }() - go func() { defer cancel() pi, err := nd.Routing.FindPeer(ctx, pid) @@ -428,7 +393,13 @@ var findPeerDhtCmd = &cmds.Command{ }) }() - return res.Emit(outChan) + for e := range events { + if err := res.Emit(e); err != nil { + return err + } + } + + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { @@ -484,21 +455,9 @@ Different key types can specify other 'best' rules. return err } - outChan := make(chan interface{}) - ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) - go func() { - defer close(outChan) - for e := range events { - select { - case outChan <- e: - case <-req.Context.Done(): - } - } - }() - go func() { defer cancel() val, err := nd.Routing.GetValue(ctx, dhtkey) @@ -515,7 +474,13 @@ Different key types can specify other 'best' rules. } }() - return res.Emit(outChan) + for e := range events { + if err := res.Emit(e); err != nil { + return err + } + } + + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { @@ -582,24 +547,11 @@ NOTE: A value may not exceed 2048 bytes. return err } - outChan := make(chan interface{}) - data := req.Arguments[1] ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) - go func() { - defer close(outChan) - for e := range events { - select { - case outChan <- e: - case <-req.Context.Done(): - return - } - } - }() - go func() { defer cancel() err := nd.Routing.PutValue(ctx, key, []byte(data)) @@ -611,7 +563,13 @@ NOTE: A value may not exceed 2048 bytes. } }() - return res.Emit(outChan) + for e := range events { + if err := res.Emit(e); err != nil { + return err + } + } + + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {