From 60e288ed4b44e21d3e3911eda8f4dc0d18662291 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Sun, 18 Jan 2015 23:37:04 -0800 Subject: [PATCH 1/2] fix(bitswap.decision.Engine) enqueue only the freshest messages Before, the engine worker would pop a task and block on send to the bitswap worker even if the bitswap worker wasn't ready to receive. Since the task could have been invalidated during this blocking send, a small number of stale (already acquired) blocks would be sent to partners. Now, tasks are only popped off of the queue when bitswap is ready to send them over the wire. This is accomplished by removing the outboxChanBuffer and implementing a two-phase communication sequence. --- exchange/bitswap/bitswap.go | 11 ++-- exchange/bitswap/decision/engine.go | 82 ++++++++++++++---------- exchange/bitswap/decision/engine_test.go | 53 +++++++++------ 3 files changed, 89 insertions(+), 57 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index fe6b8d7c4..f27f0cc36 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -277,10 +277,13 @@ func (bs *bitswap) taskWorker(ctx context.Context) { case <-ctx.Done(): log.Debugf("exiting") return - case envelope := <-bs.engine.Outbox(): - log.Debugf("message to %s sending...", envelope.Peer) - bs.send(ctx, envelope.Peer, envelope.Message) - log.Debugf("message to %s sent", envelope.Peer) + case nextEnvelope := <-bs.engine.Outbox(): + select { + case <-ctx.Done(): + return + case envelope := <-nextEnvelope: + bs.send(ctx, envelope.Peer, envelope.Message) + } } } } diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index ea0491c2c..b84732e82 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -44,7 +44,8 @@ import ( var log = eventlog.Logger("engine") const ( - sizeOutboxChan = 4 + // outboxChanBuffer must be 0 to prevent stale messages from being sent + outboxChanBuffer = 0 ) // Envelope 
contains a message for a Peer @@ -68,8 +69,9 @@ type Engine struct { // that case, no lock would be required. workSignal chan struct{} - // outbox contains outgoing messages to peers - outbox chan Envelope + // outbox contains outgoing messages to peers. This is owned by the + // taskWorker goroutine + outbox chan (<-chan Envelope) bs bstore.Blockstore @@ -83,7 +85,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { ledgerMap: make(map[peer.ID]*ledger), bs: bs, peerRequestQueue: newPRQ(), - outbox: make(chan Envelope, sizeOutboxChan), + outbox: make(chan (<-chan Envelope), outboxChanBuffer), workSignal: make(chan struct{}), } go e.taskWorker(ctx) @@ -91,45 +93,55 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { } func (e *Engine) taskWorker(ctx context.Context) { - log := log.Prefix("bitswap.Engine.taskWorker") + defer close(e.outbox) // because taskWorker uses the channel exclusively for { - nextTask := e.peerRequestQueue.Pop() - if nextTask == nil { - // No tasks in the list? - // Wait until there are! - select { - case <-ctx.Done(): - log.Debugf("exiting: %s", ctx.Err()) - return - case <-e.workSignal: - log.Debugf("woken up") - } - continue - } - log := log.Prefix("%s", nextTask) - log.Debugf("processing") - - block, err := e.bs.Get(nextTask.Entry.Key) - if err != nil { - log.Warning("engine: task exists to send block, but block is not in blockstore") - continue - } - // construct message here so we can make decisions about any additional - // information we may want to include at this time. - m := bsmsg.New() - m.AddBlock(block) - // TODO: maybe add keys from our wantlist? - log.Debugf("sending...") + oneTimeUse := make(chan Envelope, 1) // buffer to prevent blocking select { case <-ctx.Done(): return - case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}: - log.Debugf("sent") + case e.outbox <- oneTimeUse: } + // receiver is ready for an outgoing envelope. let's prepare one. 
first, + // we must acquire a task from the PQ... + envelope, err := e.nextEnvelope(ctx) + if err != nil { + close(oneTimeUse) + return // ctx cancelled + } + oneTimeUse <- *envelope // buffered. won't block + close(oneTimeUse) } } -func (e *Engine) Outbox() <-chan Envelope { +// nextEnvelope runs in the taskWorker goroutine. Returns an error if the +// context is cancelled before the next Envelope can be created. +func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { + for { + nextTask := e.peerRequestQueue.Pop() + for nextTask == nil { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-e.workSignal: + nextTask = e.peerRequestQueue.Pop() + } + } + + // with a task in hand, we're ready to prepare the envelope... + + block, err := e.bs.Get(nextTask.Entry.Key) + if err != nil { + continue + } + + m := bsmsg.New() // TODO: maybe add keys from our wantlist? + m.AddBlock(block) + return &Envelope{Peer: nextTask.Target, Message: m}, nil + } +} + +// Outbox returns a channel of one-time use Envelope channels. 
+func (e *Engine) Outbox() <-chan (<-chan Envelope) { return e.outbox } diff --git a/exchange/bitswap/decision/engine_test.go b/exchange/bitswap/decision/engine_test.go index b2583a020..8e5ab672c 100644 --- a/exchange/bitswap/decision/engine_test.go +++ b/exchange/bitswap/decision/engine_test.go @@ -1,6 +1,8 @@ package decision import ( + "errors" + "fmt" "math" "strings" "sync" @@ -104,7 +106,8 @@ func TestOutboxClosedWhenEngineClosed(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go func() { - for _ = range e.Outbox() { + for nextEnvelope := range e.Outbox() { + <-nextEnvelope } wg.Done() }() @@ -116,6 +119,10 @@ func TestOutboxClosedWhenEngineClosed(t *testing.T) { } func TestPartnerWantsThenCancels(t *testing.T) { + numRounds := 10 + if testing.Short() { + numRounds = 1 + } alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "") vowels := strings.Split("aeiou", "") @@ -129,23 +136,31 @@ func TestPartnerWantsThenCancels(t *testing.T) { }, } - for _, testcase := range testcases { - set := testcase[0] - cancels := testcase[1] - keeps := stringsComplement(set, cancels) - - bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - e := NewEngine(context.Background(), bs) - partner := testutil.RandPeerIDFatal(t) - for _, letter := range set { - block := blocks.NewBlock([]byte(letter)) - bs.Put(block) + bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + for _, letter := range alphabet { + block := blocks.NewBlock([]byte(letter)) + if err := bs.Put(block); err != nil { + t.Fatal(err) } - partnerWants(e, set, partner) - partnerCancels(e, cancels, partner) - assertPoppedInOrder(t, e, keeps) } + for i := 0; i < numRounds; i++ { + for _, testcase := range testcases { + set := testcase[0] + cancels := testcase[1] + keeps := stringsComplement(set, cancels) + + e := NewEngine(context.Background(), bs) + partner := testutil.RandPeerIDFatal(t) + + partnerWants(e, set, partner) + partnerCancels(e, cancels, partner) + if err := 
checkHandledInOrder(t, e, keeps); err != nil { + t.Logf("run #%d of %d", i, numRounds) + t.Fatal(err) + } + } + } } func partnerWants(e *Engine, keys []string, partner peer.ID) { @@ -166,15 +181,17 @@ func partnerCancels(e *Engine, keys []string, partner peer.ID) { e.MessageReceived(partner, cancels) } -func assertPoppedInOrder(t *testing.T, e *Engine, keys []string) { +func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error { for _, k := range keys { - envelope := <-e.Outbox() + next := <-e.Outbox() + envelope := <-next received := envelope.Message.Blocks()[0] expected := blocks.NewBlock([]byte(k)) if received.Key() != expected.Key() { - t.Fatal("received", string(received.Data), "expected", string(expected.Data)) + return errors.New(fmt.Sprintln("received", string(received.Data), "expected", string(expected.Data))) } } + return nil } func stringsComplement(set, subset []string) []string { From 598585409993a6c41ad2acd34d5cfb28bb84bb83 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Mon, 19 Jan 2015 02:35:09 -0800 Subject: [PATCH 2/2] fix: return pointer @whyrusleeping --- exchange/bitswap/bitswap.go | 5 ++++- exchange/bitswap/decision/engine.go | 10 +++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index f27f0cc36..dfa72ff2f 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -281,7 +281,10 @@ func (bs *bitswap) taskWorker(ctx context.Context) { select { case <-ctx.Done(): return - case envelope := <-nextEnvelope: + case envelope, ok := <-nextEnvelope: + if !ok { + continue + } bs.send(ctx, envelope.Peer, envelope.Message) } } diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index b84732e82..05687b312 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -71,7 +71,7 @@ type Engine struct { // outbox contains outgoing messages to peers. 
This is owned by the // taskWorker goroutine - outbox chan (<-chan Envelope) + outbox chan (<-chan *Envelope) bs bstore.Blockstore @@ -85,7 +85,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { ledgerMap: make(map[peer.ID]*ledger), bs: bs, peerRequestQueue: newPRQ(), - outbox: make(chan (<-chan Envelope), outboxChanBuffer), + outbox: make(chan (<-chan *Envelope), outboxChanBuffer), workSignal: make(chan struct{}), } go e.taskWorker(ctx) @@ -95,7 +95,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { func (e *Engine) taskWorker(ctx context.Context) { defer close(e.outbox) // because taskWorker uses the channel exclusively for { - oneTimeUse := make(chan Envelope, 1) // buffer to prevent blocking + oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking select { case <-ctx.Done(): return @@ -108,7 +108,7 @@ func (e *Engine) taskWorker(ctx context.Context) { close(oneTimeUse) return // ctx cancelled } - oneTimeUse <- *envelope // buffered. won't block + oneTimeUse <- envelope // buffered. won't block close(oneTimeUse) } } @@ -141,7 +141,7 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { } // Outbox returns a channel of one-time use Envelope channels. -func (e *Engine) Outbox() <-chan (<-chan Envelope) { +func (e *Engine) Outbox() <-chan (<-chan *Envelope) { return e.outbox }