From d16591ad219af0bc0e5af317063cc9283c3dd8a4 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 19 May 2016 14:32:56 -0700 Subject: [PATCH] Make bitswap better License: MIT Signed-off-by: Jeromy --- exchange/bitswap/decision/engine.go | 21 ++++--- exchange/bitswap/decision/ledger.go | 3 + .../bitswap/decision/peer_request_queue.go | 57 ++++++++++++++++++- .../decision/peer_request_queue_test.go | 2 + exchange/bitswap/network/interface.go | 7 +++ exchange/bitswap/network/ipfs_impl.go | 21 +++++++ exchange/bitswap/testnet/virtual.go | 24 ++++++++ exchange/bitswap/wantmanager.go | 42 ++++++++++---- test/integration/addcat_test.go | 2 + test/integration/grandcentral_test.go | 2 + test/integration/three_legged_cat_test.go | 1 + 11 files changed, 161 insertions(+), 21 deletions(-) diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 99b8088cf..a31ad6d7a 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -3,6 +3,7 @@ package decision import ( "sync" + "time" blocks "github.com/ipfs/go-ipfs/blocks" bstore "github.com/ipfs/go-ipfs/blocks/blockstore" @@ -68,7 +69,7 @@ type Engine struct { // peerRequestQueue is a priority queue of requests received from peers. // Requests are popped from the queue, packaged up, and placed in the // outbox. - peerRequestQueue peerRequestQueue + peerRequestQueue *prq // FIXME it's a bit odd for the client and the worker to both share memory // (both modify the peerRequestQueue) and also to communicate over the @@ -86,6 +87,8 @@ type Engine struct { lock sync.Mutex // protects the fields immediatly below // ledgerMap lists Ledgers by their Partner key. ledgerMap map[peer.ID]*ledger + + ticker *time.Ticker } func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { @@ -95,6 +98,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { peerRequestQueue: newPRQ(), outbox: make(chan (<-chan *Envelope), outboxChanBuffer), workSignal: make(chan struct{}, 1), + ticker: time.NewTicker(time.Millisecond * 100), } go e.taskWorker(ctx) return e @@ -142,6 +146,9 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { return nil, ctx.Err() case <-e.workSignal: nextTask = e.peerRequestQueue.Pop() + case <-e.ticker.C: + e.peerRequestQueue.thawRound() + nextTask = e.peerRequestQueue.Pop() } } @@ -191,9 +198,6 @@ func (e *Engine) Peers() []peer.ID { // MessageReceived performs book-keeping. Returns error if passed invalid // arguments. func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { - e.lock.Lock() - defer e.lock.Unlock() - if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 { log.Debugf("received empty message from %s", p) } @@ -206,6 +210,8 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { }() l := e.findOrCreate(p) + l.lk.Lock() + defer l.lk.Unlock() if m.Full() { l.wantList = wl.New() } @@ -236,10 +242,12 @@ func (e *Engine) addBlock(block blocks.Block) { work := false for _, l := range e.ledgerMap { + l.lk.Lock() if entry, ok := l.WantListContains(block.Key()); ok { e.peerRequestQueue.Push(entry, l.Partner) work = true } + l.lk.Unlock() } if work { @@ -261,9 +269,6 @@ func (e *Engine) AddBlock(block blocks.Block) { // send happen atomically func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error { - e.lock.Lock() - defer e.lock.Unlock() - l := e.findOrCreate(p) for _, block := range m.Blocks() { l.SentBytes(len(block.Data())) @@ -290,11 +295,13 @@ func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 { // ledger lazily instantiates a ledger func (e *Engine) findOrCreate(p peer.ID) *ledger { + e.lock.Lock() l, ok := e.ledgerMap[p] if !ok { l = newLedger(p) e.ledgerMap[p] = l } + e.lock.Unlock() return l } diff --git a/exchange/bitswap/decision/ledger.go b/exchange/bitswap/decision/ledger.go index 479027678..dddefb596 100644 --- a/exchange/bitswap/decision/ledger.go +++ b/exchange/bitswap/decision/ledger.go @@ -1,6 +1,7 @@ package decision import ( + "sync" "time" key "github.com/ipfs/go-ipfs/blocks/key" @@ -44,6 +45,8 @@ type ledger struct { // sentToPeer is a set of keys to ensure we dont send duplicate blocks // to a given peer sentToPeer map[key.Key]time.Time + + lk sync.Mutex } type debtRatio struct { diff --git a/exchange/bitswap/decision/peer_request_queue.go b/exchange/bitswap/decision/peer_request_queue.go index 54cd19357..21d219a71 100644 --- a/exchange/bitswap/decision/peer_request_queue.go +++ b/exchange/bitswap/decision/peer_request_queue.go @@ -15,14 +15,16 @@ type peerRequestQueue interface { Pop() *peerRequestTask Push(entry wantlist.Entry, to peer.ID) Remove(k key.Key, p peer.ID) + // NB: cannot expose simply expose taskQueue.Len because trashed elements // may exist. These trashed elements should not contribute to the count. } -func newPRQ() peerRequestQueue { +func newPRQ() *prq { return &prq{ taskMap: make(map[string]*peerRequestTask), partners: make(map[peer.ID]*activePartner), + frozen: make(map[peer.ID]*activePartner), pQueue: pq.New(partnerCompare), } } @@ -38,6 +40,8 @@ type prq struct { pQueue pq.PQ taskMap map[string]*peerRequestTask partners map[peer.ID]*activePartner + + frozen map[peer.ID]*activePartner } // Push currently adds a new peerRequestTask to the end of the list @@ -92,7 +96,7 @@ func (tl *prq) Pop() *peerRequestTask { partner := tl.pQueue.Pop().(*activePartner) var out *peerRequestTask - for partner.taskQueue.Len() > 0 { + for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 { out = partner.taskQueue.Pop().(*peerRequestTask) delete(tl.taskMap, out.Key()) if out.trash { @@ -120,11 +124,47 @@ func (tl *prq) Remove(k key.Key, p peer.ID) { t.trash = true // having canceled a block, we now account for that in the given partner - tl.partners[p].requests-- + partner := tl.partners[p] + partner.requests-- + + // we now also 'freeze' that partner. If they sent us a cancel for a + // block we were about to send them, we should wait a short period of time + // to make sure we receive any other in-flight cancels before sending + // them a block they already potentially have + if partner.freezeVal == 0 { + tl.frozen[p] = partner + } + + partner.freezeVal++ + tl.pQueue.Update(partner.index) } tl.lock.Unlock() } +func (tl *prq) fullThaw() { + tl.lock.Lock() + defer tl.lock.Unlock() + + for id, partner := range tl.frozen { + partner.freezeVal = 0 + delete(tl.frozen, id) + tl.pQueue.Update(partner.index) + } +} + +func (tl *prq) thawRound() { + tl.lock.Lock() + defer tl.lock.Unlock() + + for id, partner := range tl.frozen { + partner.freezeVal -= (partner.freezeVal + 1) / 2 + if partner.freezeVal <= 0 { + delete(tl.frozen, id) + } + tl.pQueue.Update(partner.index) + } +} + type peerRequestTask struct { Entry wantlist.Entry Target peer.ID @@ -196,6 +236,8 @@ type activePartner struct { // for the PQ interface index int + freezeVal int + // priority queue of tasks belonging to this peer taskQueue pq.PQ } @@ -208,6 +250,7 @@ func newActivePartner() *activePartner { } // partnerCompare implements pq.ElemComparator +// returns true if peer 'a' has higher priority than peer 'b' func partnerCompare(a, b pq.Elem) bool { pa := a.(*activePartner) pb := b.(*activePartner) @@ -220,6 +263,14 @@ func partnerCompare(a, b pq.Elem) bool { if pb.requests == 0 { return true } + + if pa.freezeVal > pb.freezeVal { + return false + } + if pa.freezeVal < pb.freezeVal { + return true + } + if pa.active == pb.active { // sorting by taskQueue.Len() aids in cleaning out trash entries faster // if we sorted instead by requests, one peer could potentially build up diff --git a/exchange/bitswap/decision/peer_request_queue_test.go b/exchange/bitswap/decision/peer_request_queue_test.go index a2d96a9c6..b1091c03c 100644 --- a/exchange/bitswap/decision/peer_request_queue_test.go +++ b/exchange/bitswap/decision/peer_request_queue_test.go @@ -47,6 +47,8 @@ func TestPushPop(t *testing.T) { prq.Remove(key.Key(consonant), partner) } + prq.fullThaw() + var out []string for { received := prq.Pop() diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 57692551f..42d509f63 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -25,9 +25,16 @@ type BitSwapNetwork interface { ConnectTo(context.Context, peer.ID) error + NewMessageSender(context.Context, peer.ID) (MessageSender, error) + Routing } +type MessageSender interface { + SendMsg(bsmsg.BitSwapMessage) error + Close() error +} + // Implement Receiver to receive messages from the BitSwapNetwork type Receiver interface { ReceiveMessage( diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index d5a168dc0..21f7f59f7 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -42,6 +42,27 @@ type impl struct { receiver Receiver } +type streamMessageSender struct { + s inet.Stream +} + +func (s *streamMessageSender) Close() error { + return s.s.Close() +} + +func (s *streamMessageSender) SendMsg(msg bsmsg.BitSwapMessage) error { + return msg.ToNet(s.s) +} + +func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID) (MessageSender, error) { + s, err := bsnet.newStreamToPeer(ctx, p) + if err != nil { + return nil, err + } + + return &streamMessageSender{s: s}, nil +} + func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) { // first, make sure we're connected. diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index 89833b682..d0555ff37 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -112,6 +112,30 @@ func (nc *networkClient) FindProvidersAsync(ctx context.Context, k key.Key, max return out } +type messagePasser struct { + net *network + target peer.ID + local peer.ID + ctx context.Context +} + +func (mp *messagePasser) SendMsg(m bsmsg.BitSwapMessage) error { + return mp.net.SendMessage(mp.ctx, mp.local, mp.target, m) +} + +func (mp *messagePasser) Close() error { + return nil +} + +func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) { + return &messagePasser{ + net: n.network, + target: p, + local: n.local, + ctx: ctx, + }, nil +} + // Provide provides the key to the network func (nc *networkClient) Provide(ctx context.Context, k key.Key) error { return nc.routing.Provide(ctx, k) diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 50fdb37da..24fd75c1e 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -26,9 +26,11 @@ type WantManager struct { network bsnet.BitSwapNetwork ctx context.Context + cancel func() } func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager { + ctx, cancel := context.WithCancel(ctx) return &WantManager{ incoming: make(chan []*bsmsg.Entry, 10), connect: make(chan peer.ID, 10), @@ -38,6 +40,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana wl: wantlist.NewThreadSafe(), network: network, ctx: ctx, + cancel: cancel, } } @@ -58,6 +61,8 @@ type msgQueue struct { out bsmsg.BitSwapMessage network bsnet.BitSwapNetwork + sender bsnet.MessageSender + refcnt int work chan struct{} @@ -150,6 +155,11 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) { } func (mq *msgQueue) runQueue(ctx context.Context) { + defer func() { + if mq.sender != nil { + mq.sender.Close() + } + }() for { select { case <-mq.work: // there is work to be done @@ -166,14 +176,25 @@ func (mq *msgQueue) doWork(ctx context.Context) { // allow ten minutes for connections // this includes looking them up in the dht // dialing them, and handshaking - conctx, cancel := context.WithTimeout(ctx, time.Minute*10) - defer cancel() + if mq.sender == nil { + conctx, cancel := context.WithTimeout(ctx, time.Minute*10) + defer cancel() - err := mq.network.ConnectTo(conctx, mq.p) - if err != nil { - log.Infof("cant connect to peer %s: %s", mq.p, err) - // TODO: cant connect, what now? - return + err := mq.network.ConnectTo(conctx, mq.p) + if err != nil { + log.Infof("cant connect to peer %s: %s", mq.p, err) + // TODO: cant connect, what now? + return + } + + nsender, err := mq.network.NewMessageSender(ctx, mq.p) + if err != nil { + log.Infof("cant open new stream to peer %s: %s", mq.p, err) + // TODO: cant open stream, what now? + return + } + + mq.sender = nsender } // grab outgoing message @@ -186,13 +207,12 @@ func (mq *msgQueue) doWork(ctx context.Context) { mq.out = nil mq.outlk.Unlock() - sendctx, cancel := context.WithTimeout(ctx, time.Minute*5) - defer cancel() - // send wantlist updates - err = mq.network.SendMessage(sendctx, mq.p, wlm) + err := mq.sender.SendMsg(wlm) if err != nil { log.Infof("bitswap send error: %s", err) + mq.sender.Close() + mq.sender = nil // TODO: what do we do if this fails? return } diff --git a/test/integration/addcat_test.go b/test/integration/addcat_test.go index bb9f4f74f..052665f0d 100644 --- a/test/integration/addcat_test.go +++ b/test/integration/addcat_test.go @@ -149,6 +149,8 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error { if 0 != bytes.Compare(bufout.Bytes(), data) { return errors.New("catted data does not match added data") } + + cancel() return nil } diff --git a/test/integration/grandcentral_test.go b/test/integration/grandcentral_test.go index 671165086..48eb3b64a 100644 --- a/test/integration/grandcentral_test.go +++ b/test/integration/grandcentral_test.go @@ -73,6 +73,7 @@ func RunSupernodeBootstrappedAddCat(data []byte, conf testutil.LatencyConfig) er if 0 != bytes.Compare(bufout.Bytes(), data) { return errors.New("catted data does not match added data") } + cancel() return nil } @@ -177,5 +178,6 @@ func RunSupernodePutRecordGetRecord(conf testutil.LatencyConfig) error { if 0 != bytes.Compare(note, received) { return errors.New("record doesn't match") } + cancel() return nil } diff --git a/test/integration/three_legged_cat_test.go b/test/integration/three_legged_cat_test.go index f1a2bb674..723165d8b 100644 --- a/test/integration/three_legged_cat_test.go +++ b/test/integration/three_legged_cat_test.go @@ -128,5 +128,6 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error { if 0 != bytes.Compare(bufout.Bytes(), data) { return errors.New("catted data does not match added data") } + cancel() return nil }