mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-01 14:28:02 +08:00
commands/dht: use res.Emit directly
License: MIT Signed-off-by: Overbool <overbool.xu@gmail.com>
This commit is contained in:
parent
7e7a04e21f
commit
4a2c3cb3cf
@ -8,18 +8,18 @@ import (
|
||||
"time"
|
||||
|
||||
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
|
||||
dag "gx/ipfs/QmSei8kFMfqdJq7Q68d2LMnHbTWKKg2daA29ezUYFAUNgc/go-merkledag"
|
||||
path "gx/ipfs/QmT3rzed1ppXefourpmoZ7tyVQfsGPQZ1pHDngLmCvXxd3/go-path"
|
||||
|
||||
cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
|
||||
ipld "gx/ipfs/QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o/go-ipld-format"
|
||||
cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds"
|
||||
dag "gx/ipfs/QmSei8kFMfqdJq7Q68d2LMnHbTWKKg2daA29ezUYFAUNgc/go-merkledag"
|
||||
path "gx/ipfs/QmT3rzed1ppXefourpmoZ7tyVQfsGPQZ1pHDngLmCvXxd3/go-path"
|
||||
peer "gx/ipfs/QmTRhk7cgjUf2gfQ3p2M9KPECNZEW9XUrmHcFCgog4cPgB/go-libp2p-peer"
|
||||
pstore "gx/ipfs/QmTTJcDL3gsnGDALjh2fDGg1onGRUdVgNL2hU2WEZcVrMX/go-libp2p-peerstore"
|
||||
b58 "gx/ipfs/QmWFAMPqsEyUX7gDUsRVmMWz59FxSpJ1b2v6bJ1yYzo7jY/go-base58-fast/base58"
|
||||
routing "gx/ipfs/QmcQ81jSyWCp1jpkQ8CMbtpXT3jK7Wg6ZtYmoyWFgBoF9c/go-libp2p-routing"
|
||||
notif "gx/ipfs/QmcQ81jSyWCp1jpkQ8CMbtpXT3jK7Wg6ZtYmoyWFgBoF9c/go-libp2p-routing/notifications"
|
||||
"gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
|
||||
cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds"
|
||||
cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
|
||||
)
|
||||
|
||||
var ErrNotDHT = errors.New("routing service is not a DHT")
|
||||
@ -93,21 +93,13 @@ var queryDhtCmd = &cmds.Command{
|
||||
}
|
||||
}()
|
||||
|
||||
outChan := make(chan interface{})
|
||||
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for e := range events {
|
||||
select {
|
||||
case outChan <- e:
|
||||
case <-req.Context.Done():
|
||||
return
|
||||
}
|
||||
for e := range events {
|
||||
if err := res.Emit(e); err != nil {
|
||||
return err
|
||||
}
|
||||
}()
|
||||
|
||||
return res.Emit(outChan)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
Encoders: cmds.EncoderMap{
|
||||
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
|
||||
@ -167,22 +159,10 @@ var findProvidersDhtCmd = &cmds.Command{
|
||||
return err
|
||||
}
|
||||
|
||||
outChan := make(chan interface{})
|
||||
|
||||
ctx, cancel := context.WithCancel(req.Context)
|
||||
ctx, events := notif.RegisterForQueryEvents(ctx)
|
||||
|
||||
pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders)
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for e := range events {
|
||||
select {
|
||||
case outChan <- e:
|
||||
case <-req.Context.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
@ -194,8 +174,13 @@ var findProvidersDhtCmd = &cmds.Command{
|
||||
})
|
||||
}
|
||||
}()
|
||||
for e := range events {
|
||||
if err := res.Emit(e); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return res.Emit(outChan)
|
||||
return nil
|
||||
},
|
||||
Encoders: cmds.EncoderMap{
|
||||
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
|
||||
@ -279,22 +264,9 @@ var provideRefDhtCmd = &cmds.Command{
|
||||
cids = append(cids, c)
|
||||
}
|
||||
|
||||
outChan := make(chan interface{})
|
||||
|
||||
ctx, cancel := context.WithCancel(req.Context)
|
||||
ctx, events := notif.RegisterForQueryEvents(ctx)
|
||||
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for e := range events {
|
||||
select {
|
||||
case outChan <- e:
|
||||
case <-req.Context.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
var err error
|
||||
@ -311,7 +283,13 @@ var provideRefDhtCmd = &cmds.Command{
|
||||
}
|
||||
}()
|
||||
|
||||
return res.Emit(outChan)
|
||||
for e := range events {
|
||||
if err := res.Emit(e); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
Encoders: cmds.EncoderMap{
|
||||
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
|
||||
@ -395,22 +373,9 @@ var findPeerDhtCmd = &cmds.Command{
|
||||
return err
|
||||
}
|
||||
|
||||
outChan := make(chan interface{})
|
||||
|
||||
ctx, cancel := context.WithCancel(req.Context)
|
||||
ctx, events := notif.RegisterForQueryEvents(ctx)
|
||||
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for v := range events {
|
||||
select {
|
||||
case outChan <- v:
|
||||
case <-req.Context.Done():
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
pi, err := nd.Routing.FindPeer(ctx, pid)
|
||||
@ -428,7 +393,13 @@ var findPeerDhtCmd = &cmds.Command{
|
||||
})
|
||||
}()
|
||||
|
||||
return res.Emit(outChan)
|
||||
for e := range events {
|
||||
if err := res.Emit(e); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
Encoders: cmds.EncoderMap{
|
||||
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
|
||||
@ -484,21 +455,9 @@ Different key types can specify other 'best' rules.
|
||||
return err
|
||||
}
|
||||
|
||||
outChan := make(chan interface{})
|
||||
|
||||
ctx, cancel := context.WithCancel(req.Context)
|
||||
ctx, events := notif.RegisterForQueryEvents(ctx)
|
||||
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for e := range events {
|
||||
select {
|
||||
case outChan <- e:
|
||||
case <-req.Context.Done():
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
val, err := nd.Routing.GetValue(ctx, dhtkey)
|
||||
@ -515,7 +474,13 @@ Different key types can specify other 'best' rules.
|
||||
}
|
||||
}()
|
||||
|
||||
return res.Emit(outChan)
|
||||
for e := range events {
|
||||
if err := res.Emit(e); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
Encoders: cmds.EncoderMap{
|
||||
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
|
||||
@ -582,24 +547,11 @@ NOTE: A value may not exceed 2048 bytes.
|
||||
return err
|
||||
}
|
||||
|
||||
outChan := make(chan interface{})
|
||||
|
||||
data := req.Arguments[1]
|
||||
|
||||
ctx, cancel := context.WithCancel(req.Context)
|
||||
ctx, events := notif.RegisterForQueryEvents(ctx)
|
||||
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for e := range events {
|
||||
select {
|
||||
case outChan <- e:
|
||||
case <-req.Context.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
err := nd.Routing.PutValue(ctx, key, []byte(data))
|
||||
@ -611,7 +563,13 @@ NOTE: A value may not exceed 2048 bytes.
|
||||
}
|
||||
}()
|
||||
|
||||
return res.Emit(outChan)
|
||||
for e := range events {
|
||||
if err := res.Emit(e); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
Encoders: cmds.EncoderMap{
|
||||
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user