From 47479b672a6040ae8463cf125d276a8a9b4e93d1 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 10 Apr 2017 22:05:29 -0700 Subject: [PATCH 01/11] track wantlists sent to peers individually License: MIT Signed-off-by: Jeromy --- exchange/bitswap/wantmanager.go | 54 ++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 14 deletions(-) diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index c6cce7ff7..34bf78572 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -17,7 +17,7 @@ import ( type WantManager struct { // sync channels for Run loop - incoming chan []*bsmsg.Entry + incoming chan *wantSet connect chan peer.ID // notification channel for new peers connecting disconnect chan peer.ID // notification channel for peers disconnecting peerReqs chan chan []peer.ID // channel to request connected peers on @@ -41,7 +41,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+ " this bitswap").Histogram(metricsBuckets) return &WantManager{ - incoming: make(chan []*bsmsg.Entry, 10), + incoming: make(chan *wantSet, 10), connect: make(chan peer.ID, 10), disconnect: make(chan peer.ID, 10), peerReqs: make(chan chan []peer.ID), @@ -61,6 +61,7 @@ type msgQueue struct { outlk sync.Mutex out bsmsg.BitSwapMessage network bsnet.BitSwapNetwork + wl *wantlist.Wantlist sender bsnet.MessageSender @@ -76,8 +77,12 @@ func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid) { } func (pm *WantManager) CancelWants(ks []*cid.Cid) { - log.Infof("cancel wants: %s", ks) - pm.addEntries(context.TODO(), ks, true) + pm.addEntries(context.Background(), ks, true) +} + +type wantSet struct { + entries []*bsmsg.Entry + targets []peer.ID } func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel bool) { @@ -93,7 +98,7 @@ func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel boo }) } select { - case pm.incoming <- entries: + case pm.incoming <- &wantSet{entries: entries}: case <-pm.ctx.Done(): case <-ctx.Done(): } @@ -133,6 +138,8 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { // new peer, we will want to give them our full wantlist fullwantlist := bsmsg.New(true) for _, e := range pm.wl.Entries() { + ne := *e + mq.wl.AddEntry(&ne) fullwantlist.AddEntry(e.Cid, e.Priority) } mq.out = fullwantlist @@ -278,27 +285,35 @@ func (pm *WantManager) Run() { defer tock.Stop() for { select { - case entries := <-pm.incoming: + case ws := <-pm.incoming: // add changes to our wantlist - var filtered []*bsmsg.Entry - for _, e := range entries { + for _, e := range ws.entries { if e.Cancel { if pm.wl.Remove(e.Cid) { pm.wantlistGauge.Dec() - filtered = append(filtered, e) } } else { if pm.wl.AddEntry(e.Entry) { pm.wantlistGauge.Inc() - filtered = append(filtered, e) } } } // broadcast those wantlist changes - for _, p := range pm.peers { - p.addMessage(filtered) + if len(ws.targets) == 0 { + for _, p := range pm.peers { + p.addMessage(ws.entries) + } + } else { + for _, t := range ws.targets { + p, ok := pm.peers[t] + if !ok { + log.Warning("tried sending wantlist change to non-partner peer") + continue + } + p.addMessage(ws.entries) + } } case <-tock.C: @@ -335,6 +350,7 @@ func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue { return &msgQueue{ done: make(chan struct{}), work: make(chan struct{}, 1), + wl: wantlist.New(), network: wm.network, p: p, refcnt: 1, @@ -342,9 +358,13 @@ func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue { } func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) { + var work bool mq.outlk.Lock() defer func() { mq.outlk.Unlock() + if !work { + return + } select { case mq.work <- struct{}{}: default: @@ -361,9 +381,15 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) { // one passed in for _, e := range entries { if e.Cancel { - mq.out.Cancel(e.Cid) + if mq.wl.Remove(e.Cid) { + work = true + mq.out.Cancel(e.Cid) + } } else { - mq.out.AddEntry(e.Cid, e.Priority) + if mq.wl.Add(e.Cid, e.Priority) { + work = true + mq.out.AddEntry(e.Cid, e.Priority) + } } } } From bda8c3a6873e35eb674ad17633f35e6720931f94 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 3 Apr 2017 19:21:52 -0700 Subject: [PATCH 02/11] implement bitswap sessions License: MIT Signed-off-by: Jeromy --- blocks/blocksutil/block_generator.go | 4 +- core/corehttp/gateway_handler.go | 2 +- exchange/bitswap/bitswap.go | 73 ++++----- exchange/bitswap/bitswap_test.go | 13 +- exchange/bitswap/decision/engine.go | 2 +- exchange/bitswap/get.go | 100 ++++++++++++ exchange/bitswap/session.go | 221 +++++++++++++++++++++++++++ exchange/bitswap/session_test.go | 152 ++++++++++++++++++ exchange/bitswap/testutils.go | 4 +- exchange/bitswap/wantmanager.go | 12 +- exchange/bitswap/workers.go | 2 +- 11 files changed, 528 insertions(+), 57 deletions(-) create mode 100644 exchange/bitswap/get.go create mode 100644 exchange/bitswap/session.go create mode 100644 exchange/bitswap/session_test.go diff --git a/blocks/blocksutil/block_generator.go b/blocks/blocksutil/block_generator.go index 324da54a7..a8520a7a2 100644 --- a/blocks/blocksutil/block_generator.go +++ b/blocks/blocksutil/block_generator.go @@ -25,8 +25,8 @@ func (bg *BlockGenerator) Next() *blocks.BasicBlock { } // Blocks generates as many BasicBlocks as specified by n. -func (bg *BlockGenerator) Blocks(n int) []*blocks.BasicBlock { - blocks := make([]*blocks.BasicBlock, 0) +func (bg *BlockGenerator) Blocks(n int) []blocks.Block { + blocks := make([]blocks.Block, 0, n) for i := 0; i < n; i++ { b := bg.Next() blocks = append(blocks, b) diff --git a/core/corehttp/gateway_handler.go b/core/corehttp/gateway_handler.go index 6ec2c3893..a8ebc390a 100644 --- a/core/corehttp/gateway_handler.go +++ b/core/corehttp/gateway_handler.go @@ -27,7 +27,7 @@ import ( node "gx/ipfs/QmPAKbSsgEX5B6fpmxa61jXYnoWzZr5sNafd3qgPiSH8Uv/go-ipld-format" humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize" cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" - multibase "gx/ipfs/QmcxkxTVuURV2Ptse8TvkqH5BQDwV62X1x19JqqvbBzwUM/go-multibase" + multibase "gx/ipfs/Qme4T6BE4sQxg7ZouamF5M7Tx1ZFTqzcns7BkyQPXpoT99/go-multibase" ) const ( diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index ce7bd6b26..74c70b108 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -7,6 +7,7 @@ import ( "errors" "math" "sync" + "sync/atomic" "time" blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" @@ -17,13 +18,13 @@ import ( notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" flags "github.com/ipfs/go-ipfs/flags" "github.com/ipfs/go-ipfs/thirdparty/delay" - blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface" process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables" + blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" ) @@ -159,10 +160,15 @@ type Bitswap struct { blocksSent int dataSent uint64 dataRecvd uint64 + messagesRecvd uint64 // Metrics interface metrics dupMetric metrics.Histogram allMetric metrics.Histogram + + // Sessions + sessions []*Session + sessLk sync.Mutex } type blockRequest struct { @@ -173,45 +179,7 @@ type blockRequest struct { // GetBlock attempts to retrieve a particular block from peers within the // deadline enforced by the context. func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) { - if k == nil { - log.Error("nil cid in GetBlock") - return nil, blockstore.ErrNotFound - } - - // Any async work initiated by this function must end when this function - // returns. To ensure this, derive a new context. Note that it is okay to - // listen on parent in this scope, but NOT okay to pass |parent| to - // functions called by this one. Otherwise those functions won't return - // when this context's cancel func is executed. This is difficult to - // enforce. May this comment keep you safe. - ctx, cancelFunc := context.WithCancel(parent) - - // TODO: this request ID should come in from a higher layer so we can track - // across multiple 'GetBlock' invocations - ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest")) - log.Event(ctx, "Bitswap.GetBlockRequest.Start", k) - defer log.Event(ctx, "Bitswap.GetBlockRequest.End", k) - defer cancelFunc() - - promise, err := bs.GetBlocks(ctx, []*cid.Cid{k}) - if err != nil { - return nil, err - } - - select { - case block, ok := <-promise: - if !ok { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - return nil, errors.New("promise channel was closed") - } - } - return block, nil - case <-parent.Done(): - return nil, parent.Err() - } + return getBlock(parent, k, bs.GetBlocks) } func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid { @@ -251,7 +219,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block log.Event(ctx, "Bitswap.GetBlockRequest.Start", k) } - bs.wm.WantBlocks(ctx, keys) + bs.wm.WantBlocks(ctx, keys, nil) // NB: Optimization. Assumes that providers of key[0] are likely to // be able to provide for all keys. This currently holds true in most @@ -304,7 +272,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block // CancelWant removes a given key from the wantlist func (bs *Bitswap) CancelWants(cids []*cid.Cid) { - bs.wm.CancelWants(cids) + bs.wm.CancelWants(context.Background(), cids, nil) } // HasBlock announces the existance of a block to this bitswap service. The @@ -340,7 +308,22 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error { return nil } +func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session { + bs.sessLk.Lock() + defer bs.sessLk.Unlock() + + var out []*Session + for _, s := range bs.sessions { + if s.InterestedIn(c) { + out = append(out, s) + } + } + return out +} + func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { + atomic.AddUint64(&bs.messagesRecvd, 1) + // This call records changes to wantlists, blocks received, // and number of bytes transfered. bs.engine.MessageReceived(p, incoming) @@ -362,7 +345,8 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg } keys = append(keys, block.Cid()) } - bs.wm.CancelWants(keys) + + bs.wm.CancelWants(context.Background(), keys, nil) wg := sync.WaitGroup{} for _, block := range iblocks { @@ -375,6 +359,9 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg k := b.Cid() log.Event(ctx, "Bitswap.GetBlockRequest.End", k) + for _, ses := range bs.SessionsForBlock(k) { + ses.ReceiveBlock(p, b) + } log.Debugf("got block %s from %s", b, p) if err := bs.HasBlock(b); err != nil { log.Warningf("ReceiveMessage HasBlock error: %s", err) diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 770041c9f..76a28d5dc 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -370,6 +370,9 @@ func TestDoubleGet(t *testing.T) { instances := sg.Instances(2) blocks := bg.Blocks(1) + // NOTE: A race condition can happen here where these GetBlocks requests go + // through before the peers even get connected. This is okay, bitswap + // *should* be able to handle this. ctx1, cancel1 := context.WithCancel(context.Background()) blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []*cid.Cid{blocks[0].Cid()}) if err != nil { @@ -385,7 +388,7 @@ func TestDoubleGet(t *testing.T) { } // ensure both requests make it into the wantlist at the same time - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 20) cancel1() _, ok := <-blkch1 @@ -405,6 +408,14 @@ func TestDoubleGet(t *testing.T) { } t.Log(blk) case <-time.After(time.Second * 5): + p1wl := instances[0].Exchange.WantlistForPeer(instances[1].Peer) + if len(p1wl) != 1 { + t.Logf("wantlist view didnt have 1 item (had %d)", len(p1wl)) + } else if !p1wl[0].Equals(blocks[0].Cid()) { + t.Logf("had 1 item, it was wrong: %s %s", blocks[0].Cid(), p1wl[0]) + } else { + t.Log("had correct wantlist, somehow") + } t.Fatal("timed out waiting on block") } diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index a51610e60..973a7eb85 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -2,10 +2,10 @@ package decision import ( + "context" "sync" "time" - context "context" bstore "github.com/ipfs/go-ipfs/blocks/blockstore" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" diff --git a/exchange/bitswap/get.go b/exchange/bitswap/get.go new file mode 100644 index 000000000..3a64f5117 --- /dev/null +++ b/exchange/bitswap/get.go @@ -0,0 +1,100 @@ +package bitswap + +import ( + "context" + "errors" + + blocks "github.com/ipfs/go-ipfs/blocks" + blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" + notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" + + cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" +) + +type getBlocksFunc func(context.Context, []*cid.Cid) (<-chan blocks.Block, error) + +func getBlock(p context.Context, k *cid.Cid, gb getBlocksFunc) (blocks.Block, error) { + if k == nil { + log.Error("nil cid in GetBlock") + return nil, blockstore.ErrNotFound + } + + // Any async work initiated by this function must end when this function + // returns. To ensure this, derive a new context. Note that it is okay to + // listen on parent in this scope, but NOT okay to pass |parent| to + // functions called by this one. Otherwise those functions won't return + // when this context's cancel func is executed. This is difficult to + // enforce. May this comment keep you safe. + ctx, cancel := context.WithCancel(p) + defer cancel() + + promise, err := gb(ctx, []*cid.Cid{k}) + if err != nil { + return nil, err + } + + select { + case block, ok := <-promise: + if !ok { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + return nil, errors.New("promise channel was closed") + } + } + return block, nil + case <-p.Done(): + return nil, p.Err() + } +} + +type wantFunc func(context.Context, []*cid.Cid) + +func getBlocksImpl(ctx context.Context, keys []*cid.Cid, notif notifications.PubSub, want wantFunc, cwants func([]*cid.Cid)) (<-chan blocks.Block, error) { + if len(keys) == 0 { + out := make(chan blocks.Block) + close(out) + return out, nil + } + + remaining := cid.NewSet() + promise := notif.Subscribe(ctx, keys...) + for _, k := range keys { + log.Event(ctx, "Bitswap.GetBlockRequest.Start", k) + remaining.Add(k) + } + + want(ctx, keys) + + out := make(chan blocks.Block) + go handleIncoming(ctx, remaining, promise, out, cwants) + return out, nil +} + +func handleIncoming(ctx context.Context, remaining *cid.Set, in <-chan blocks.Block, out chan blocks.Block, cfun func([]*cid.Cid)) { + ctx, cancel := context.WithCancel(ctx) + defer func() { + cancel() + close(out) + // can't just defer this call on its own, arguments are resolved *when* the defer is created + cfun(remaining.Keys()) + }() + for { + select { + case blk, ok := <-in: + if !ok { + return + } + + remaining.Remove(blk.Cid()) + select { + case out <- blk: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } +} diff --git a/exchange/bitswap/session.go b/exchange/bitswap/session.go new file mode 100644 index 000000000..84ab680dd --- /dev/null +++ b/exchange/bitswap/session.go @@ -0,0 +1,221 @@ +package bitswap + +import ( + "context" + "time" + + blocks "github.com/ipfs/go-ipfs/blocks" + notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" + + logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" + lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru" + loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables" + cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" + peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" +) + +const activeWantsLimit = 16 + +type Session struct { + ctx context.Context + tofetch []*cid.Cid + activePeers map[peer.ID]struct{} + activePeersArr []peer.ID + + bs *Bitswap + incoming chan blkRecv + newReqs chan []*cid.Cid + cancelKeys chan []*cid.Cid + + interest *lru.Cache + liveWants map[string]time.Time + liveCnt int + + tick *time.Timer + baseTickDelay time.Duration + + latTotal time.Duration + fetchcnt int + + notif notifications.PubSub + + uuid logging.Loggable +} + +func (bs *Bitswap) NewSession(ctx context.Context) *Session { + s := &Session{ + activePeers: make(map[peer.ID]struct{}), + liveWants: make(map[string]time.Time), + newReqs: make(chan []*cid.Cid), + cancelKeys: make(chan []*cid.Cid), + ctx: ctx, + bs: bs, + incoming: make(chan blkRecv), + notif: notifications.New(), + uuid: loggables.Uuid("GetBlockRequest"), + baseTickDelay: time.Millisecond * 500, + } + + cache, _ := lru.New(2048) + s.interest = cache + + bs.sessLk.Lock() + bs.sessions = append(bs.sessions, s) + bs.sessLk.Unlock() + + go s.run(ctx) + + return s +} + +type blkRecv struct { + from peer.ID + blk blocks.Block +} + +func (s *Session) ReceiveBlock(from peer.ID, blk blocks.Block) { + s.incoming <- blkRecv{from: from, blk: blk} +} + +func (s *Session) InterestedIn(c *cid.Cid) bool { + return s.interest.Contains(c.KeyString()) +} + +const provSearchDelay = time.Second * 10 + +func (s *Session) addActivePeer(p peer.ID) { + if _, ok := s.activePeers[p]; !ok { + s.activePeers[p] = struct{}{} + s.activePeersArr = append(s.activePeersArr, p) + } +} + +func (s *Session) resetTick() { + if s.latTotal == 0 { + s.tick.Reset(provSearchDelay) + } else { + avLat := s.latTotal / time.Duration(s.fetchcnt) + s.tick.Reset(s.baseTickDelay + (3 * avLat)) + } +} + +func (s *Session) run(ctx context.Context) { + s.tick = time.NewTimer(provSearchDelay) + newpeers := make(chan peer.ID, 16) + for { + select { + case blk := <-s.incoming: + s.tick.Stop() + + s.addActivePeer(blk.from) + + s.receiveBlock(ctx, blk.blk) + + s.resetTick() + case keys := <-s.newReqs: + for _, k := range keys { + s.interest.Add(k.KeyString(), nil) + } + if s.liveCnt < activeWantsLimit { + toadd := activeWantsLimit - s.liveCnt + if toadd > len(keys) { + toadd = len(keys) + } + s.liveCnt += toadd + + now := keys[:toadd] + keys = keys[toadd:] + + s.wantBlocks(ctx, now) + } + s.tofetch = append(s.tofetch, keys...) + case keys := <-s.cancelKeys: + s.cancel(keys) + + case <-s.tick.C: + var live []*cid.Cid + for c, _ := range s.liveWants { + cs, _ := cid.Cast([]byte(c)) + live = append(live, cs) + s.liveWants[c] = time.Now() + } + + // Broadcast these keys to everyone we're connected to + s.bs.wm.WantBlocks(ctx, live, nil) + + if len(live) > 0 { + go func() { + for p := range s.bs.network.FindProvidersAsync(ctx, live[0], 10) { + newpeers <- p + } + }() + } + s.resetTick() + case p := <-newpeers: + s.addActivePeer(p) + case <-ctx.Done(): + return + } + } +} + +func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) { + ks := blk.Cid().KeyString() + if _, ok := s.liveWants[ks]; ok { + s.liveCnt-- + tval := s.liveWants[ks] + s.latTotal += time.Since(tval) + s.fetchcnt++ + delete(s.liveWants, ks) + s.notif.Publish(blk) + + if len(s.tofetch) > 0 { + next := s.tofetch[0:1] + s.tofetch = s.tofetch[1:] + s.wantBlocks(ctx, next) + } + } +} + +func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) { + for _, c := range ks { + s.liveWants[c.KeyString()] = time.Now() + } + s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr) +} + +func (s *Session) cancel(keys []*cid.Cid) { + sset := cid.NewSet() + for _, c := range keys { + sset.Add(c) + } + var i, j int + for ; j < len(s.tofetch); j++ { + if sset.Has(s.tofetch[j]) { + continue + } + s.tofetch[i] = s.tofetch[j] + i++ + } + s.tofetch = s.tofetch[:i] +} + +func (s *Session) cancelWants(keys []*cid.Cid) { + s.cancelKeys <- keys +} + +func (s *Session) fetch(ctx context.Context, keys []*cid.Cid) { + select { + case s.newReqs <- keys: + case <-ctx.Done(): + } +} + +func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) { + ctx = logging.ContextWithLoggable(ctx, s.uuid) + return getBlocksImpl(ctx, keys, s.notif, s.fetch, s.cancelWants) +} + +func (s *Session) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) { + return getBlock(parent, k, s.GetBlocks) +} diff --git a/exchange/bitswap/session_test.go b/exchange/bitswap/session_test.go new file mode 100644 index 000000000..426acd90a --- /dev/null +++ b/exchange/bitswap/session_test.go @@ -0,0 +1,152 @@ +package bitswap + +import ( + "context" + "fmt" + "testing" + "time" + + blocks "github.com/ipfs/go-ipfs/blocks" + blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" + + cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" +) + +func TestBasicSessions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vnet := getVirtualNetwork() + sesgen := NewTestSessionGenerator(vnet) + defer sesgen.Close() + bgen := blocksutil.NewBlockGenerator() + + block := bgen.Next() + inst := sesgen.Instances(2) + + a := inst[0] + b := inst[1] + + if err := b.Blockstore().Put(block); err != nil { + t.Fatal(err) + } + + sesa := a.Exchange.NewSession(ctx) + + blkout, err := sesa.GetBlock(ctx, block.Cid()) + if err != nil { + t.Fatal(err) + } + + if !blkout.Cid().Equals(block.Cid()) { + t.Fatal("got wrong block") + } +} + +func assertBlockLists(got, exp []blocks.Block) error { + if len(got) != len(exp) { + return fmt.Errorf("got wrong number of blocks, %d != %d", len(got), len(exp)) + } + + h := cid.NewSet() + for _, b := range got { + h.Add(b.Cid()) + } + for _, b := range exp { + if !h.Has(b.Cid()) { + return fmt.Errorf("didnt have: %s", b.Cid()) + } + } + return nil +} + +func TestSessionBetweenPeers(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vnet := getVirtualNetwork() + sesgen := NewTestSessionGenerator(vnet) + defer sesgen.Close() + bgen := blocksutil.NewBlockGenerator() + + inst := sesgen.Instances(10) + + blks := bgen.Blocks(101) + if err := inst[0].Blockstore().PutMany(blks); err != nil { + t.Fatal(err) + } + + var cids []*cid.Cid + for _, blk := range blks { + cids = append(cids, blk.Cid()) + } + + ses := inst[1].Exchange.NewSession(ctx) + if _, err := ses.GetBlock(ctx, cids[0]); err != nil { + t.Fatal(err) + } + blks = blks[1:] + cids = cids[1:] + + for i := 0; i < 10; i++ { + ch, err := ses.GetBlocks(ctx, cids[i*10:(i+1)*10]) + if err != nil { + t.Fatal(err) + } + + var got []blocks.Block + for b := range ch { + got = append(got, b) + } + if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil { + t.Fatal(err) + } + } + for _, is := range inst[2:] { + if is.Exchange.messagesRecvd > 2 { + t.Fatal("uninvolved nodes should only receive two messages", is.Exchange.messagesRecvd) + } + } +} + +func TestSessionSplitFetch(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vnet := getVirtualNetwork() + sesgen := NewTestSessionGenerator(vnet) + defer sesgen.Close() + bgen := blocksutil.NewBlockGenerator() + + inst := sesgen.Instances(11) + + blks := bgen.Blocks(100) + for i := 0; i < 10; i++ { + if err := inst[i].Blockstore().PutMany(blks[i*10 : (i+1)*10]); err != nil { + t.Fatal(err) + } + } + + var cids []*cid.Cid + for _, blk := range blks { + cids = append(cids, blk.Cid()) + } + + ses := inst[10].Exchange.NewSession(ctx) + ses.baseTickDelay = time.Millisecond * 10 + + for i := 0; i < 10; i++ { + ch, err := ses.GetBlocks(ctx, cids[i*10:(i+1)*10]) + if err != nil { + t.Fatal(err) + } + + var got []blocks.Block + for b := range ch { + got = append(got, b) + } + if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil { + t.Fatal(err) + } + } +} diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index 4bae29ce3..d3bb98b0e 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -47,7 +47,7 @@ func (g *SessionGenerator) Next() Instance { if err != nil { panic("FIXME") // TODO change signature } - return Session(g.ctx, g.net, p) + return MkSession(g.ctx, g.net, p) } func (g *SessionGenerator) Instances(n int) []Instance { @@ -86,7 +86,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { // NB: It's easy make mistakes by providing the same peer ID to two different // sessions. To safeguard, use the SessionGenerator to generate sessions. It's // just a much better idea. -func Session(ctx context.Context, net tn.Network, p testutil.Identity) Instance { +func MkSession(ctx context.Context, net tn.Network, p testutil.Identity) Instance { bsdelay := delay.Fixed(0) adapter := net.Adapter(p) diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 34bf78572..c8a617724 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -71,13 +71,13 @@ type msgQueue struct { done chan struct{} } -func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid) { +func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID) { log.Infof("want blocks: %s", ks) - pm.addEntries(ctx, ks, false) + pm.addEntries(ctx, ks, peers, false) } -func (pm *WantManager) CancelWants(ks []*cid.Cid) { - pm.addEntries(context.Background(), ks, true) +func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID) { + pm.addEntries(context.Background(), ks, peers, true) } type wantSet struct { @@ -85,7 +85,7 @@ type wantSet struct { targets []peer.ID } -func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel bool) { +func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, targets []peer.ID, cancel bool) { var entries []*bsmsg.Entry for i, k := range ks { entries = append(entries, &bsmsg.Entry{ @@ -98,7 +98,7 @@ func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel boo }) } select { - case pm.incoming <- &wantSet{entries: entries}: + case pm.incoming <- &wantSet{entries: entries, targets: targets}: case <-pm.ctx.Done(): case <-ctx.Done(): } diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 648bfa403..ac1e41eb8 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -49,7 +49,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { func (bs *Bitswap) taskWorker(ctx context.Context, id int) { idmap := logging.LoggableMap{"ID": id} - defer log.Info("bitswap task worker shutting down...") + defer log.Debug("bitswap task worker shutting down...") for { log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap) select { From e43d1317bb7cf4e6ad0ad68a2c8f0b1e8422f680 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 27 Apr 2017 17:38:46 -0700 Subject: [PATCH 03/11] rework how refcounted wantlists work License: MIT Signed-off-by: Jeromy --- core/commands/bitswap.go | 6 +- exchange/bitswap/bitswap.go | 28 +++++-- exchange/bitswap/bitswap_test.go | 5 ++ exchange/bitswap/session.go | 22 ++++-- exchange/bitswap/wantlist/wantlist.go | 92 ++++++++++++++++------ exchange/bitswap/wantlist/wantlist_test.go | 87 ++++++++++++++++++++ exchange/bitswap/wantmanager.go | 23 +++--- 7 files changed, 211 insertions(+), 52 deletions(-) create mode 100644 exchange/bitswap/wantlist/wantlist_test.go diff --git a/core/commands/bitswap.go b/core/commands/bitswap.go index e7d94e912..542402b89 100644 --- a/core/commands/bitswap.go +++ b/core/commands/bitswap.go @@ -64,7 +64,11 @@ var unwantCmd = &cmds.Command{ ks = append(ks, c) } - bs.CancelWants(ks) + // TODO: This should maybe find *all* sessions for this request and cancel them? + // (why): in reality, i think this command should be removed. Its + // messing with the internal state of bitswap. You should cancel wants + // by killing the command that caused the want. + bs.CancelWants(ks, 0) }, } diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 74c70b108..065c209a9 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -169,6 +169,9 @@ type Bitswap struct { // Sessions sessions []*Session sessLk sync.Mutex + + sessID uint64 + sessIDLk sync.Mutex } type blockRequest struct { @@ -219,7 +222,9 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block log.Event(ctx, "Bitswap.GetBlockRequest.Start", k) } - bs.wm.WantBlocks(ctx, keys, nil) + mses := bs.getNextSessionID() + + bs.wm.WantBlocks(ctx, keys, nil, mses) // NB: Optimization. Assumes that providers of key[0] are likely to // be able to provide for all keys. This currently holds true in most @@ -241,7 +246,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block defer close(out) defer func() { // can't just defer this call on its own, arguments are resolved *when* the defer is created - bs.CancelWants(remaining.Keys()) + bs.CancelWants(remaining.Keys(), mses) }() for { select { @@ -250,6 +255,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block return } + bs.CancelWants([]*cid.Cid{blk.Cid()}, mses) remaining.Remove(blk.Cid()) select { case out <- blk: @@ -270,9 +276,16 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block } } +func (bs *Bitswap) getNextSessionID() uint64 { + bs.sessIDLk.Lock() + defer bs.sessIDLk.Unlock() + bs.sessID++ + return bs.sessID +} + // CancelWant removes a given key from the wantlist -func (bs *Bitswap) CancelWants(cids []*cid.Cid) { - bs.wm.CancelWants(context.Background(), cids, nil) +func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) { + bs.wm.CancelWants(context.Background(), cids, nil, ses) } // HasBlock announces the existance of a block to this bitswap service. The @@ -314,7 +327,7 @@ func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session { var out []*Session for _, s := range bs.sessions { - if s.InterestedIn(c) { + if s.interestedIn(c) { out = append(out, s) } } @@ -346,8 +359,6 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg keys = append(keys, block.Cid()) } - bs.wm.CancelWants(context.Background(), keys, nil) - wg := sync.WaitGroup{} for _, block := range iblocks { wg.Add(1) @@ -360,7 +371,8 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg log.Event(ctx, "Bitswap.GetBlockRequest.End", k) for _, ses := range bs.SessionsForBlock(k) { - ses.ReceiveBlock(p, b) + ses.receiveBlockFrom(p, b) + bs.CancelWants([]*cid.Cid{k}, ses.id) } log.Debugf("got block %s from %s", b, p) if err := bs.HasBlock(b); err != nil { diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 76a28d5dc..e73022f62 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -332,6 +332,11 @@ func TestBasicBitswap(t *testing.T) { t.Fatal(err) } + time.Sleep(time.Millisecond * 20) + if len(instances[1].Exchange.GetWantlist()) != 0 { + t.Fatal("shouldnt have anything in wantlist") + } + st0, err := instances[0].Exchange.Stat() if err != nil { t.Fatal(err) diff --git a/exchange/bitswap/session.go b/exchange/bitswap/session.go index 84ab680dd..0a5c7426a 100644 --- a/exchange/bitswap/session.go +++ b/exchange/bitswap/session.go @@ -16,6 +16,9 @@ import ( const activeWantsLimit = 16 +// Session holds state for an individual bitswap transfer operation. +// This allows bitswap to make smarter decisions about who to send wantlist +// info to, and who to request blocks from type Session struct { ctx context.Context tofetch []*cid.Cid @@ -40,8 +43,12 @@ type Session struct { notif notifications.PubSub uuid logging.Loggable + + id uint64 } +// NewSession creates a new bitswap session whose lifetime is bounded by the +// given context func (bs *Bitswap) NewSession(ctx context.Context) *Session { s := &Session{ activePeers: make(map[peer.ID]struct{}), @@ -54,6 +61,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session { notif: notifications.New(), uuid: loggables.Uuid("GetBlockRequest"), baseTickDelay: time.Millisecond * 500, + id: bs.getNextSessionID(), } cache, _ := lru.New(2048) @@ -73,11 +81,11 @@ type blkRecv struct { blk blocks.Block } -func (s *Session) ReceiveBlock(from peer.ID, blk blocks.Block) { +func (s *Session) receiveBlockFrom(from peer.ID, blk blocks.Block) { s.incoming <- blkRecv{from: from, blk: blk} } -func (s *Session) InterestedIn(c *cid.Cid) bool { +func (s *Session) interestedIn(c *cid.Cid) bool { return s.interest.Contains(c.KeyString()) } @@ -134,14 +142,14 @@ func (s *Session) run(ctx context.Context) { case <-s.tick.C: var live []*cid.Cid - for c, _ := range s.liveWants { + for c := range s.liveWants { cs, _ := cid.Cast([]byte(c)) live = append(live, cs) s.liveWants[c] = time.Now() } // Broadcast these keys to everyone we're connected to - s.bs.wm.WantBlocks(ctx, live, nil) + s.bs.wm.WantBlocks(ctx, live, nil, s.id) if len(live) > 0 { go func() { @@ -181,7 +189,7 @@ func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) { for _, c := range ks { s.liveWants[c.KeyString()] = time.Now() } - s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr) + s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id) } func (s *Session) cancel(keys []*cid.Cid) { @@ -211,11 +219,15 @@ func (s *Session) fetch(ctx context.Context, keys []*cid.Cid) { } } +// GetBlocks fetches a set of blocks within the context of this session and +// returns a channel that found blocks will be returned on. No order is +// guaranteed on the returned blocks. func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) { ctx = logging.ContextWithLoggable(ctx, s.uuid) return getBlocksImpl(ctx, keys, s.notif, s.fetch, s.cancelWants) } +// GetBlock fetches a single block func (s *Session) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) { return getBlock(parent, k, s.GetBlocks) } diff --git a/exchange/bitswap/wantlist/wantlist.go b/exchange/bitswap/wantlist/wantlist.go index 7c77998b3..06b5b80dc 100644 --- a/exchange/bitswap/wantlist/wantlist.go +++ b/exchange/bitswap/wantlist/wantlist.go @@ -10,8 +10,8 @@ import ( ) type ThreadSafe struct { - lk sync.RWMutex - Wantlist Wantlist + lk sync.RWMutex + set map[string]*Entry } // not threadsafe @@ -23,7 +23,16 @@ type Entry struct { Cid *cid.Cid Priority int - RefCnt int + SesTrk map[uint64]struct{} +} + +// NewRefEntry creates a new reference tracked wantlist entry +func NewRefEntry(c *cid.Cid, p int) *Entry { + return &Entry{ + Cid: c, + Priority: p, + SesTrk: make(map[uint64]struct{}), + } } type entrySlice []*Entry @@ -34,7 +43,7 @@ func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priorit func NewThreadSafe() *ThreadSafe { return &ThreadSafe{ - Wantlist: *New(), + set: make(map[string]*Entry), } } @@ -44,46 +53,86 @@ func New() *Wantlist { } } -func (w *ThreadSafe) Add(k *cid.Cid, priority int) bool { +func (w *ThreadSafe) Add(c *cid.Cid, priority int, ses uint64) bool { w.lk.Lock() defer w.lk.Unlock() - return w.Wantlist.Add(k, priority) + k := c.KeyString() + if e, ok := w.set[k]; ok { + e.SesTrk[ses] = struct{}{} + return false + } + + w.set[k] = &Entry{ + Cid: c, + Priority: priority, + SesTrk: map[uint64]struct{}{ses: struct{}{}}, + } + + return true } -func (w *ThreadSafe) AddEntry(e *Entry) bool { +func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool { w.lk.Lock() defer w.lk.Unlock() - return w.Wantlist.AddEntry(e) + k := e.Cid.KeyString() + if ex, ok := w.set[k]; ok { + ex.SesTrk[ses] = struct{}{} + return false + } + w.set[k] = e + e.SesTrk[ses] = struct{}{} + return true } -func (w *ThreadSafe) Remove(k *cid.Cid) bool { +func (w *ThreadSafe) Remove(c *cid.Cid, ses uint64) bool { w.lk.Lock() defer w.lk.Unlock() - return w.Wantlist.Remove(k) + k := c.KeyString() + e, ok := w.set[k] + if !ok { + return false + } + + delete(e.SesTrk, ses) + if len(e.SesTrk) == 0 { + delete(w.set, k) + return true + } + return false } func (w *ThreadSafe) Contains(k *cid.Cid) (*Entry, bool) { w.lk.RLock() defer w.lk.RUnlock() - return w.Wantlist.Contains(k) + e, ok := w.set[k.KeyString()] + return e, ok } func (w *ThreadSafe) Entries() []*Entry { w.lk.RLock() defer w.lk.RUnlock() - return w.Wantlist.Entries() + var es entrySlice + for _, e := range w.set { + es = append(es, e) + } + return es } func (w *ThreadSafe) SortedEntries() []*Entry { w.lk.RLock() defer w.lk.RUnlock() - return w.Wantlist.SortedEntries() + var es entrySlice + for _, e := range w.set { + es = append(es, e) + } + sort.Sort(es) + return es } func (w *ThreadSafe) Len() int { w.lk.RLock() defer w.lk.RUnlock() - return w.Wantlist.Len() + return len(w.set) } func (w *Wantlist) Len() int { @@ -92,15 +141,13 @@ func (w *Wantlist) Len() int { func (w *Wantlist) Add(c *cid.Cid, priority int) bool { k := c.KeyString() - if e, ok := w.set[k]; ok { - e.RefCnt++ + if _, ok := w.set[k]; ok { return false } w.set[k] = &Entry{ Cid: c, Priority: priority, - RefCnt: 1, } return true @@ -108,8 +155,7 @@ func (w *Wantlist) Add(c *cid.Cid, priority int) bool { func (w *Wantlist) AddEntry(e *Entry) bool { k := e.Cid.KeyString() - if ex, ok := w.set[k]; ok { - ex.RefCnt++ + if _, ok := w.set[k]; ok { return false } w.set[k] = e @@ -118,16 +164,12 @@ func (w *Wantlist) AddEntry(e *Entry) bool { func (w *Wantlist) Remove(c *cid.Cid) bool { k := c.KeyString() - e, ok := w.set[k] + _, ok := w.set[k] if !ok { return false } - e.RefCnt-- - if e.RefCnt <= 0 { - delete(w.set, k) - return true - } + delete(w.set, k) return false } diff --git a/exchange/bitswap/wantlist/wantlist_test.go b/exchange/bitswap/wantlist/wantlist_test.go new file mode 100644 index 000000000..a88825dcd --- /dev/null +++ b/exchange/bitswap/wantlist/wantlist_test.go @@ -0,0 +1,87 @@ +package wantlist + +import ( + "testing" + + cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid" +) + +var testcids []*cid.Cid + +func init() { + strs := []string{ + "QmQL8LqkEgYXaDHdNYCG2mmpow7Sp8Z8Kt3QS688vyBeC7", + "QmcBDsdjgSXU7BP4A4V8LJCXENE5xVwnhrhRGVTJr9YCVj", + "QmQakgd2wDxc3uUF4orGdEm28zUT9Mmimp5pyPG2SFS9Gj", + } + for _, s := range strs { + c, err := cid.Decode(s) + if err != nil { + panic(err) + } + testcids = append(testcids, c) + } + +} + +type wli interface { + Contains(*cid.Cid) (*Entry, bool) +} + +func assertHasCid(t *testing.T, w wli, c *cid.Cid) { + e, ok := w.Contains(c) + if !ok { + t.Fatal("expected to have ", c) + } + if !e.Cid.Equals(c) { + t.Fatal("returned entry had wrong cid value") + } +} + +func assertNotHasCid(t *testing.T, w wli, c *cid.Cid) { + _, ok := w.Contains(c) + if ok { + t.Fatal("expected not to have ", c) + } +} + +func TestBasicWantlist(t *testing.T) { + wl := New() + + wl.Add(testcids[0], 5) + assertHasCid(t, wl, testcids[0]) + wl.Add(testcids[1], 4) + assertHasCid(t, wl, testcids[0]) + assertHasCid(t, wl, testcids[1]) + + if wl.Len() != 2 { + t.Fatal("should have had two items") + } + + wl.Add(testcids[1], 4) + assertHasCid(t, wl, testcids[0]) + assertHasCid(t, wl, testcids[1]) + + if wl.Len() != 2 { + t.Fatal("should have had two items") + } + + wl.Remove(testcids[0]) + assertHasCid(t, wl, testcids[1]) + if _, has := wl.Contains(testcids[0]); has { + t.Fatal("shouldnt have this cid") + } +} + +func TestSesRefWantlist(t *testing.T) { + wl := NewThreadSafe() + + wl.Add(testcids[0], 5, 1) + assertHasCid(t, wl, testcids[0]) + wl.Remove(testcids[0], 2) + assertHasCid(t, wl, testcids[0]) + wl.Add(testcids[0], 5, 1) + assertHasCid(t, wl, testcids[0]) + wl.Remove(testcids[0], 1) + assertNotHasCid(t, wl, testcids[0]) +} diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index c8a617724..cb5627b10 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -71,34 +71,31 @@ type msgQueue struct { done chan struct{} } -func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID) { +func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) { log.Infof("want blocks: %s", ks) - pm.addEntries(ctx, ks, peers, false) + pm.addEntries(ctx, ks, peers, false, ses) } -func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID) { - pm.addEntries(context.Background(), ks, peers, true) +func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) { + pm.addEntries(context.Background(), ks, peers, true, ses) } type wantSet struct { entries []*bsmsg.Entry targets []peer.ID + from uint64 } -func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, targets []peer.ID, cancel bool) { +func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, targets []peer.ID, cancel bool, ses uint64) { var entries []*bsmsg.Entry for i, k := range ks { entries = append(entries, &bsmsg.Entry{ Cancel: cancel, - Entry: &wantlist.Entry{ - Cid: k, - Priority: kMaxPriority - i, - RefCnt: 1, - }, + Entry: wantlist.NewRefEntry(k, kMaxPriority-i), }) } select { - case pm.incoming <- &wantSet{entries: entries, targets: targets}: + case pm.incoming <- &wantSet{entries: entries, targets: targets, from: ses}: case <-pm.ctx.Done(): case <-ctx.Done(): } @@ -290,11 +287,11 @@ func (pm *WantManager) Run() { // add changes to our wantlist for _, e := range ws.entries { if e.Cancel { - if pm.wl.Remove(e.Cid) { + if pm.wl.Remove(e.Cid, ws.from) { pm.wantlistGauge.Dec() } } else { - if pm.wl.AddEntry(e.Entry) { + if pm.wl.AddEntry(e.Entry, ws.from) { pm.wantlistGauge.Inc() } } From b680f49363f9d699086644f1ad7d3a02c11e8e11 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 2 May 2017 22:54:01 -0700 Subject: [PATCH 04/11] fix wantlist removal accounting, add tests License: MIT Signed-off-by: Jeromy --- exchange/bitswap/bitswap.go | 3 ++ exchange/bitswap/bitswap_test.go | 6 +++- exchange/bitswap/decision/engine.go | 11 +++----- exchange/bitswap/wantlist/wantlist.go | 2 +- exchange/bitswap/wantlist/wantlist_test.go | 33 ++++++++++++++++------ 5 files changed, 38 insertions(+), 17 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 065c209a9..85f9a05da 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -285,6 +285,9 @@ func (bs *Bitswap) getNextSessionID() uint64 { // CancelWant removes a given key from the wantlist func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) { + if len(cids) == 0 { + return + } bs.wm.CancelWants(context.Background(), cids, nil, ses) } diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index e73022f62..26ea61f43 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -318,7 +318,7 @@ func TestBasicBitswap(t *testing.T) { t.Log("Test a one node trying to get one block from another") - instances := sg.Instances(2) + instances := sg.Instances(3) blocks := bg.Blocks(1) err := instances[0].Exchange.HasBlock(blocks[0]) if err != nil { @@ -333,6 +333,10 @@ func TestBasicBitswap(t *testing.T) { } time.Sleep(time.Millisecond * 20) + wl := instances[2].Exchange.WantlistForPeer(instances[1].Peer) + if len(wl) != 0 { + t.Fatal("should have no items in other peers wantlist") + } if len(instances[1].Exchange.GetWantlist()) != 0 { t.Fatal("shouldnt have anything in wantlist") } diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 973a7eb85..600df11f2 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -105,13 +105,10 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { } func (e *Engine) WantlistForPeer(p peer.ID) (out []*wl.Entry) { - e.lock.Lock() - partner, ok := e.ledgerMap[p] - if ok { - out = partner.wantList.SortedEntries() - } - e.lock.Unlock() - return out + partner := e.findOrCreate(p) + partner.lk.Lock() + defer partner.lk.Unlock() + return partner.wantList.SortedEntries() } func (e *Engine) LedgerForPeer(p peer.ID) *Receipt { diff --git a/exchange/bitswap/wantlist/wantlist.go b/exchange/bitswap/wantlist/wantlist.go index 06b5b80dc..73b45815b 100644 --- a/exchange/bitswap/wantlist/wantlist.go +++ b/exchange/bitswap/wantlist/wantlist.go @@ -170,7 +170,7 @@ func (w *Wantlist) Remove(c *cid.Cid) bool { } delete(w.set, k) - return false + return true } func (w *Wantlist) Contains(k *cid.Cid) (*Entry, bool) { diff --git a/exchange/bitswap/wantlist/wantlist_test.go b/exchange/bitswap/wantlist/wantlist_test.go index a88825dcd..e3aee3060 100644 --- a/exchange/bitswap/wantlist/wantlist_test.go +++ b/exchange/bitswap/wantlist/wantlist_test.go @@ -48,9 +48,13 @@ func assertNotHasCid(t *testing.T, w wli, c *cid.Cid) { func TestBasicWantlist(t *testing.T) { wl := New() - wl.Add(testcids[0], 5) + if !wl.Add(testcids[0], 5) { + t.Fatal("expected true") + } assertHasCid(t, wl, testcids[0]) - wl.Add(testcids[1], 4) + if !wl.Add(testcids[1], 4) { + t.Fatal("expected true") + } assertHasCid(t, wl, testcids[0]) assertHasCid(t, wl, testcids[1]) @@ -58,7 +62,9 @@ func TestBasicWantlist(t *testing.T) { t.Fatal("should have had two items") } - wl.Add(testcids[1], 4) + if wl.Add(testcids[1], 4) { + t.Fatal("add shouldnt report success on second add") + } assertHasCid(t, wl, testcids[0]) assertHasCid(t, wl, testcids[1]) @@ -66,7 +72,10 @@ func TestBasicWantlist(t *testing.T) { t.Fatal("should have had two items") } - wl.Remove(testcids[0]) + if !wl.Remove(testcids[0]) { + t.Fatal("should have gotten true") + } + assertHasCid(t, wl, testcids[1]) if _, has := wl.Contains(testcids[0]); has { t.Fatal("shouldnt have this cid") @@ -76,12 +85,20 @@ func TestBasicWantlist(t *testing.T) { func TestSesRefWantlist(t *testing.T) { wl := NewThreadSafe() - wl.Add(testcids[0], 5, 1) + if !wl.Add(testcids[0], 5, 1) { + t.Fatal("should have added") + } assertHasCid(t, wl, testcids[0]) - wl.Remove(testcids[0], 2) + if wl.Remove(testcids[0], 2) { + t.Fatal("shouldnt have removed") + } assertHasCid(t, wl, testcids[0]) - wl.Add(testcids[0], 5, 1) + if wl.Add(testcids[0], 5, 1) { + t.Fatal("shouldnt have added") + } assertHasCid(t, wl, testcids[0]) - wl.Remove(testcids[0], 1) + if !wl.Remove(testcids[0], 1) { + t.Fatal("should have removed") + } assertNotHasCid(t, wl, testcids[0]) } From b1247d3323d2c13fa451210fc2e3ea31177380a9 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 4 May 2017 18:00:15 -0700 Subject: [PATCH 05/11] WIP: wire sessions up through into FetchGraph License: MIT Signed-off-by: Jeromy --- blockservice/blockservice.go | 55 +++++++++++++++++++--- exchange/bitswap/bitswap.go | 1 - exchange/bitswap/get.go | 2 +- exchange/bitswap/session.go | 2 +- exchange/bitswap/session_test.go | 2 +- exchange/bitswap/wantlist/wantlist_test.go | 2 +- exchange/interface.go | 11 +++-- merkledag/merkledag.go | 23 ++++++++- 8 files changed, 81 insertions(+), 17 deletions(-) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 98afac842..e034b46fb 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -10,9 +10,10 @@ import ( "github.com/ipfs/go-ipfs/blocks/blockstore" exchange "github.com/ipfs/go-ipfs/exchange" - blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" + bitswap "github.com/ipfs/go-ipfs/exchange/bitswap" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" + blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" ) @@ -31,6 +32,7 @@ type BlockService interface { GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block DeleteBlock(o blocks.Block) error + NewSession(context.Context) *Session Close() error } @@ -77,6 +79,21 @@ func (bs *blockService) Exchange() exchange.Interface { return bs.exchange } +func (bs *blockService) NewSession(ctx context.Context) *Session { + bswap, ok := bs.Exchange().(*bitswap.Bitswap) + if ok { + ses := bswap.NewSession(ctx) + return &Session{ + ses: ses, + bs: bs.blockstore, + } + } + return &Session{ + ses: bs.exchange, + bs: bs.blockstore, + } +} + // AddBlock adds a particular block to the service, Putting it into the datastore. // TODO pass a context into this if the remote.HasBlock is going to remain here. func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) { @@ -141,16 +158,25 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) { log.Debugf("BlockService GetBlock: '%s'", c) - block, err := s.blockstore.Get(c) + var f exchange.Fetcher + if s.exchange != nil { + f = s.exchange + } + + return getBlock(ctx, c, s.blockstore, f) +} + +func getBlock(ctx context.Context, c *cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) (blocks.Block, error) { + block, err := bs.Get(c) if err == nil { return block, nil } - if err == blockstore.ErrNotFound && s.exchange != nil { + if err == blockstore.ErrNotFound && f != nil { // TODO be careful checking ErrNotFound. If the underlying // implementation changes, this will break. log.Debug("Blockservice: Searching bitswap") - blk, err := s.exchange.GetBlock(ctx, c) + blk, err := f.GetBlock(ctx, c) if err != nil { if err == blockstore.ErrNotFound { return nil, ErrNotFound @@ -172,12 +198,16 @@ func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, // the returned channel. // NB: No guarantees are made about order. func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block { + return getBlocks(ctx, ks, s.blockstore, s.exchange) +} + +func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) <-chan blocks.Block { out := make(chan blocks.Block) go func() { defer close(out) var misses []*cid.Cid for _, c := range ks { - hit, err := s.blockstore.Get(c) + hit, err := bs.Get(c) if err != nil { misses = append(misses, c) continue @@ -194,7 +224,7 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc return } - rblocks, err := s.exchange.GetBlocks(ctx, misses) + rblocks, err := f.GetBlocks(ctx, misses) if err != nil { log.Debugf("Error with GetBlocks: %s", err) return @@ -220,3 +250,16 @@ func (s *blockService) Close() error { log.Debug("blockservice is shutting down...") return s.exchange.Close() } + +type Session struct { + bs blockstore.Blockstore + ses exchange.Fetcher +} + +func (s *Session) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) { + return getBlock(ctx, c, s.bs, s.ses) +} + +func (s *Session) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block { + return getBlocks(ctx, ks, s.bs, s.ses) +} diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 85f9a05da..dd58aee7a 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -23,7 +23,6 @@ import ( process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" - loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables" blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" diff --git a/exchange/bitswap/get.go b/exchange/bitswap/get.go index 3a64f5117..a72ead83a 100644 --- a/exchange/bitswap/get.go +++ b/exchange/bitswap/get.go @@ -4,9 +4,9 @@ import ( "context" "errors" - blocks "github.com/ipfs/go-ipfs/blocks" blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" + blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" ) diff --git a/exchange/bitswap/session.go b/exchange/bitswap/session.go index 0a5c7426a..7f1e21d03 100644 --- a/exchange/bitswap/session.go +++ b/exchange/bitswap/session.go @@ -4,8 +4,8 @@ import ( "context" "time" - blocks "github.com/ipfs/go-ipfs/blocks" notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" + blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru" diff --git a/exchange/bitswap/session_test.go b/exchange/bitswap/session_test.go index 426acd90a..d7808b89d 100644 --- a/exchange/bitswap/session_test.go +++ b/exchange/bitswap/session_test.go @@ -6,8 +6,8 @@ import ( "testing" "time" - blocks "github.com/ipfs/go-ipfs/blocks" blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" + blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" ) diff --git a/exchange/bitswap/wantlist/wantlist_test.go b/exchange/bitswap/wantlist/wantlist_test.go index e3aee3060..d6027a718 100644 --- a/exchange/bitswap/wantlist/wantlist_test.go +++ b/exchange/bitswap/wantlist/wantlist_test.go @@ -3,7 +3,7 @@ package wantlist import ( "testing" - cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid" + cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" ) var testcids []*cid.Cid diff --git a/exchange/interface.go b/exchange/interface.go index fb590d25a..5b9135342 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -13,10 +13,7 @@ import ( // Any type that implements exchange.Interface may be used as an IPFS block // exchange protocol. type Interface interface { // type Exchanger interface - // GetBlock returns the block associated with a given key. - GetBlock(context.Context, *cid.Cid) (blocks.Block, error) - - GetBlocks(context.Context, []*cid.Cid) (<-chan blocks.Block, error) + Fetcher // TODO Should callers be concerned with whether the block was made // available on the network? @@ -26,3 +23,9 @@ type Interface interface { // type Exchanger interface io.Closer } + +type Fetcher interface { + // GetBlock returns the block associated with a given key. + GetBlock(context.Context, *cid.Cid) (blocks.Block, error) + GetBlocks(context.Context, []*cid.Cid) (<-chan blocks.Block, error) +} diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index d23a42d07..f6ee7e562 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -161,11 +161,30 @@ func GetLinksDirect(serv node.NodeGetter) GetLinks { } } +type sesGetter struct { + bs *bserv.Session +} + +func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) { + blk, err := sg.bs.GetBlock(ctx, c) + if err != nil { + return nil, err + } + + return decodeBlock(blk) +} + // FetchGraph fetches all nodes that are children of the given node func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error { + var ng node.NodeGetter = serv + ds, ok := serv.(*dagService) + if ok { + ng = &sesGetter{ds.Blocks.NewSession(ctx)} + } + v, _ := ctx.Value("progress").(*ProgressTracker) if v == nil { - return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, cid.NewSet().Visit) + return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit) } set := cid.NewSet() visit := func(c *cid.Cid) bool { @@ -176,7 +195,7 @@ func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error { return false } } - return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, visit) + return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit) } // FindLinks searches this nodes links for the given key, From 579fd46488576321252a3a2c96340a16ea2d62c6 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 19 May 2017 21:04:11 -0700 Subject: [PATCH 06/11] track broadcasted wantlist entries License: MIT Signed-off-by: Jeromy --- blockservice/blockservice.go | 3 ++ core/commands/bitswap.go | 5 +++ exchange/bitswap/bitswap.go | 1 + exchange/bitswap/bitswap_test.go | 2 +- exchange/bitswap/session.go | 54 ++++++++++++++++++++------- exchange/bitswap/session_test.go | 52 ++++++++++++++++++++++++++ exchange/bitswap/wantlist/wantlist.go | 14 +++++++ exchange/bitswap/wantmanager.go | 49 ++++++++++++------------ exchange/interface.go | 1 + merkledag/merkledag.go | 3 ++ 10 files changed, 146 insertions(+), 38 deletions(-) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index e034b46fb..c6ecbc386 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -251,15 +251,18 @@ func (s *blockService) Close() error { return s.exchange.Close() } +// Session is a helper type to provide higher level access to bitswap sessions type Session struct { bs blockstore.Blockstore ses exchange.Fetcher } +// GetBlock gets a block in the context of a request session func (s *Session) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) { return getBlock(ctx, c, s.bs, s.ses) } +// GetBlocks gets blocks in the context of a request session func (s *Session) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block { return getBlocks(ctx, ks, s.bs, s.ses) } diff --git a/core/commands/bitswap.go b/core/commands/bitswap.go index 542402b89..98dd54f59 100644 --- a/core/commands/bitswap.go +++ b/core/commands/bitswap.go @@ -111,6 +111,11 @@ Print out all blocks currently on the bitswap wantlist for the local peer.`, res.SetError(err, cmds.ErrNormal) return } + if pid == nd.Identity { + res.SetOutput(&KeyList{bs.GetWantlist()}) + return + } + res.SetOutput(&KeyList{bs.WantlistForPeer(pid)}) } else { res.SetOutput(&KeyList{bs.GetWantlist()}) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index dd58aee7a..e0da2477a 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -323,6 +323,7 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error { return nil } +// SessionsForBlock returns a slice of all sessions that may be interested in the given cid func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session { bs.sessLk.Lock() defer bs.sessLk.Unlock() diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 26ea61f43..7842ae559 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -332,7 +332,7 @@ func TestBasicBitswap(t *testing.T) { t.Fatal(err) } - time.Sleep(time.Millisecond * 20) + time.Sleep(time.Millisecond * 25) wl := instances[2].Exchange.WantlistForPeer(instances[1].Peer) if len(wl) != 0 { t.Fatal("should have no items in other peers wantlist") diff --git a/exchange/bitswap/session.go b/exchange/bitswap/session.go index 7f1e21d03..128b377d4 100644 --- a/exchange/bitswap/session.go +++ b/exchange/bitswap/session.go @@ -25,14 +25,14 @@ type Session struct { activePeers map[peer.ID]struct{} activePeersArr []peer.ID - bs *Bitswap - incoming chan blkRecv - newReqs chan []*cid.Cid - cancelKeys chan []*cid.Cid + bs *Bitswap + incoming chan blkRecv + newReqs chan []*cid.Cid + cancelKeys chan []*cid.Cid + interestReqs chan interestReq interest *lru.Cache liveWants map[string]time.Time - liveCnt int tick *time.Timer baseTickDelay time.Duration @@ -55,6 +55,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session { liveWants: make(map[string]time.Time), newReqs: make(chan []*cid.Cid), cancelKeys: make(chan []*cid.Cid), + interestReqs: make(chan interestReq), ctx: ctx, bs: bs, incoming: make(chan blkRecv), @@ -85,8 +86,29 @@ func (s *Session) receiveBlockFrom(from peer.ID, blk blocks.Block) { s.incoming <- blkRecv{from: from, blk: blk} } +type interestReq struct { + c *cid.Cid + resp chan bool +} + +// TODO: PERF: this is using a channel to guard a map access against race +// conditions. This is definitely much slower than a mutex, though its unclear +// if it will actually induce any noticeable slowness. This is implemented this +// way to avoid adding a more complex set of mutexes around the liveWants map. +// note that in the average case (where this session *is* interested in the +// block we received) this function will not be called, as the cid will likely +// still be in the interest cache. +func (s *Session) isLiveWant(c *cid.Cid) bool { + resp := make(chan bool) + s.interestReqs <- interestReq{ + c: c, + resp: resp, + } + return <-resp +} + func (s *Session) interestedIn(c *cid.Cid) bool { - return s.interest.Contains(c.KeyString()) + return s.interest.Contains(c.KeyString()) || s.isLiveWant(c) } const provSearchDelay = time.Second * 10 @@ -124,12 +146,11 @@ func (s *Session) run(ctx context.Context) { for _, k := range keys { s.interest.Add(k.KeyString(), nil) } - if s.liveCnt < activeWantsLimit { - toadd := activeWantsLimit - s.liveCnt + if len(s.liveWants) < activeWantsLimit { + toadd := activeWantsLimit - len(s.liveWants) if toadd > len(keys) { toadd = len(keys) } - s.liveCnt += toadd now := keys[:toadd] keys = keys[toadd:] @@ -152,15 +173,23 @@ func (s *Session) run(ctx context.Context) { s.bs.wm.WantBlocks(ctx, live, nil, s.id) if len(live) > 0 { - go func() { - for p := range s.bs.network.FindProvidersAsync(ctx, live[0], 10) { + go func(k *cid.Cid) { + // TODO: have a task queue setup for this to: + // - rate limit + // - manage timeouts + // - ensure two 'findprovs' calls for the same block don't run concurrently + // - share peers between sessions based on interest set + for p := range s.bs.network.FindProvidersAsync(ctx, k, 10) { newpeers <- p } - }() + }(live[0]) } s.resetTick() case p := <-newpeers: s.addActivePeer(p) + case lwchk := <-s.interestReqs: + _, ok := s.liveWants[lwchk.c.KeyString()] + lwchk.resp <- ok case <-ctx.Done(): return } @@ -170,7 +199,6 @@ func (s *Session) run(ctx context.Context) { func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) { ks := blk.Cid().KeyString() if _, ok := s.liveWants[ks]; ok { - s.liveCnt-- tval := s.liveWants[ks] s.latTotal += time.Since(tval) s.fetchcnt++ diff --git a/exchange/bitswap/session_test.go b/exchange/bitswap/session_test.go index d7808b89d..e2b959fed 100644 --- a/exchange/bitswap/session_test.go +++ b/exchange/bitswap/session_test.go @@ -150,3 +150,55 @@ func TestSessionSplitFetch(t *testing.T) { } } } + +func TestInterestCacheOverflow(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vnet := getVirtualNetwork() + sesgen := NewTestSessionGenerator(vnet) + defer sesgen.Close() + bgen := blocksutil.NewBlockGenerator() + + blks := bgen.Blocks(2049) + inst := sesgen.Instances(2) + + a := inst[0] + b := inst[1] + + ses := a.Exchange.NewSession(ctx) + zeroch, err := ses.GetBlocks(ctx, []*cid.Cid{blks[0].Cid()}) + if err != nil { + t.Fatal(err) + } + + var restcids []*cid.Cid + for _, blk := range blks[1:] { + restcids = append(restcids, blk.Cid()) + } + + restch, err := ses.GetBlocks(ctx, restcids) + if err != nil { + t.Fatal(err) + } + + // wait to ensure that all the above cids were added to the sessions cache + time.Sleep(time.Millisecond * 50) + + if err := b.Exchange.HasBlock(blks[0]); err != nil { + t.Fatal(err) + } + + select { + case blk, ok := <-zeroch: + if ok && blk.Cid().Equals(blks[0].Cid()) { + // success! + } else { + t.Fatal("failed to get the block") + } + case <-restch: + t.Fatal("should not get anything on restch") + case <-time.After(time.Second * 5): + t.Fatal("timed out waiting for block") + } +} diff --git a/exchange/bitswap/wantlist/wantlist.go b/exchange/bitswap/wantlist/wantlist.go index 73b45815b..5902442ca 100644 --- a/exchange/bitswap/wantlist/wantlist.go +++ b/exchange/bitswap/wantlist/wantlist.go @@ -53,6 +53,14 @@ func New() *Wantlist { } } +// Add adds the given cid to the wantlist with the specified priority, governed +// by the session ID 'ses'. if a cid is added under multiple session IDs, then +// it must be removed by each of those sessions before it is no longer 'in the +// wantlist'. Calls to Add are idempotent given the same arguments. Subsequent +// calls with different values for priority will not update the priority +// TODO: think through priority changes here +// Add returns true if the cid did not exist in the wantlist before this call +// (even if it was under a different session) func (w *ThreadSafe) Add(c *cid.Cid, priority int, ses uint64) bool { w.lk.Lock() defer w.lk.Unlock() @@ -84,6 +92,10 @@ func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool { return true } +// Remove removes the given cid from being tracked by the given session. +// 'true' is returned if this call to Remove removed the final session ID +// tracking the cid. (meaning true will be returned iff this call caused the +// value of 'Contains(c)' to change from true to false) func (w *ThreadSafe) Remove(c *cid.Cid, ses uint64) bool { w.lk.Lock() defer w.lk.Unlock() @@ -101,6 +113,8 @@ func (w *ThreadSafe) Remove(c *cid.Cid, ses uint64) bool { return false } +// Contains returns true if the given cid is in the wantlist tracked by one or +// more sessions func (w *ThreadSafe) Contains(k *cid.Cid) (*Entry, bool) { w.lk.RLock() defer w.lk.RUnlock() diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index cb5627b10..800fa1c40 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -25,6 +25,7 @@ type WantManager struct { // synchronized by Run loop, only touch inside there peers map[peer.ID]*msgQueue wl *wantlist.ThreadSafe + bcwl *wantlist.ThreadSafe network bsnet.BitSwapNetwork ctx context.Context @@ -47,6 +48,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana peerReqs: make(chan chan []peer.ID), peers: make(map[peer.ID]*msgQueue), wl: wantlist.NewThreadSafe(), + bcwl: wantlist.NewThreadSafe(), network: network, ctx: ctx, cancel: cancel, @@ -61,7 +63,7 @@ type msgQueue struct { outlk sync.Mutex out bsmsg.BitSwapMessage network bsnet.BitSwapNetwork - wl *wantlist.Wantlist + wl *wantlist.ThreadSafe sender bsnet.MessageSender @@ -71,11 +73,13 @@ type msgQueue struct { done chan struct{} } +// WantBlocks adds the given cids to the wantlist, tracked by the given session func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) { log.Infof("want blocks: %s", ks) pm.addEntries(ctx, ks, peers, false, ses) } +// CancelWants removes the given cids from the wantlist, tracked by the given session func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) { pm.addEntries(context.Background(), ks, peers, true, ses) } @@ -134,9 +138,10 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { // new peer, we will want to give them our full wantlist fullwantlist := bsmsg.New(true) - for _, e := range pm.wl.Entries() { - ne := *e - mq.wl.AddEntry(&ne) + for _, e := range pm.bcwl.Entries() { + for k := range e.SesTrk { + mq.wl.AddEntry(e, k) + } fullwantlist.AddEntry(e.Cid, e.Priority) } mq.out = fullwantlist @@ -284,13 +289,23 @@ func (pm *WantManager) Run() { select { case ws := <-pm.incoming: + // is this a broadcast or not? + brdc := len(ws.targets) == 0 + // add changes to our wantlist for _, e := range ws.entries { if e.Cancel { + if brdc { + pm.bcwl.Remove(e.Cid, ws.from) + } + if pm.wl.Remove(e.Cid, ws.from) { pm.wantlistGauge.Dec() } } else { + if brdc { + pm.bcwl.AddEntry(e.Entry, ws.from) + } if pm.wl.AddEntry(e.Entry, ws.from) { pm.wantlistGauge.Inc() } @@ -300,7 +315,7 @@ func (pm *WantManager) Run() { // broadcast those wantlist changes if len(ws.targets) == 0 { for _, p := range pm.peers { - p.addMessage(ws.entries) + p.addMessage(ws.entries, ws.from) } } else { for _, t := range ws.targets { @@ -309,24 +324,10 @@ func (pm *WantManager) Run() { log.Warning("tried sending wantlist change to non-partner peer") continue } - p.addMessage(ws.entries) + p.addMessage(ws.entries, ws.from) } } - case <-tock.C: - // resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY) - var es []*bsmsg.Entry - for _, e := range pm.wl.Entries() { - es = append(es, &bsmsg.Entry{Entry: e}) - } - - for _, p := range pm.peers { - p.outlk.Lock() - p.out = bsmsg.New(true) - p.outlk.Unlock() - - p.addMessage(es) - } case p := <-pm.connect: pm.startPeerHandler(p) case p := <-pm.disconnect: @@ -347,14 +348,14 @@ func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue { return &msgQueue{ done: make(chan struct{}), work: make(chan struct{}, 1), - wl: wantlist.New(), + wl: wantlist.NewThreadSafe(), network: wm.network, p: p, refcnt: 1, } } -func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) { +func (mq *msgQueue) addMessage(entries []*bsmsg.Entry, ses uint64) { var work bool mq.outlk.Lock() defer func() { @@ -378,12 +379,12 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) { // one passed in for _, e := range entries { if e.Cancel { - if mq.wl.Remove(e.Cid) { + if mq.wl.Remove(e.Cid, ses) { work = true mq.out.Cancel(e.Cid) } } else { - if mq.wl.Add(e.Cid, e.Priority) { + if mq.wl.Add(e.Cid, e.Priority, ses) { work = true mq.out.AddEntry(e.Cid, e.Priority) } diff --git a/exchange/interface.go b/exchange/interface.go index 5b9135342..ac494ff99 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -24,6 +24,7 @@ type Interface interface { // type Exchanger interface io.Closer } +// Fetcher is an object that can be used to retrieve blocks type Fetcher interface { // GetBlock returns the block associated with a given key. GetBlock(context.Context, *cid.Cid) (blocks.Block, error) diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index f6ee7e562..9fa8446d8 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -155,6 +155,9 @@ func GetLinksDirect(serv node.NodeGetter) GetLinks { return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) { node, err := serv.Get(ctx, c) if err != nil { + if err == bserv.ErrNotFound { + err = ErrNotFound + } return nil, err } return node.Links(), nil From 1ffb44cd46c89b33cf9f9beb1d4e34f6fb132ff9 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 5 Jul 2017 12:31:34 -0700 Subject: [PATCH 07/11] make NewSession in the blockservice be a function, not a method License: MIT Signed-off-by: Jeromy --- blockservice/blockservice.go | 13 ++++++------- merkledag/merkledag.go | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index c6ecbc386..d746f2fe0 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -32,7 +32,6 @@ type BlockService interface { GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block DeleteBlock(o blocks.Block) error - NewSession(context.Context) *Session Close() error } @@ -79,18 +78,18 @@ func (bs *blockService) Exchange() exchange.Interface { return bs.exchange } -func (bs *blockService) NewSession(ctx context.Context) *Session { - bswap, ok := bs.Exchange().(*bitswap.Bitswap) - if ok { +func NewSession(ctx context.Context, bs BlockService) *Session { + exchange := bs.Exchange() + if bswap, ok := exchange.(*bitswap.Bitswap); ok { ses := bswap.NewSession(ctx) return &Session{ ses: ses, - bs: bs.blockstore, + bs: bs.Blockstore(), } } return &Session{ - ses: bs.exchange, - bs: bs.blockstore, + ses: exchange, + bs: bs.Blockstore(), } } diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 9fa8446d8..587c481cb 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -182,7 +182,7 @@ func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error { var ng node.NodeGetter = serv ds, ok := serv.(*dagService) if ok { - ng = &sesGetter{ds.Blocks.NewSession(ctx)} + ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)} } v, _ := ctx.Value("progress").(*ProgressTracker) From eab2024e4374e9671dd6b7993c38174874d86f76 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 6 Jul 2017 12:06:57 -0700 Subject: [PATCH 08/11] address CR License: MIT Signed-off-by: Jeromy --- exchange/bitswap/session.go | 3 ++- exchange/bitswap/session_test.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/exchange/bitswap/session.go b/exchange/bitswap/session.go index 128b377d4..614aa4076 100644 --- a/exchange/bitswap/session.go +++ b/exchange/bitswap/session.go @@ -5,11 +5,11 @@ import ( "time" notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" - blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru" loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables" + blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" ) @@ -191,6 +191,7 @@ func (s *Session) run(ctx context.Context) { _, ok := s.liveWants[lwchk.c.KeyString()] lwchk.resp <- ok case <-ctx.Done(): + s.tick.Stop() return } } diff --git a/exchange/bitswap/session_test.go b/exchange/bitswap/session_test.go index e2b959fed..99a0abd39 100644 --- a/exchange/bitswap/session_test.go +++ b/exchange/bitswap/session_test.go @@ -7,8 +7,8 @@ import ( "time" blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" - blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" + blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" ) From 124afdbaaad5f6e42e3fed70afeeb604bc473ae0 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 6 Jul 2017 12:17:25 -0700 Subject: [PATCH 09/11] extract bitswap metrics to separate struct for 64bit alignment License: MIT Signed-off-by: Jeromy --- exchange/bitswap/bitswap.go | 32 +++++++++++++++++++------------- exchange/bitswap/bitswap_test.go | 2 +- exchange/bitswap/session_test.go | 4 ++-- exchange/bitswap/stat.go | 19 ++++++++++--------- exchange/bitswap/workers.go | 4 ++-- 5 files changed, 34 insertions(+), 27 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index e0da2477a..2ebcd4ae7 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -99,6 +99,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, newBlocks: make(chan *cid.Cid, HasBlockBufferSize), provideKeys: make(chan *cid.Cid, provideKeysBufferSize), wm: NewWantManager(ctx, network), + counters: new(counters), dupMetric: dupHist, allMetric: allHist, @@ -152,14 +153,8 @@ type Bitswap struct { process process.Process // Counters for various statistics - counterLk sync.Mutex - blocksRecvd int - dupBlocksRecvd int - dupDataRecvd uint64 - blocksSent int - dataSent uint64 - dataRecvd uint64 - messagesRecvd uint64 + counterLk sync.Mutex + counters *counters // Metrics interface metrics dupMetric metrics.Histogram @@ -173,6 +168,16 @@ type Bitswap struct { sessIDLk sync.Mutex } +type counters struct { + blocksRecvd uint64 + dupBlocksRecvd uint64 + dupDataRecvd uint64 + blocksSent uint64 + dataSent uint64 + dataRecvd uint64 + messagesRecvd uint64 +} + type blockRequest struct { Cid *cid.Cid Ctx context.Context @@ -338,7 +343,7 @@ func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session { } func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { - atomic.AddUint64(&bs.messagesRecvd, 1) + atomic.AddUint64(&bs.counters.messagesRecvd, 1) // This call records changes to wantlists, blocks received, // and number of bytes transfered. @@ -403,12 +408,13 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) { bs.counterLk.Lock() defer bs.counterLk.Unlock() + c := bs.counters - bs.blocksRecvd++ - bs.dataRecvd += uint64(len(b.RawData())) + c.blocksRecvd++ + c.dataRecvd += uint64(len(b.RawData())) if has { - bs.dupBlocksRecvd++ - bs.dupDataRecvd += uint64(blkLen) + c.dupBlocksRecvd++ + c.dupDataRecvd += uint64(blkLen) } } diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 7842ae559..506b8d0c1 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -291,7 +291,7 @@ func TestEmptyKey(t *testing.T) { } } -func assertStat(st *Stat, sblks, rblks int, sdata, rdata uint64) error { +func assertStat(st *Stat, sblks, rblks, sdata, rdata uint64) error { if sblks != st.BlocksSent { return fmt.Errorf("mismatch in blocks sent: %d vs %d", sblks, st.BlocksSent) } diff --git a/exchange/bitswap/session_test.go b/exchange/bitswap/session_test.go index 99a0abd39..0574bd0c3 100644 --- a/exchange/bitswap/session_test.go +++ b/exchange/bitswap/session_test.go @@ -103,8 +103,8 @@ func TestSessionBetweenPeers(t *testing.T) { } } for _, is := range inst[2:] { - if is.Exchange.messagesRecvd > 2 { - t.Fatal("uninvolved nodes should only receive two messages", is.Exchange.messagesRecvd) + if is.Exchange.counters.messagesRecvd > 2 { + t.Fatal("uninvolved nodes should only receive two messages", is.Exchange.counters.messagesRecvd) } } } diff --git a/exchange/bitswap/stat.go b/exchange/bitswap/stat.go index 2f95d9e8b..fb5eb5011 100644 --- a/exchange/bitswap/stat.go +++ b/exchange/bitswap/stat.go @@ -10,11 +10,11 @@ type Stat struct { ProvideBufLen int Wantlist []*cid.Cid Peers []string - BlocksReceived int + BlocksReceived uint64 DataReceived uint64 - BlocksSent int + BlocksSent uint64 DataSent uint64 - DupBlksReceived int + DupBlksReceived uint64 DupDataReceived uint64 } @@ -23,12 +23,13 @@ func (bs *Bitswap) Stat() (*Stat, error) { st.ProvideBufLen = len(bs.newBlocks) st.Wantlist = bs.GetWantlist() bs.counterLk.Lock() - st.BlocksReceived = bs.blocksRecvd - st.DupBlksReceived = bs.dupBlocksRecvd - st.DupDataReceived = bs.dupDataRecvd - st.BlocksSent = bs.blocksSent - st.DataSent = bs.dataSent - st.DataReceived = bs.dataRecvd + c := bs.counters + st.BlocksReceived = c.blocksRecvd + st.DupBlksReceived = c.dupBlocksRecvd + st.DupDataReceived = c.dupDataRecvd + st.BlocksSent = c.blocksSent + st.DataSent = c.dataSent + st.DataReceived = c.dataRecvd bs.counterLk.Unlock() for _, p := range bs.engine.Peers() { diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index ac1e41eb8..a899f06bb 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -73,8 +73,8 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { bs.wm.SendBlock(ctx, envelope) bs.counterLk.Lock() - bs.blocksSent++ - bs.dataSent += uint64(len(envelope.Block.RawData())) + bs.counters.blocksSent++ + bs.counters.dataSent += uint64(len(envelope.Block.RawData())) bs.counterLk.Unlock() case <-ctx.Done(): return From 3be5c913eee0a9ee00eac7055fd48b506017161c Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 7 Jul 2017 11:40:41 -0700 Subject: [PATCH 10/11] fix issue with sessions not receiving locally added blocks License: MIT Signed-off-by: Jeromy --- exchange/bitswap/bitswap.go | 10 +++- exchange/bitswap/session.go | 95 ++++++++++++++++++++++++-------- exchange/bitswap/session_test.go | 40 ++++++++++++++ 3 files changed, 120 insertions(+), 25 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 2ebcd4ae7..d9f4fea9a 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -317,6 +317,10 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error { // it now as it requires more thought and isnt causing immediate problems. bs.notifications.Publish(blk) + for _, s := range bs.SessionsForBlock(blk.Cid()) { + s.receiveBlockFrom("", blk) + } + bs.engine.AddBlock(blk) select { @@ -370,7 +374,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg wg := sync.WaitGroup{} for _, block := range iblocks { wg.Add(1) - go func(b blocks.Block) { + go func(b blocks.Block) { // TODO: this probably doesnt need to be a goroutine... defer wg.Done() bs.updateReceiveCounters(b) @@ -382,7 +386,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg ses.receiveBlockFrom(p, b) bs.CancelWants([]*cid.Cid{k}, ses.id) } + log.Debugf("got block %s from %s", b, p) + // TODO: rework this to not call 'HasBlock'. 'HasBlock' is really + // designed to be called when blocks are coming in from non-bitswap + // places (like the user manually adding data) if err := bs.HasBlock(b); err != nil { log.Warningf("ReceiveMessage HasBlock error: %s", err) } diff --git a/exchange/bitswap/session.go b/exchange/bitswap/session.go index 614aa4076..53db1a28a 100644 --- a/exchange/bitswap/session.go +++ b/exchange/bitswap/session.go @@ -21,7 +21,7 @@ const activeWantsLimit = 16 // info to, and who to request blocks from type Session struct { ctx context.Context - tofetch []*cid.Cid + tofetch *cidQueue activePeers map[peer.ID]struct{} activePeersArr []peer.ID @@ -55,6 +55,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session { liveWants: make(map[string]time.Time), newReqs: make(chan []*cid.Cid), cancelKeys: make(chan []*cid.Cid), + tofetch: newCidQueue(), interestReqs: make(chan interestReq), ctx: ctx, bs: bs, @@ -157,7 +158,9 @@ func (s *Session) run(ctx context.Context) { s.wantBlocks(ctx, now) } - s.tofetch = append(s.tofetch, keys...) + for _, k := range keys { + s.tofetch.Push(k) + } case keys := <-s.cancelKeys: s.cancel(keys) @@ -188,8 +191,7 @@ func (s *Session) run(ctx context.Context) { case p := <-newpeers: s.addActivePeer(p) case lwchk := <-s.interestReqs: - _, ok := s.liveWants[lwchk.c.KeyString()] - lwchk.resp <- ok + lwchk.resp <- s.cidIsWanted(lwchk.c) case <-ctx.Done(): s.tick.Stop() return @@ -197,19 +199,31 @@ func (s *Session) run(ctx context.Context) { } } +func (s *Session) cidIsWanted(c *cid.Cid) bool { + _, ok := s.liveWants[c.KeyString()] + if !ok { + ok = s.tofetch.Has(c) + } + + return ok +} + func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) { - ks := blk.Cid().KeyString() - if _, ok := s.liveWants[ks]; ok { - tval := s.liveWants[ks] - s.latTotal += time.Since(tval) + c := blk.Cid() + if s.cidIsWanted(c) { + ks := c.KeyString() + tval, ok := s.liveWants[ks] + if ok { + s.latTotal += time.Since(tval) + delete(s.liveWants, ks) + } else { + s.tofetch.Remove(c) + } s.fetchcnt++ - delete(s.liveWants, ks) s.notif.Publish(blk) - if len(s.tofetch) > 0 { - next := s.tofetch[0:1] - s.tofetch = s.tofetch[1:] - s.wantBlocks(ctx, next) + if next := s.tofetch.Pop(); next != nil { + s.wantBlocks(ctx, []*cid.Cid{next}) } } } @@ -222,19 +236,9 @@ func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) { } func (s *Session) cancel(keys []*cid.Cid) { - sset := cid.NewSet() for _, c := range keys { - sset.Add(c) + s.tofetch.Remove(c) } - var i, j int - for ; j < len(s.tofetch); j++ { - if sset.Has(s.tofetch[j]) { - continue - } - s.tofetch[i] = s.tofetch[j] - i++ - } - s.tofetch = s.tofetch[:i] } func (s *Session) cancelWants(keys []*cid.Cid) { @@ -260,3 +264,46 @@ func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks func (s *Session) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) { return getBlock(parent, k, s.GetBlocks) } + +type cidQueue struct { + elems []*cid.Cid + eset *cid.Set +} + +func newCidQueue() *cidQueue { + return &cidQueue{eset: cid.NewSet()} +} + +func (cq *cidQueue) Pop() *cid.Cid { + for { + if len(cq.elems) == 0 { + return nil + } + + out := cq.elems[0] + cq.elems = cq.elems[1:] + + if cq.eset.Has(out) { + cq.eset.Remove(out) + return out + } + } +} + +func (cq *cidQueue) Push(c *cid.Cid) { + if cq.eset.Visit(c) { + cq.elems = append(cq.elems, c) + } +} + +func (cq *cidQueue) Remove(c *cid.Cid) { + cq.eset.Remove(c) +} + +func (cq *cidQueue) Has(c *cid.Cid) bool { + return cq.eset.Has(c) +} + +func (cq *cidQueue) Len() int { + return cq.eset.Len() +} diff --git a/exchange/bitswap/session_test.go b/exchange/bitswap/session_test.go index 0574bd0c3..dfdae79cb 100644 --- a/exchange/bitswap/session_test.go +++ b/exchange/bitswap/session_test.go @@ -202,3 +202,43 @@ func TestInterestCacheOverflow(t *testing.T) { t.Fatal("timed out waiting for block") } } + +func TestPutAfterSessionCacheEvict(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vnet := getVirtualNetwork() + sesgen := NewTestSessionGenerator(vnet) + defer sesgen.Close() + bgen := blocksutil.NewBlockGenerator() + + blks := bgen.Blocks(2500) + inst := sesgen.Instances(1) + + a := inst[0] + + ses := a.Exchange.NewSession(ctx) + + var allcids []*cid.Cid + for _, blk := range blks[1:] { + allcids = append(allcids, blk.Cid()) + } + + blkch, err := ses.GetBlocks(ctx, allcids) + if err != nil { + t.Fatal(err) + } + + // wait to ensure that all the above cids were added to the sessions cache + time.Sleep(time.Millisecond * 50) + + if err := a.Exchange.HasBlock(blks[17]); err != nil { + t.Fatal(err) + } + + select { + case <-blkch: + case <-time.After(time.Millisecond * 50): + t.Fatal("timed out waiting for block") + } +} From dd7589bdafde48b1cb10b8f813e72eb43a217cb6 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Fri, 7 Jul 2017 20:54:07 +0200 Subject: [PATCH 11/11] bitswap: add few method comments License: MIT Signed-off-by: Jakub Sztandera --- blockservice/blockservice.go | 2 ++ exchange/bitswap/wantlist/wantlist.go | 1 + 2 files changed, 3 insertions(+) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index d746f2fe0..1f2603637 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -78,6 +78,8 @@ func (bs *blockService) Exchange() exchange.Interface { return bs.exchange } +// NewSession creates a bitswap session that allows for controlled exchange of +// wantlists to decrease the bandwidth overhead. func NewSession(ctx context.Context, bs BlockService) *Session { exchange := bs.Exchange() if bswap, ok := exchange.(*bitswap.Bitswap); ok { diff --git a/exchange/bitswap/wantlist/wantlist.go b/exchange/bitswap/wantlist/wantlist.go index 5902442ca..de340ea6a 100644 --- a/exchange/bitswap/wantlist/wantlist.go +++ b/exchange/bitswap/wantlist/wantlist.go @@ -79,6 +79,7 @@ func (w *ThreadSafe) Add(c *cid.Cid, priority int, ses uint64) bool { return true } +// AddEntry adds given Entry to the wantlist. For more information see Add method. func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool { w.lk.Lock() defer w.lk.Unlock()