kubo/exchange/bitswap/decision/engine.go
Brian Tiger Chow e36d656632 log unusual event
License: MIT
Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
2014-12-17 23:44:45 -08:00

185 lines
4.5 KiB
Go

package decision
import (
"sync"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
var log = u.Logger("engine")
// Envelope contains a message for a Peer
type Envelope struct {
// Peer is the intended recipient
Peer peer.Peer
// Message is the payload
Message bsmsg.BitSwapMessage
}
type Engine struct {
// FIXME peerRequestQueue isn't threadsafe nor is it protected by a mutex.
// consider a way to avoid sharing the peerRequestQueue between the worker
// and the receiver
peerRequestQueue *taskQueue
workSignal chan struct{}
outbox chan Envelope
bs bstore.Blockstore
lock sync.RWMutex
// ledgerMap lists Ledgers by their Partner key.
ledgerMap map[u.Key]*ledger
}
func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
e := &Engine{
ledgerMap: make(map[u.Key]*ledger),
bs: bs,
peerRequestQueue: newTaskQueue(),
outbox: make(chan Envelope, 4), // TODO extract constant
workSignal: make(chan struct{}),
}
go e.taskWorker(ctx)
return e
}
func (e *Engine) taskWorker(ctx context.Context) {
for {
nextTask := e.peerRequestQueue.Pop()
if nextTask == nil {
// No tasks in the list?
// Wait until there are!
select {
case <-ctx.Done():
return
case <-e.workSignal:
}
continue
}
block, err := e.bs.Get(nextTask.Entry.Key)
if err != nil {
log.Warning("engine: task exists to send block, but block is not in blockstore")
continue
}
// construct message here so we can make decisions about any additional
// information we may want to include at this time.
m := bsmsg.New()
m.AddBlock(block)
// TODO: maybe add keys from our wantlist?
select {
case <-ctx.Done():
return
case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
}
}
}
func (e *Engine) Outbox() <-chan Envelope {
return e.outbox
}
// Returns a slice of Peers with whom the local node has active sessions
func (e *Engine) Peers() []peer.Peer {
e.lock.RLock()
defer e.lock.RUnlock()
response := make([]peer.Peer, 0)
for _, ledger := range e.ledgerMap {
response = append(response, ledger.Partner)
}
return response
}
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
newWorkExists := false
defer func() {
if newWorkExists {
// Signal task generation to restart (if stopped!)
select {
case e.workSignal <- struct{}{}:
default:
}
}
}()
e.lock.Lock()
defer e.lock.Unlock()
l := e.findOrCreate(p)
if m.Full() {
l.wantList = wl.New()
}
for _, entry := range m.Wantlist() {
if entry.Cancel {
l.CancelWant(entry.Key)
e.peerRequestQueue.Remove(entry.Key, p)
} else {
l.Wants(entry.Key, entry.Priority)
if exists, err := e.bs.Has(entry.Key); err == nil && exists {
newWorkExists = true
e.peerRequestQueue.Push(entry.Key, entry.Priority, p)
}
}
}
for _, block := range m.Blocks() {
// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
l.ReceivedBytes(len(block.Data))
for _, l := range e.ledgerMap {
if l.WantListContains(block.Key()) {
newWorkExists = true
e.peerRequestQueue.Push(block.Key(), 1, l.Partner)
}
}
}
return nil
}
// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically
func (e *Engine) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
e.lock.Lock()
defer e.lock.Unlock()
l := e.findOrCreate(p)
for _, block := range m.Blocks() {
l.SentBytes(len(block.Data))
l.wantList.Remove(block.Key())
e.peerRequestQueue.Remove(block.Key(), p)
}
return nil
}
func (e *Engine) numBytesSentTo(p peer.Peer) uint64 {
// NB not threadsafe
return e.findOrCreate(p).Accounting.BytesSent
}
func (e *Engine) numBytesReceivedFrom(p peer.Peer) uint64 {
// NB not threadsafe
return e.findOrCreate(p).Accounting.BytesRecv
}
// ledger lazily instantiates a ledger
func (e *Engine) findOrCreate(p peer.Peer) *ledger {
l, ok := e.ledgerMap[p.Key()]
if !ok {
l = newLedger(p)
e.ledgerMap[p.Key()] = l
}
return l
}