From e41ac96207abfee7eaf3fdb813834cf028d12275 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 19 Oct 2018 17:08:22 +0100 Subject: [PATCH] switch to new raceless routing event interface fixes #5616 License: MIT Signed-off-by: Steven Allen --- core/commands/dht.go | 42 ++++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/core/commands/dht.go b/core/commands/dht.go index 617476596..8d7855a24 100644 --- a/core/commands/dht.go +++ b/core/commands/dht.go @@ -72,23 +72,24 @@ var queryDhtCmd = &cmds.Command{ return } - events := make(chan *notif.QueryEvent) - ctx := notif.RegisterForQueryEvents(req.Context(), events) - id, err := peer.IDB58Decode(req.Arguments()[0]) if err != nil { res.SetError(cmds.ClientError("invalid peer ID"), cmdkit.ErrClient) return } + ctx, cancel := context.WithCancel(req.Context()) + ctx, events := notif.RegisterForQueryEvents(ctx) + closestPeers, err := n.DHT.GetClosestPeers(ctx, string(id)) if err != nil { + cancel() res.SetError(err, cmdkit.ErrNormal) return } go func() { - defer close(events) + defer cancel() for p := range closestPeers { notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ ID: p, @@ -182,8 +183,6 @@ var findProvidersDhtCmd = &cmds.Command{ return } - events := make(chan *notif.QueryEvent) - ctx := notif.RegisterForQueryEvents(req.Context(), events) c, err := cid.Parse(req.Arguments()[0]) if err != nil { @@ -194,6 +193,9 @@ var findProvidersDhtCmd = &cmds.Command{ outChan := make(chan interface{}) res.SetOutput((<-chan interface{})(outChan)) + ctx, cancel := context.WithCancel(req.Context()) + ctx, events := notif.RegisterForQueryEvents(ctx) + pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders) go func() { defer close(outChan) @@ -207,7 +209,7 @@ var findProvidersDhtCmd = &cmds.Command{ }() go func() { - defer close(events) + defer cancel() for p := range pchan { np := p notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ @@ -320,8 +322,8 @@ var provideRefDhtCmd = &cmds.Command{ outChan := make(chan interface{}) res.SetOutput((<-chan interface{})(outChan)) - events := make(chan *notif.QueryEvent) - ctx := notif.RegisterForQueryEvents(req.Context(), events) + ctx, cancel := context.WithCancel(req.Context()) + ctx, events := notif.RegisterForQueryEvents(ctx) go func() { defer close(outChan) @@ -335,7 +337,7 @@ var provideRefDhtCmd = &cmds.Command{ }() go func() { - defer close(events) + defer cancel() var err error if rec { err = provideKeysRec(ctx, n.Routing, n.DAG, cids) @@ -449,8 +451,8 @@ var findPeerDhtCmd = &cmds.Command{ outChan := make(chan interface{}) res.SetOutput((<-chan interface{})(outChan)) - events := make(chan *notif.QueryEvent) - ctx := notif.RegisterForQueryEvents(req.Context(), events) + ctx, cancel := context.WithCancel(req.Context()) + ctx, events := notif.RegisterForQueryEvents(ctx) go func() { defer close(outChan) @@ -464,7 +466,7 @@ var findPeerDhtCmd = &cmds.Command{ }() go func() { - defer close(events) + defer cancel() pi, err := n.Routing.FindPeer(ctx, pid) if err != nil { notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ @@ -554,8 +556,8 @@ Different key types can specify other 'best' rules. outChan := make(chan interface{}) res.SetOutput((<-chan interface{})(outChan)) - events := make(chan *notif.QueryEvent) - ctx := notif.RegisterForQueryEvents(req.Context(), events) + ctx, cancel := context.WithCancel(req.Context()) + ctx, events := notif.RegisterForQueryEvents(ctx) go func() { defer close(outChan) @@ -568,7 +570,7 @@ Different key types can specify other 'best' rules. }() go func() { - defer close(events) + defer cancel() val, err := n.Routing.GetValue(ctx, dhtkey) if err != nil { notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ @@ -659,9 +661,6 @@ NOTE: A value may not exceed 2048 bytes. return } - events := make(chan *notif.QueryEvent) - ctx := notif.RegisterForQueryEvents(req.Context(), events) - key, err := escapeDhtKey(req.Arguments()[0]) if err != nil { res.SetError(err, cmdkit.ErrNormal) @@ -673,6 +672,9 @@ NOTE: A value may not exceed 2048 bytes. data := req.Arguments()[1] + ctx, cancel := context.WithCancel(req.Context()) + ctx, events := notif.RegisterForQueryEvents(ctx) + go func() { defer close(outChan) for e := range events { @@ -685,7 +687,7 @@ NOTE: A value may not exceed 2048 bytes. }() go func() { - defer close(events) + defer cancel() err := n.Routing.PutValue(ctx, key, []byte(data)) if err != nil { notif.PublishQueryEvent(ctx, ¬if.QueryEvent{