From e43d1317bb7cf4e6ad0ad68a2c8f0b1e8422f680 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 27 Apr 2017 17:38:46 -0700 Subject: [PATCH] rework how refcounted wantlists work License: MIT Signed-off-by: Jeromy --- core/commands/bitswap.go | 6 +- exchange/bitswap/bitswap.go | 28 +++++-- exchange/bitswap/bitswap_test.go | 5 ++ exchange/bitswap/session.go | 22 ++++-- exchange/bitswap/wantlist/wantlist.go | 92 ++++++++++++++++------ exchange/bitswap/wantlist/wantlist_test.go | 87 ++++++++++++++++++++ exchange/bitswap/wantmanager.go | 23 +++--- 7 files changed, 211 insertions(+), 52 deletions(-) create mode 100644 exchange/bitswap/wantlist/wantlist_test.go diff --git a/core/commands/bitswap.go b/core/commands/bitswap.go index e7d94e912..542402b89 100644 --- a/core/commands/bitswap.go +++ b/core/commands/bitswap.go @@ -64,7 +64,11 @@ var unwantCmd = &cmds.Command{ ks = append(ks, c) } - bs.CancelWants(ks) + // TODO: This should maybe find *all* sessions for this request and cancel them? + // (why): in reality, i think this command should be removed. Its + // messing with the internal state of bitswap. You should cancel wants + // by killing the command that caused the want. + bs.CancelWants(ks, 0) }, } diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 74c70b108..065c209a9 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -169,6 +169,9 @@ type Bitswap struct { // Sessions sessions []*Session sessLk sync.Mutex + + sessID uint64 + sessIDLk sync.Mutex } type blockRequest struct { @@ -219,7 +222,9 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block log.Event(ctx, "Bitswap.GetBlockRequest.Start", k) } - bs.wm.WantBlocks(ctx, keys, nil) + mses := bs.getNextSessionID() + + bs.wm.WantBlocks(ctx, keys, nil, mses) // NB: Optimization. Assumes that providers of key[0] are likely to // be able to provide for all keys. 
This currently holds true in most @@ -241,7 +246,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block defer close(out) defer func() { // can't just defer this call on its own, arguments are resolved *when* the defer is created - bs.CancelWants(remaining.Keys()) + bs.CancelWants(remaining.Keys(), mses) }() for { select { @@ -250,6 +255,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block return } + bs.CancelWants([]*cid.Cid{blk.Cid()}, mses) remaining.Remove(blk.Cid()) select { case out <- blk: @@ -270,9 +276,16 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block } } +func (bs *Bitswap) getNextSessionID() uint64 { + bs.sessIDLk.Lock() + defer bs.sessIDLk.Unlock() + bs.sessID++ + return bs.sessID +} + // CancelWant removes a given key from the wantlist -func (bs *Bitswap) CancelWants(cids []*cid.Cid) { - bs.wm.CancelWants(context.Background(), cids, nil) +func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) { + bs.wm.CancelWants(context.Background(), cids, nil, ses) } // HasBlock announces the existance of a block to this bitswap service. 
The @@ -314,7 +327,7 @@ func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session { var out []*Session for _, s := range bs.sessions { - if s.InterestedIn(c) { + if s.interestedIn(c) { out = append(out, s) } } @@ -346,8 +359,6 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg keys = append(keys, block.Cid()) } - bs.wm.CancelWants(context.Background(), keys, nil) - wg := sync.WaitGroup{} for _, block := range iblocks { wg.Add(1) @@ -360,7 +371,8 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg log.Event(ctx, "Bitswap.GetBlockRequest.End", k) for _, ses := range bs.SessionsForBlock(k) { - ses.ReceiveBlock(p, b) + ses.receiveBlockFrom(p, b) + bs.CancelWants([]*cid.Cid{k}, ses.id) } log.Debugf("got block %s from %s", b, p) if err := bs.HasBlock(b); err != nil { diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 76a28d5dc..e73022f62 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -332,6 +332,11 @@ func TestBasicBitswap(t *testing.T) { t.Fatal(err) } + time.Sleep(time.Millisecond * 20) + if len(instances[1].Exchange.GetWantlist()) != 0 { + t.Fatal("shouldnt have anything in wantlist") + } + st0, err := instances[0].Exchange.Stat() if err != nil { t.Fatal(err) diff --git a/exchange/bitswap/session.go b/exchange/bitswap/session.go index 84ab680dd..0a5c7426a 100644 --- a/exchange/bitswap/session.go +++ b/exchange/bitswap/session.go @@ -16,6 +16,9 @@ import ( const activeWantsLimit = 16 +// Session holds state for an individual bitswap transfer operation. 
+// This allows bitswap to make smarter decisions about who to send wantlist +// info to, and who to request blocks from type Session struct { ctx context.Context tofetch []*cid.Cid @@ -40,8 +43,12 @@ type Session struct { notif notifications.PubSub uuid logging.Loggable + + id uint64 } +// NewSession creates a new bitswap session whose lifetime is bounded by the +// given context func (bs *Bitswap) NewSession(ctx context.Context) *Session { s := &Session{ activePeers: make(map[peer.ID]struct{}), @@ -54,6 +61,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session { notif: notifications.New(), uuid: loggables.Uuid("GetBlockRequest"), baseTickDelay: time.Millisecond * 500, + id: bs.getNextSessionID(), } cache, _ := lru.New(2048) @@ -73,11 +81,11 @@ type blkRecv struct { blk blocks.Block } -func (s *Session) ReceiveBlock(from peer.ID, blk blocks.Block) { +func (s *Session) receiveBlockFrom(from peer.ID, blk blocks.Block) { s.incoming <- blkRecv{from: from, blk: blk} } -func (s *Session) InterestedIn(c *cid.Cid) bool { +func (s *Session) interestedIn(c *cid.Cid) bool { return s.interest.Contains(c.KeyString()) } @@ -134,14 +142,14 @@ func (s *Session) run(ctx context.Context) { case <-s.tick.C: var live []*cid.Cid - for c, _ := range s.liveWants { + for c := range s.liveWants { cs, _ := cid.Cast([]byte(c)) live = append(live, cs) s.liveWants[c] = time.Now() } // Broadcast these keys to everyone we're connected to - s.bs.wm.WantBlocks(ctx, live, nil) + s.bs.wm.WantBlocks(ctx, live, nil, s.id) if len(live) > 0 { go func() { @@ -181,7 +189,7 @@ func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) { for _, c := range ks { s.liveWants[c.KeyString()] = time.Now() } - s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr) + s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id) } func (s *Session) cancel(keys []*cid.Cid) { @@ -211,11 +219,15 @@ func (s *Session) fetch(ctx context.Context, keys []*cid.Cid) { } } +// GetBlocks fetches a set of blocks within the 
context of this session and +// returns a channel that found blocks will be returned on. No order is +// guaranteed on the returned blocks. func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) { ctx = logging.ContextWithLoggable(ctx, s.uuid) return getBlocksImpl(ctx, keys, s.notif, s.fetch, s.cancelWants) } +// GetBlock fetches a single block func (s *Session) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) { return getBlock(parent, k, s.GetBlocks) } diff --git a/exchange/bitswap/wantlist/wantlist.go b/exchange/bitswap/wantlist/wantlist.go index 7c77998b3..06b5b80dc 100644 --- a/exchange/bitswap/wantlist/wantlist.go +++ b/exchange/bitswap/wantlist/wantlist.go @@ -10,8 +10,8 @@ import ( ) type ThreadSafe struct { - lk sync.RWMutex - Wantlist Wantlist + lk sync.RWMutex + set map[string]*Entry } // not threadsafe @@ -23,7 +23,16 @@ type Entry struct { Cid *cid.Cid Priority int - RefCnt int + SesTrk map[uint64]struct{} +} + +// NewRefEntry creates a new reference tracked wantlist entry +func NewRefEntry(c *cid.Cid, p int) *Entry { + return &Entry{ + Cid: c, + Priority: p, + SesTrk: make(map[uint64]struct{}), + } } type entrySlice []*Entry @@ -34,7 +43,7 @@ func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priorit func NewThreadSafe() *ThreadSafe { return &ThreadSafe{ - Wantlist: *New(), + set: make(map[string]*Entry), } } @@ -44,46 +53,86 @@ func New() *Wantlist { } } -func (w *ThreadSafe) Add(k *cid.Cid, priority int) bool { +func (w *ThreadSafe) Add(c *cid.Cid, priority int, ses uint64) bool { w.lk.Lock() defer w.lk.Unlock() - return w.Wantlist.Add(k, priority) + k := c.KeyString() + if e, ok := w.set[k]; ok { + e.SesTrk[ses] = struct{}{} + return false + } + + w.set[k] = &Entry{ + Cid: c, + Priority: priority, + SesTrk: map[uint64]struct{}{ses: struct{}{}}, + } + + return true } -func (w *ThreadSafe) AddEntry(e *Entry) bool { +func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool 
{ w.lk.Lock() defer w.lk.Unlock() - return w.Wantlist.AddEntry(e) + k := e.Cid.KeyString() + if ex, ok := w.set[k]; ok { + ex.SesTrk[ses] = struct{}{} + return false + } + w.set[k] = e + e.SesTrk[ses] = struct{}{} + return true } -func (w *ThreadSafe) Remove(k *cid.Cid) bool { +func (w *ThreadSafe) Remove(c *cid.Cid, ses uint64) bool { w.lk.Lock() defer w.lk.Unlock() - return w.Wantlist.Remove(k) + k := c.KeyString() + e, ok := w.set[k] + if !ok { + return false + } + + delete(e.SesTrk, ses) + if len(e.SesTrk) == 0 { + delete(w.set, k) + return true + } + return false } func (w *ThreadSafe) Contains(k *cid.Cid) (*Entry, bool) { w.lk.RLock() defer w.lk.RUnlock() - return w.Wantlist.Contains(k) + e, ok := w.set[k.KeyString()] + return e, ok } func (w *ThreadSafe) Entries() []*Entry { w.lk.RLock() defer w.lk.RUnlock() - return w.Wantlist.Entries() + var es entrySlice + for _, e := range w.set { + es = append(es, e) + } + return es } func (w *ThreadSafe) SortedEntries() []*Entry { w.lk.RLock() defer w.lk.RUnlock() - return w.Wantlist.SortedEntries() + var es entrySlice + for _, e := range w.set { + es = append(es, e) + } + sort.Sort(es) + return es } func (w *ThreadSafe) Len() int { w.lk.RLock() defer w.lk.RUnlock() - return w.Wantlist.Len() + return len(w.set) } func (w *Wantlist) Len() int { @@ -92,15 +141,13 @@ func (w *Wantlist) Len() int { func (w *Wantlist) Add(c *cid.Cid, priority int) bool { k := c.KeyString() - if e, ok := w.set[k]; ok { - e.RefCnt++ + if _, ok := w.set[k]; ok { return false } w.set[k] = &Entry{ Cid: c, Priority: priority, - RefCnt: 1, } return true @@ -108,8 +155,7 @@ func (w *Wantlist) Add(c *cid.Cid, priority int) bool { func (w *Wantlist) AddEntry(e *Entry) bool { k := e.Cid.KeyString() - if ex, ok := w.set[k]; ok { - ex.RefCnt++ + if _, ok := w.set[k]; ok { return false } w.set[k] = e @@ -118,16 +164,13 @@ func (w *Wantlist) AddEntry(e *Entry) bool { +// Remove deletes c from the wantlist and reports whether an entry was actually removed. func (w *Wantlist) Remove(c *cid.Cid) bool { k := c.KeyString() - e, ok := w.set[k] + _, 
ok := w.set[k] if !ok { return false } - e.RefCnt-- - if e.RefCnt <= 0 { - delete(w.set, k) - return true - } + delete(w.set, k) - return false + return true } diff --git a/exchange/bitswap/wantlist/wantlist_test.go b/exchange/bitswap/wantlist/wantlist_test.go new file mode 100644 index 000000000..a88825dcd --- /dev/null +++ b/exchange/bitswap/wantlist/wantlist_test.go @@ -0,0 +1,87 @@ +package wantlist + +import ( + "testing" + + cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid" +) + +var testcids []*cid.Cid + +func init() { + strs := []string{ + "QmQL8LqkEgYXaDHdNYCG2mmpow7Sp8Z8Kt3QS688vyBeC7", + "QmcBDsdjgSXU7BP4A4V8LJCXENE5xVwnhrhRGVTJr9YCVj", + "QmQakgd2wDxc3uUF4orGdEm28zUT9Mmimp5pyPG2SFS9Gj", + } + for _, s := range strs { + c, err := cid.Decode(s) + if err != nil { + panic(err) + } + testcids = append(testcids, c) + } + +} + +type wli interface { + Contains(*cid.Cid) (*Entry, bool) +} + +func assertHasCid(t *testing.T, w wli, c *cid.Cid) { + e, ok := w.Contains(c) + if !ok { + t.Fatal("expected to have ", c) + } + if !e.Cid.Equals(c) { + t.Fatal("returned entry had wrong cid value") + } +} + +func assertNotHasCid(t *testing.T, w wli, c *cid.Cid) { + _, ok := w.Contains(c) + if ok { + t.Fatal("expected not to have ", c) + } +} + +func TestBasicWantlist(t *testing.T) { + wl := New() + + wl.Add(testcids[0], 5) + assertHasCid(t, wl, testcids[0]) + wl.Add(testcids[1], 4) + assertHasCid(t, wl, testcids[0]) + assertHasCid(t, wl, testcids[1]) + + if wl.Len() != 2 { + t.Fatal("should have had two items") + } + + wl.Add(testcids[1], 4) + assertHasCid(t, wl, testcids[0]) + assertHasCid(t, wl, testcids[1]) + + if wl.Len() != 2 { + t.Fatal("should have had two items") + } + + wl.Remove(testcids[0]) + assertHasCid(t, wl, testcids[1]) + if _, has := wl.Contains(testcids[0]); has { + t.Fatal("shouldnt have this cid") + } +} + +func TestSesRefWantlist(t *testing.T) { + wl := NewThreadSafe() + + wl.Add(testcids[0], 5, 1) + assertHasCid(t, wl, testcids[0]) + 
wl.Remove(testcids[0], 2) + assertHasCid(t, wl, testcids[0]) + wl.Add(testcids[0], 5, 1) + assertHasCid(t, wl, testcids[0]) + wl.Remove(testcids[0], 1) + assertNotHasCid(t, wl, testcids[0]) +} diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index c8a617724..cb5627b10 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -71,34 +71,31 @@ type msgQueue struct { done chan struct{} } -func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID) { +func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) { log.Infof("want blocks: %s", ks) - pm.addEntries(ctx, ks, peers, false) + pm.addEntries(ctx, ks, peers, false, ses) } -func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID) { - pm.addEntries(context.Background(), ks, peers, true) +func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) { + pm.addEntries(context.Background(), ks, peers, true, ses) } type wantSet struct { entries []*bsmsg.Entry targets []peer.ID + from uint64 } -func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, targets []peer.ID, cancel bool) { +func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, targets []peer.ID, cancel bool, ses uint64) { var entries []*bsmsg.Entry for i, k := range ks { entries = append(entries, &bsmsg.Entry{ Cancel: cancel, - Entry: &wantlist.Entry{ - Cid: k, - Priority: kMaxPriority - i, - RefCnt: 1, - }, + Entry: wantlist.NewRefEntry(k, kMaxPriority-i), }) } select { - case pm.incoming <- &wantSet{entries: entries, targets: targets}: + case pm.incoming <- &wantSet{entries: entries, targets: targets, from: ses}: case <-pm.ctx.Done(): case <-ctx.Done(): } @@ -290,11 +287,11 @@ func (pm *WantManager) Run() { // add changes to our wantlist for _, e := range ws.entries { if e.Cancel { - if pm.wl.Remove(e.Cid) { + if pm.wl.Remove(e.Cid, 
ws.from) { pm.wantlistGauge.Dec() } } else { - if pm.wl.AddEntry(e.Entry) { + if pm.wl.AddEntry(e.Entry, ws.from) { pm.wantlistGauge.Inc() } }