From 03324f776517a681383eea2eae6dcf5b9dcd9a9b Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Fri, 21 Nov 2014 17:18:40 -0800 Subject: [PATCH] fix(bitswap/notifications) subscribe to many License: MIT Signed-off-by: Brian Tiger Chow --- .../bitswap/notifications/notifications.go | 51 +++++++++++++++++-- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/exchange/bitswap/notifications/notifications.go b/exchange/bitswap/notifications/notifications.go index 1de7bf909..74833810a 100644 --- a/exchange/bitswap/notifications/notifications.go +++ b/exchange/bitswap/notifications/notifications.go @@ -29,10 +29,7 @@ func (ps *impl) Publish(block *blocks.Block) { ps.wrapped.Pub(block, topic) } -// Subscribe returns a channel of blocks for the given |keys|. |blockChannel| -// is closed if the |ctx| times out or is cancelled, or after sending len(keys) -// blocks. -func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block { +func (ps *impl) SubscribeDeprec(ctx context.Context, keys ...u.Key) <-chan *blocks.Block { topics := make([]string, 0) for _, key := range keys { topics = append(topics, string(key)) @@ -57,3 +54,49 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Blo func (ps *impl) Shutdown() { ps.wrapped.Shutdown() } + +// Subscribe returns a channel of blocks for the given |keys|. |blockChannel| +// is closed if the |ctx| times out or is cancelled, or after sending len(keys) +// blocks. +func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block { + topics := toStrings(keys) + blocksCh := make(chan *blocks.Block, len(keys)) + valuesCh := make(chan interface{}, len(keys)) + ps.wrapped.AddSub(valuesCh, topics...) + + go func() { + defer func() { + ps.wrapped.Unsub(valuesCh, topics...) + close(blocksCh) + }() + for _, _ = range keys { + select { + case <-ctx.Done(): + return + case val, ok := <-valuesCh: + if !ok { + return + } + block, ok := val.(*blocks.Block) + if !ok { + return + } + select { + case <-ctx.Done(): + return + case blocksCh <- block: // continue + } + } + } + }() + + return blocksCh +} + +func toStrings(keys []u.Key) []string { + strs := make([]string, 0) + for _, key := range keys { + strs = append(strs, string(key)) + } + return strs +}