Merge pull request #601 from jbenet/fix/bitswap-decision-engine-freshness

fix(bitswap.decision.Engine) send only the freshest messages
This commit is contained in:
Brian Tiger Chow 2015-01-19 20:45:36 -08:00
commit 7a322c9538
3 changed files with 92 additions and 57 deletions

View File

@ -277,10 +277,16 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
case <-ctx.Done():
log.Debugf("exiting")
return
case envelope := <-bs.engine.Outbox():
log.Debugf("message to %s sending...", envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
log.Debugf("message to %s sent", envelope.Peer)
case nextEnvelope := <-bs.engine.Outbox():
select {
case <-ctx.Done():
return
case envelope, ok := <-nextEnvelope:
if !ok {
continue
}
bs.send(ctx, envelope.Peer, envelope.Message)
}
}
}
}

View File

@ -44,7 +44,8 @@ import (
var log = eventlog.Logger("engine")
const (
sizeOutboxChan = 4
// outboxChanBuffer must be 0 to prevent stale messages from being sent
outboxChanBuffer = 0
)
// Envelope contains a message for a Peer
@ -68,8 +69,9 @@ type Engine struct {
// that case, no lock would be required.
workSignal chan struct{}
// outbox contains outgoing messages to peers
outbox chan Envelope
// outbox contains outgoing messages to peers. This is owned by the
// taskWorker goroutine
outbox chan (<-chan *Envelope)
bs bstore.Blockstore
@ -83,7 +85,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
ledgerMap: make(map[peer.ID]*ledger),
bs: bs,
peerRequestQueue: newPRQ(),
outbox: make(chan Envelope, sizeOutboxChan),
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}),
}
go e.taskWorker(ctx)
@ -91,45 +93,55 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
}
func (e *Engine) taskWorker(ctx context.Context) {
log := log.Prefix("bitswap.Engine.taskWorker")
defer close(e.outbox) // because taskWorker uses the channel exclusively
for {
nextTask := e.peerRequestQueue.Pop()
if nextTask == nil {
// No tasks in the list?
// Wait until there are!
select {
case <-ctx.Done():
log.Debugf("exiting: %s", ctx.Err())
return
case <-e.workSignal:
log.Debugf("woken up")
}
continue
}
log := log.Prefix("%s", nextTask)
log.Debugf("processing")
block, err := e.bs.Get(nextTask.Entry.Key)
if err != nil {
log.Warning("engine: task exists to send block, but block is not in blockstore")
continue
}
// construct message here so we can make decisions about any additional
// information we may want to include at this time.
m := bsmsg.New()
m.AddBlock(block)
// TODO: maybe add keys from our wantlist?
log.Debugf("sending...")
oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking
select {
case <-ctx.Done():
return
case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
log.Debugf("sent")
case e.outbox <- oneTimeUse:
}
// receiver is ready for an outoing envelope. let's prepare one. first,
// we must acquire a task from the PQ...
envelope, err := e.nextEnvelope(ctx)
if err != nil {
close(oneTimeUse)
return // ctx cancelled
}
oneTimeUse <- envelope // buffered. won't block
close(oneTimeUse)
}
}
func (e *Engine) Outbox() <-chan Envelope {
// nextEnvelope runs in the taskWorker goroutine. Returns an error if the
// context is cancelled before the next Envelope can be created.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
for {
nextTask := e.peerRequestQueue.Pop()
for nextTask == nil {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-e.workSignal:
nextTask = e.peerRequestQueue.Pop()
}
}
// with a task in hand, we're ready to prepare the envelope...
block, err := e.bs.Get(nextTask.Entry.Key)
if err != nil {
continue
}
m := bsmsg.New() // TODO: maybe add keys from our wantlist?
m.AddBlock(block)
return &Envelope{Peer: nextTask.Target, Message: m}, nil
}
}
// Outbox returns a channel of one-time use Envelope channels.
func (e *Engine) Outbox() <-chan (<-chan *Envelope) {
return e.outbox
}

View File

@ -1,6 +1,8 @@
package decision
import (
"errors"
"fmt"
"math"
"strings"
"sync"
@ -104,7 +106,8 @@ func TestOutboxClosedWhenEngineClosed(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
for _ = range e.Outbox() {
for nextEnvelope := range e.Outbox() {
<-nextEnvelope
}
wg.Done()
}()
@ -116,6 +119,10 @@ func TestOutboxClosedWhenEngineClosed(t *testing.T) {
}
func TestPartnerWantsThenCancels(t *testing.T) {
numRounds := 10
if testing.Short() {
numRounds = 1
}
alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
vowels := strings.Split("aeiou", "")
@ -129,23 +136,31 @@ func TestPartnerWantsThenCancels(t *testing.T) {
},
}
for _, testcase := range testcases {
set := testcase[0]
cancels := testcase[1]
keeps := stringsComplement(set, cancels)
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
e := NewEngine(context.Background(), bs)
partner := testutil.RandPeerIDFatal(t)
for _, letter := range set {
block := blocks.NewBlock([]byte(letter))
bs.Put(block)
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
for _, letter := range alphabet {
block := blocks.NewBlock([]byte(letter))
if err := bs.Put(block); err != nil {
t.Fatal(err)
}
partnerWants(e, set, partner)
partnerCancels(e, cancels, partner)
assertPoppedInOrder(t, e, keeps)
}
for i := 0; i < numRounds; i++ {
for _, testcase := range testcases {
set := testcase[0]
cancels := testcase[1]
keeps := stringsComplement(set, cancels)
e := NewEngine(context.Background(), bs)
partner := testutil.RandPeerIDFatal(t)
partnerWants(e, set, partner)
partnerCancels(e, cancels, partner)
if err := checkHandledInOrder(t, e, keeps); err != nil {
t.Logf("run #%d of %d", i, numRounds)
t.Fatal(err)
}
}
}
}
func partnerWants(e *Engine, keys []string, partner peer.ID) {
@ -166,15 +181,17 @@ func partnerCancels(e *Engine, keys []string, partner peer.ID) {
e.MessageReceived(partner, cancels)
}
func assertPoppedInOrder(t *testing.T, e *Engine, keys []string) {
func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
for _, k := range keys {
envelope := <-e.Outbox()
next := <-e.Outbox()
envelope := <-next
received := envelope.Message.Blocks()[0]
expected := blocks.NewBlock([]byte(k))
if received.Key() != expected.Key() {
t.Fatal("received", string(received.Data), "expected", string(expected.Data))
return errors.New(fmt.Sprintln("received", string(received.Data), "expected", string(expected.Data)))
}
}
return nil
}
func stringsComplement(set, subset []string) []string {