mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-07 01:08:08 +08:00
Merge pull request #2798 from ipfs/feat/smarter-bitswap
Make bitswap better
This commit is contained in:
commit
c9ddc7d1d5
@ -3,6 +3,7 @@ package decision
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
blocks "github.com/ipfs/go-ipfs/blocks"
|
||||
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
|
||||
@ -68,7 +69,7 @@ type Engine struct {
|
||||
// peerRequestQueue is a priority queue of requests received from peers.
|
||||
// Requests are popped from the queue, packaged up, and placed in the
|
||||
// outbox.
|
||||
peerRequestQueue peerRequestQueue
|
||||
peerRequestQueue *prq
|
||||
|
||||
// FIXME it's a bit odd for the client and the worker to both share memory
|
||||
// (both modify the peerRequestQueue) and also to communicate over the
|
||||
@ -86,6 +87,8 @@ type Engine struct {
|
||||
lock sync.Mutex // protects the fields immediatly below
|
||||
// ledgerMap lists Ledgers by their Partner key.
|
||||
ledgerMap map[peer.ID]*ledger
|
||||
|
||||
ticker *time.Ticker
|
||||
}
|
||||
|
||||
func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
|
||||
@ -95,6 +98,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
|
||||
peerRequestQueue: newPRQ(),
|
||||
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
|
||||
workSignal: make(chan struct{}, 1),
|
||||
ticker: time.NewTicker(time.Millisecond * 100),
|
||||
}
|
||||
go e.taskWorker(ctx)
|
||||
return e
|
||||
@ -142,6 +146,9 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
|
||||
return nil, ctx.Err()
|
||||
case <-e.workSignal:
|
||||
nextTask = e.peerRequestQueue.Pop()
|
||||
case <-e.ticker.C:
|
||||
e.peerRequestQueue.thawRound()
|
||||
nextTask = e.peerRequestQueue.Pop()
|
||||
}
|
||||
}
|
||||
|
||||
@ -191,9 +198,6 @@ func (e *Engine) Peers() []peer.ID {
|
||||
// MessageReceived performs book-keeping. Returns error if passed invalid
|
||||
// arguments.
|
||||
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
|
||||
e.lock.Lock()
|
||||
defer e.lock.Unlock()
|
||||
|
||||
if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 {
|
||||
log.Debugf("received empty message from %s", p)
|
||||
}
|
||||
@ -206,6 +210,8 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
|
||||
}()
|
||||
|
||||
l := e.findOrCreate(p)
|
||||
l.lk.Lock()
|
||||
defer l.lk.Unlock()
|
||||
if m.Full() {
|
||||
l.wantList = wl.New()
|
||||
}
|
||||
@ -236,10 +242,12 @@ func (e *Engine) addBlock(block blocks.Block) {
|
||||
work := false
|
||||
|
||||
for _, l := range e.ledgerMap {
|
||||
l.lk.Lock()
|
||||
if entry, ok := l.WantListContains(block.Key()); ok {
|
||||
e.peerRequestQueue.Push(entry, l.Partner)
|
||||
work = true
|
||||
}
|
||||
l.lk.Unlock()
|
||||
}
|
||||
|
||||
if work {
|
||||
@ -261,9 +269,6 @@ func (e *Engine) AddBlock(block blocks.Block) {
|
||||
// send happen atomically
|
||||
|
||||
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
|
||||
e.lock.Lock()
|
||||
defer e.lock.Unlock()
|
||||
|
||||
l := e.findOrCreate(p)
|
||||
for _, block := range m.Blocks() {
|
||||
l.SentBytes(len(block.Data()))
|
||||
@ -290,11 +295,13 @@ func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
|
||||
|
||||
// ledger lazily instantiates a ledger
|
||||
func (e *Engine) findOrCreate(p peer.ID) *ledger {
|
||||
e.lock.Lock()
|
||||
l, ok := e.ledgerMap[p]
|
||||
if !ok {
|
||||
l = newLedger(p)
|
||||
e.ledgerMap[p] = l
|
||||
}
|
||||
e.lock.Unlock()
|
||||
return l
|
||||
}
|
||||
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package decision
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
@ -44,6 +45,8 @@ type ledger struct {
|
||||
// sentToPeer is a set of keys to ensure we dont send duplicate blocks
|
||||
// to a given peer
|
||||
sentToPeer map[key.Key]time.Time
|
||||
|
||||
lk sync.Mutex
|
||||
}
|
||||
|
||||
type debtRatio struct {
|
||||
|
||||
@ -15,14 +15,16 @@ type peerRequestQueue interface {
|
||||
Pop() *peerRequestTask
|
||||
Push(entry wantlist.Entry, to peer.ID)
|
||||
Remove(k key.Key, p peer.ID)
|
||||
|
||||
// NB: cannot expose simply expose taskQueue.Len because trashed elements
|
||||
// may exist. These trashed elements should not contribute to the count.
|
||||
}
|
||||
|
||||
func newPRQ() peerRequestQueue {
|
||||
func newPRQ() *prq {
|
||||
return &prq{
|
||||
taskMap: make(map[string]*peerRequestTask),
|
||||
partners: make(map[peer.ID]*activePartner),
|
||||
frozen: make(map[peer.ID]*activePartner),
|
||||
pQueue: pq.New(partnerCompare),
|
||||
}
|
||||
}
|
||||
@ -38,6 +40,8 @@ type prq struct {
|
||||
pQueue pq.PQ
|
||||
taskMap map[string]*peerRequestTask
|
||||
partners map[peer.ID]*activePartner
|
||||
|
||||
frozen map[peer.ID]*activePartner
|
||||
}
|
||||
|
||||
// Push currently adds a new peerRequestTask to the end of the list
|
||||
@ -92,7 +96,7 @@ func (tl *prq) Pop() *peerRequestTask {
|
||||
partner := tl.pQueue.Pop().(*activePartner)
|
||||
|
||||
var out *peerRequestTask
|
||||
for partner.taskQueue.Len() > 0 {
|
||||
for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 {
|
||||
out = partner.taskQueue.Pop().(*peerRequestTask)
|
||||
delete(tl.taskMap, out.Key())
|
||||
if out.trash {
|
||||
@ -120,11 +124,47 @@ func (tl *prq) Remove(k key.Key, p peer.ID) {
|
||||
t.trash = true
|
||||
|
||||
// having canceled a block, we now account for that in the given partner
|
||||
tl.partners[p].requests--
|
||||
partner := tl.partners[p]
|
||||
partner.requests--
|
||||
|
||||
// we now also 'freeze' that partner. If they sent us a cancel for a
|
||||
// block we were about to send them, we should wait a short period of time
|
||||
// to make sure we receive any other in-flight cancels before sending
|
||||
// them a block they already potentially have
|
||||
if partner.freezeVal == 0 {
|
||||
tl.frozen[p] = partner
|
||||
}
|
||||
|
||||
partner.freezeVal++
|
||||
tl.pQueue.Update(partner.index)
|
||||
}
|
||||
tl.lock.Unlock()
|
||||
}
|
||||
|
||||
func (tl *prq) fullThaw() {
|
||||
tl.lock.Lock()
|
||||
defer tl.lock.Unlock()
|
||||
|
||||
for id, partner := range tl.frozen {
|
||||
partner.freezeVal = 0
|
||||
delete(tl.frozen, id)
|
||||
tl.pQueue.Update(partner.index)
|
||||
}
|
||||
}
|
||||
|
||||
func (tl *prq) thawRound() {
|
||||
tl.lock.Lock()
|
||||
defer tl.lock.Unlock()
|
||||
|
||||
for id, partner := range tl.frozen {
|
||||
partner.freezeVal -= (partner.freezeVal + 1) / 2
|
||||
if partner.freezeVal <= 0 {
|
||||
delete(tl.frozen, id)
|
||||
}
|
||||
tl.pQueue.Update(partner.index)
|
||||
}
|
||||
}
|
||||
|
||||
type peerRequestTask struct {
|
||||
Entry wantlist.Entry
|
||||
Target peer.ID
|
||||
@ -196,6 +236,8 @@ type activePartner struct {
|
||||
// for the PQ interface
|
||||
index int
|
||||
|
||||
freezeVal int
|
||||
|
||||
// priority queue of tasks belonging to this peer
|
||||
taskQueue pq.PQ
|
||||
}
|
||||
@ -208,6 +250,7 @@ func newActivePartner() *activePartner {
|
||||
}
|
||||
|
||||
// partnerCompare implements pq.ElemComparator
|
||||
// returns true if peer 'a' has higher priority than peer 'b'
|
||||
func partnerCompare(a, b pq.Elem) bool {
|
||||
pa := a.(*activePartner)
|
||||
pb := b.(*activePartner)
|
||||
@ -220,6 +263,14 @@ func partnerCompare(a, b pq.Elem) bool {
|
||||
if pb.requests == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
if pa.freezeVal > pb.freezeVal {
|
||||
return false
|
||||
}
|
||||
if pa.freezeVal < pb.freezeVal {
|
||||
return true
|
||||
}
|
||||
|
||||
if pa.active == pb.active {
|
||||
// sorting by taskQueue.Len() aids in cleaning out trash entries faster
|
||||
// if we sorted instead by requests, one peer could potentially build up
|
||||
|
||||
@ -47,6 +47,8 @@ func TestPushPop(t *testing.T) {
|
||||
prq.Remove(key.Key(consonant), partner)
|
||||
}
|
||||
|
||||
prq.fullThaw()
|
||||
|
||||
var out []string
|
||||
for {
|
||||
received := prq.Pop()
|
||||
|
||||
@ -25,9 +25,16 @@ type BitSwapNetwork interface {
|
||||
|
||||
ConnectTo(context.Context, peer.ID) error
|
||||
|
||||
NewMessageSender(context.Context, peer.ID) (MessageSender, error)
|
||||
|
||||
Routing
|
||||
}
|
||||
|
||||
type MessageSender interface {
|
||||
SendMsg(bsmsg.BitSwapMessage) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
// Implement Receiver to receive messages from the BitSwapNetwork
|
||||
type Receiver interface {
|
||||
ReceiveMessage(
|
||||
|
||||
@ -42,6 +42,27 @@ type impl struct {
|
||||
receiver Receiver
|
||||
}
|
||||
|
||||
type streamMessageSender struct {
|
||||
s inet.Stream
|
||||
}
|
||||
|
||||
func (s *streamMessageSender) Close() error {
|
||||
return s.s.Close()
|
||||
}
|
||||
|
||||
func (s *streamMessageSender) SendMsg(msg bsmsg.BitSwapMessage) error {
|
||||
return msg.ToNet(s.s)
|
||||
}
|
||||
|
||||
func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID) (MessageSender, error) {
|
||||
s, err := bsnet.newStreamToPeer(ctx, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &streamMessageSender{s: s}, nil
|
||||
}
|
||||
|
||||
func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) {
|
||||
|
||||
// first, make sure we're connected.
|
||||
|
||||
@ -112,6 +112,30 @@ func (nc *networkClient) FindProvidersAsync(ctx context.Context, k key.Key, max
|
||||
return out
|
||||
}
|
||||
|
||||
type messagePasser struct {
|
||||
net *network
|
||||
target peer.ID
|
||||
local peer.ID
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (mp *messagePasser) SendMsg(m bsmsg.BitSwapMessage) error {
|
||||
return mp.net.SendMessage(mp.ctx, mp.local, mp.target, m)
|
||||
}
|
||||
|
||||
func (mp *messagePasser) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) {
|
||||
return &messagePasser{
|
||||
net: n.network,
|
||||
target: p,
|
||||
local: n.local,
|
||||
ctx: ctx,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Provide provides the key to the network
|
||||
func (nc *networkClient) Provide(ctx context.Context, k key.Key) error {
|
||||
return nc.routing.Provide(ctx, k)
|
||||
|
||||
@ -26,9 +26,11 @@ type WantManager struct {
|
||||
|
||||
network bsnet.BitSwapNetwork
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
}
|
||||
|
||||
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
return &WantManager{
|
||||
incoming: make(chan []*bsmsg.Entry, 10),
|
||||
connect: make(chan peer.ID, 10),
|
||||
@ -38,6 +40,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana
|
||||
wl: wantlist.NewThreadSafe(),
|
||||
network: network,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
@ -58,6 +61,8 @@ type msgQueue struct {
|
||||
out bsmsg.BitSwapMessage
|
||||
network bsnet.BitSwapNetwork
|
||||
|
||||
sender bsnet.MessageSender
|
||||
|
||||
refcnt int
|
||||
|
||||
work chan struct{}
|
||||
@ -150,6 +155,11 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) {
|
||||
}
|
||||
|
||||
func (mq *msgQueue) runQueue(ctx context.Context) {
|
||||
defer func() {
|
||||
if mq.sender != nil {
|
||||
mq.sender.Close()
|
||||
}
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-mq.work: // there is work to be done
|
||||
@ -166,14 +176,25 @@ func (mq *msgQueue) doWork(ctx context.Context) {
|
||||
// allow ten minutes for connections
|
||||
// this includes looking them up in the dht
|
||||
// dialing them, and handshaking
|
||||
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
|
||||
defer cancel()
|
||||
if mq.sender == nil {
|
||||
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
|
||||
defer cancel()
|
||||
|
||||
err := mq.network.ConnectTo(conctx, mq.p)
|
||||
if err != nil {
|
||||
log.Infof("cant connect to peer %s: %s", mq.p, err)
|
||||
// TODO: cant connect, what now?
|
||||
return
|
||||
err := mq.network.ConnectTo(conctx, mq.p)
|
||||
if err != nil {
|
||||
log.Infof("cant connect to peer %s: %s", mq.p, err)
|
||||
// TODO: cant connect, what now?
|
||||
return
|
||||
}
|
||||
|
||||
nsender, err := mq.network.NewMessageSender(ctx, mq.p)
|
||||
if err != nil {
|
||||
log.Infof("cant open new stream to peer %s: %s", mq.p, err)
|
||||
// TODO: cant open stream, what now?
|
||||
return
|
||||
}
|
||||
|
||||
mq.sender = nsender
|
||||
}
|
||||
|
||||
// grab outgoing message
|
||||
@ -186,13 +207,12 @@ func (mq *msgQueue) doWork(ctx context.Context) {
|
||||
mq.out = nil
|
||||
mq.outlk.Unlock()
|
||||
|
||||
sendctx, cancel := context.WithTimeout(ctx, time.Minute*5)
|
||||
defer cancel()
|
||||
|
||||
// send wantlist updates
|
||||
err = mq.network.SendMessage(sendctx, mq.p, wlm)
|
||||
err := mq.sender.SendMsg(wlm)
|
||||
if err != nil {
|
||||
log.Infof("bitswap send error: %s", err)
|
||||
mq.sender.Close()
|
||||
mq.sender = nil
|
||||
// TODO: what do we do if this fails?
|
||||
return
|
||||
}
|
||||
|
||||
@ -149,6 +149,8 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error {
|
||||
if 0 != bytes.Compare(bufout.Bytes(), data) {
|
||||
return errors.New("catted data does not match added data")
|
||||
}
|
||||
|
||||
cancel()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -73,6 +73,7 @@ func RunSupernodeBootstrappedAddCat(data []byte, conf testutil.LatencyConfig) er
|
||||
if 0 != bytes.Compare(bufout.Bytes(), data) {
|
||||
return errors.New("catted data does not match added data")
|
||||
}
|
||||
cancel()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -177,5 +178,6 @@ func RunSupernodePutRecordGetRecord(conf testutil.LatencyConfig) error {
|
||||
if 0 != bytes.Compare(note, received) {
|
||||
return errors.New("record doesn't match")
|
||||
}
|
||||
cancel()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -128,5 +128,6 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error {
|
||||
if 0 != bytes.Compare(bufout.Bytes(), data) {
|
||||
return errors.New("catted data does not match added data")
|
||||
}
|
||||
cancel()
|
||||
return nil
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user