mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-07 09:17:49 +08:00
switch to new raceless routing event interface
fixes #5616 License: MIT Signed-off-by: Steven Allen <steven@stebalien.com>
This commit is contained in:
parent
c97c3459be
commit
e41ac96207
@ -72,23 +72,24 @@ var queryDhtCmd = &cmds.Command{
|
||||
return
|
||||
}
|
||||
|
||||
events := make(chan *notif.QueryEvent)
|
||||
ctx := notif.RegisterForQueryEvents(req.Context(), events)
|
||||
|
||||
id, err := peer.IDB58Decode(req.Arguments()[0])
|
||||
if err != nil {
|
||||
res.SetError(cmds.ClientError("invalid peer ID"), cmdkit.ErrClient)
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(req.Context())
|
||||
ctx, events := notif.RegisterForQueryEvents(ctx)
|
||||
|
||||
closestPeers, err := n.DHT.GetClosestPeers(ctx, string(id))
|
||||
if err != nil {
|
||||
cancel()
|
||||
res.SetError(err, cmdkit.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(events)
|
||||
defer cancel()
|
||||
for p := range closestPeers {
|
||||
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
||||
ID: p,
|
||||
@ -182,8 +183,6 @@ var findProvidersDhtCmd = &cmds.Command{
|
||||
return
|
||||
}
|
||||
|
||||
events := make(chan *notif.QueryEvent)
|
||||
ctx := notif.RegisterForQueryEvents(req.Context(), events)
|
||||
c, err := cid.Parse(req.Arguments()[0])
|
||||
|
||||
if err != nil {
|
||||
@ -194,6 +193,9 @@ var findProvidersDhtCmd = &cmds.Command{
|
||||
outChan := make(chan interface{})
|
||||
res.SetOutput((<-chan interface{})(outChan))
|
||||
|
||||
ctx, cancel := context.WithCancel(req.Context())
|
||||
ctx, events := notif.RegisterForQueryEvents(ctx)
|
||||
|
||||
pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders)
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
@ -207,7 +209,7 @@ var findProvidersDhtCmd = &cmds.Command{
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer close(events)
|
||||
defer cancel()
|
||||
for p := range pchan {
|
||||
np := p
|
||||
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
||||
@ -320,8 +322,8 @@ var provideRefDhtCmd = &cmds.Command{
|
||||
outChan := make(chan interface{})
|
||||
res.SetOutput((<-chan interface{})(outChan))
|
||||
|
||||
events := make(chan *notif.QueryEvent)
|
||||
ctx := notif.RegisterForQueryEvents(req.Context(), events)
|
||||
ctx, cancel := context.WithCancel(req.Context())
|
||||
ctx, events := notif.RegisterForQueryEvents(ctx)
|
||||
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
@ -335,7 +337,7 @@ var provideRefDhtCmd = &cmds.Command{
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer close(events)
|
||||
defer cancel()
|
||||
var err error
|
||||
if rec {
|
||||
err = provideKeysRec(ctx, n.Routing, n.DAG, cids)
|
||||
@ -449,8 +451,8 @@ var findPeerDhtCmd = &cmds.Command{
|
||||
outChan := make(chan interface{})
|
||||
res.SetOutput((<-chan interface{})(outChan))
|
||||
|
||||
events := make(chan *notif.QueryEvent)
|
||||
ctx := notif.RegisterForQueryEvents(req.Context(), events)
|
||||
ctx, cancel := context.WithCancel(req.Context())
|
||||
ctx, events := notif.RegisterForQueryEvents(ctx)
|
||||
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
@ -464,7 +466,7 @@ var findPeerDhtCmd = &cmds.Command{
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer close(events)
|
||||
defer cancel()
|
||||
pi, err := n.Routing.FindPeer(ctx, pid)
|
||||
if err != nil {
|
||||
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
||||
@ -554,8 +556,8 @@ Different key types can specify other 'best' rules.
|
||||
outChan := make(chan interface{})
|
||||
res.SetOutput((<-chan interface{})(outChan))
|
||||
|
||||
events := make(chan *notif.QueryEvent)
|
||||
ctx := notif.RegisterForQueryEvents(req.Context(), events)
|
||||
ctx, cancel := context.WithCancel(req.Context())
|
||||
ctx, events := notif.RegisterForQueryEvents(ctx)
|
||||
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
@ -568,7 +570,7 @@ Different key types can specify other 'best' rules.
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer close(events)
|
||||
defer cancel()
|
||||
val, err := n.Routing.GetValue(ctx, dhtkey)
|
||||
if err != nil {
|
||||
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
||||
@ -659,9 +661,6 @@ NOTE: A value may not exceed 2048 bytes.
|
||||
return
|
||||
}
|
||||
|
||||
events := make(chan *notif.QueryEvent)
|
||||
ctx := notif.RegisterForQueryEvents(req.Context(), events)
|
||||
|
||||
key, err := escapeDhtKey(req.Arguments()[0])
|
||||
if err != nil {
|
||||
res.SetError(err, cmdkit.ErrNormal)
|
||||
@ -673,6 +672,9 @@ NOTE: A value may not exceed 2048 bytes.
|
||||
|
||||
data := req.Arguments()[1]
|
||||
|
||||
ctx, cancel := context.WithCancel(req.Context())
|
||||
ctx, events := notif.RegisterForQueryEvents(ctx)
|
||||
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for e := range events {
|
||||
@ -685,7 +687,7 @@ NOTE: A value may not exceed 2048 bytes.
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer close(events)
|
||||
defer cancel()
|
||||
err := n.Routing.PutValue(ctx, key, []byte(data))
|
||||
if err != nil {
|
||||
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
||||
|
||||
Loading…
Reference in New Issue
Block a user