mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-26 04:47:45 +08:00
fix(bitswap/notifications) subscribe to many
License: MIT Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
This commit is contained in:
parent
1fb80330da
commit
03324f7765
@ -29,10 +29,7 @@ func (ps *impl) Publish(block *blocks.Block) {
|
||||
ps.wrapped.Pub(block, topic)
|
||||
}
|
||||
|
||||
// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
|
||||
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
|
||||
// blocks.
|
||||
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block {
|
||||
func (ps *impl) SubscribeDeprec(ctx context.Context, keys ...u.Key) <-chan *blocks.Block {
|
||||
topics := make([]string, 0)
|
||||
for _, key := range keys {
|
||||
topics = append(topics, string(key))
|
||||
@ -57,3 +54,49 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Blo
|
||||
func (ps *impl) Shutdown() {
|
||||
ps.wrapped.Shutdown()
|
||||
}
|
||||
|
||||
// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
|
||||
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
|
||||
// blocks.
|
||||
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block {
|
||||
topics := toStrings(keys)
|
||||
blocksCh := make(chan *blocks.Block, len(keys))
|
||||
valuesCh := make(chan interface{}, len(keys))
|
||||
ps.wrapped.AddSub(valuesCh, topics...)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
ps.wrapped.Unsub(valuesCh, topics...)
|
||||
close(blocksCh)
|
||||
}()
|
||||
for _, _ = range keys {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case val, ok := <-valuesCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
block, ok := val.(*blocks.Block)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case blocksCh <- block: // continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return blocksCh
|
||||
}
|
||||
|
||||
func toStrings(keys []u.Key) []string {
|
||||
strs := make([]string, 0)
|
||||
for _, key := range keys {
|
||||
strs = append(strs, string(key))
|
||||
}
|
||||
return strs
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user