diff --git a/core/commands/dht.go b/core/commands/dht.go index 43bafabdb..0b8916a5a 100644 --- a/core/commands/dht.go +++ b/core/commands/dht.go @@ -8,18 +8,18 @@ import ( "time" cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" - dag "gx/ipfs/QmSei8kFMfqdJq7Q68d2LMnHbTWKKg2daA29ezUYFAUNgc/go-merkledag" - path "gx/ipfs/QmT3rzed1ppXefourpmoZ7tyVQfsGPQZ1pHDngLmCvXxd3/go-path" cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" ipld "gx/ipfs/QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o/go-ipld-format" + cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds" + dag "gx/ipfs/QmSei8kFMfqdJq7Q68d2LMnHbTWKKg2daA29ezUYFAUNgc/go-merkledag" + path "gx/ipfs/QmT3rzed1ppXefourpmoZ7tyVQfsGPQZ1pHDngLmCvXxd3/go-path" peer "gx/ipfs/QmTRhk7cgjUf2gfQ3p2M9KPECNZEW9XUrmHcFCgog4cPgB/go-libp2p-peer" pstore "gx/ipfs/QmTTJcDL3gsnGDALjh2fDGg1onGRUdVgNL2hU2WEZcVrMX/go-libp2p-peerstore" b58 "gx/ipfs/QmWFAMPqsEyUX7gDUsRVmMWz59FxSpJ1b2v6bJ1yYzo7jY/go-base58-fast/base58" routing "gx/ipfs/QmcQ81jSyWCp1jpkQ8CMbtpXT3jK7Wg6ZtYmoyWFgBoF9c/go-libp2p-routing" notif "gx/ipfs/QmcQ81jSyWCp1jpkQ8CMbtpXT3jK7Wg6ZtYmoyWFgBoF9c/go-libp2p-routing/notifications" - "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit" - cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds" + cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit" ) var ErrNotDHT = errors.New("routing service is not a DHT") @@ -93,21 +93,13 @@ var queryDhtCmd = &cmds.Command{ } }() - outChan := make(chan interface{}) - - go func() { - defer close(outChan) - for e := range events { - select { - case outChan <- e: - case <-req.Context.Done(): - return - } + for e := range events { + if err := res.Emit(e); err != nil { + return err } - }() - - return res.Emit(outChan) + } + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { @@ -167,22 +159,10 @@ var findProvidersDhtCmd = &cmds.Command{ return err } - outChan := make(chan interface{}) - ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders) - go func() { - defer close(outChan) - for e := range events { - select { - case outChan <- e: - case <-req.Context.Done(): - return - } - } - }() go func() { defer cancel() @@ -194,8 +174,13 @@ var findProvidersDhtCmd = &cmds.Command{ }) } }() + for e := range events { + if err := res.Emit(e); err != nil { + return err + } + } - return res.Emit(outChan) + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { @@ -279,22 +264,9 @@ var provideRefDhtCmd = &cmds.Command{ cids = append(cids, c) } - outChan := make(chan interface{}) - ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) - go func() { - defer close(outChan) - for e := range events { - select { - case outChan <- e: - case <-req.Context.Done(): - return - } - } - }() - go func() { defer cancel() var err error @@ -311,7 +283,13 @@ var provideRefDhtCmd = &cmds.Command{ } }() - return res.Emit(outChan) + for e := range events { + if err := res.Emit(e); err != nil { + return err + } + } + + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { @@ -395,22 +373,9 @@ var findPeerDhtCmd = &cmds.Command{ return err } - outChan := make(chan interface{}) - ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) - go func() { - defer close(outChan) - for v := range events { - select { - case outChan <- v: - case <-req.Context.Done(): - } - - } - }() - go func() { defer cancel() pi, err := nd.Routing.FindPeer(ctx, pid) @@ -428,7 +393,13 @@ var findPeerDhtCmd = &cmds.Command{ }) }() - return res.Emit(outChan) + for e := range events { + if err := res.Emit(e); err != nil { + return err + } + } + + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { @@ -484,21 +455,9 @@ Different key types can specify other 'best' rules. return err } - outChan := make(chan interface{}) - ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) - go func() { - defer close(outChan) - for e := range events { - select { - case outChan <- e: - case <-req.Context.Done(): - } - } - }() - go func() { defer cancel() val, err := nd.Routing.GetValue(ctx, dhtkey) @@ -515,7 +474,13 @@ Different key types can specify other 'best' rules. } }() - return res.Emit(outChan) + for e := range events { + if err := res.Emit(e); err != nil { + return err + } + } + + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error { @@ -582,24 +547,11 @@ NOTE: A value may not exceed 2048 bytes. return err } - outChan := make(chan interface{}) - data := req.Arguments[1] ctx, cancel := context.WithCancel(req.Context) ctx, events := notif.RegisterForQueryEvents(ctx) - go func() { - defer close(outChan) - for e := range events { - select { - case outChan <- e: - case <-req.Context.Done(): - return - } - } - }() - go func() { defer cancel() err := nd.Routing.PutValue(ctx, key, []byte(data)) @@ -611,7 +563,13 @@ NOTE: A value may not exceed 2048 bytes. } }() - return res.Emit(outChan) + for e := range events { + if err := res.Emit(e); err != nil { + return err + } + } + + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {