Merge pull request #3867 from ipfs/feat/bitswap-sessions

Bitswap sessions
This commit is contained in:
Jeromy Johnson 2017-07-07 12:38:09 -07:00 committed by GitHub
commit 0784a852ac
18 changed files with 1120 additions and 166 deletions

View File

@ -25,8 +25,8 @@ func (bg *BlockGenerator) Next() *blocks.BasicBlock {
}
// Blocks generates as many BasicBlocks as specified by n.
func (bg *BlockGenerator) Blocks(n int) []*blocks.BasicBlock {
blocks := make([]*blocks.BasicBlock, 0)
func (bg *BlockGenerator) Blocks(n int) []blocks.Block {
blocks := make([]blocks.Block, 0, n)
for i := 0; i < n; i++ {
b := bg.Next()
blocks = append(blocks, b)

View File

@ -10,9 +10,10 @@ import (
"github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
)
@ -77,6 +78,23 @@ func (bs *blockService) Exchange() exchange.Interface {
return bs.exchange
}
// NewSession creates a bitswap session that allows for controlled exchange of
// wantlists to decrease the bandwidth overhead.
func NewSession(ctx context.Context, bs BlockService) *Session {
exchange := bs.Exchange()
if bswap, ok := exchange.(*bitswap.Bitswap); ok {
ses := bswap.NewSession(ctx)
return &Session{
ses: ses,
bs: bs.Blockstore(),
}
}
return &Session{
ses: exchange,
bs: bs.Blockstore(),
}
}
// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
@ -141,16 +159,25 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
log.Debugf("BlockService GetBlock: '%s'", c)
block, err := s.blockstore.Get(c)
var f exchange.Fetcher
if s.exchange != nil {
f = s.exchange
}
return getBlock(ctx, c, s.blockstore, f)
}
func getBlock(ctx context.Context, c *cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) (blocks.Block, error) {
block, err := bs.Get(c)
if err == nil {
return block, nil
}
if err == blockstore.ErrNotFound && s.exchange != nil {
if err == blockstore.ErrNotFound && f != nil {
// TODO be careful checking ErrNotFound. If the underlying
// implementation changes, this will break.
log.Debug("Blockservice: Searching bitswap")
blk, err := s.exchange.GetBlock(ctx, c)
blk, err := f.GetBlock(ctx, c)
if err != nil {
if err == blockstore.ErrNotFound {
return nil, ErrNotFound
@ -172,12 +199,16 @@ func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block,
// the returned channel.
// NB: No guarantees are made about order.
func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
return getBlocks(ctx, ks, s.blockstore, s.exchange)
}
func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) <-chan blocks.Block {
out := make(chan blocks.Block)
go func() {
defer close(out)
var misses []*cid.Cid
for _, c := range ks {
hit, err := s.blockstore.Get(c)
hit, err := bs.Get(c)
if err != nil {
misses = append(misses, c)
continue
@ -194,7 +225,7 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
return
}
rblocks, err := s.exchange.GetBlocks(ctx, misses)
rblocks, err := f.GetBlocks(ctx, misses)
if err != nil {
log.Debugf("Error with GetBlocks: %s", err)
return
@ -220,3 +251,19 @@ func (s *blockService) Close() error {
log.Debug("blockservice is shutting down...")
return s.exchange.Close()
}
// Session is a helper type to provide higher level access to bitswap sessions
type Session struct {
bs blockstore.Blockstore
ses exchange.Fetcher
}
// GetBlock gets a block in the context of a request session
func (s *Session) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
return getBlock(ctx, c, s.bs, s.ses)
}
// GetBlocks gets blocks in the context of a request session
func (s *Session) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
return getBlocks(ctx, ks, s.bs, s.ses)
}

View File

@ -64,7 +64,11 @@ var unwantCmd = &cmds.Command{
ks = append(ks, c)
}
bs.CancelWants(ks)
// TODO: This should maybe find *all* sessions for this request and cancel them?
// (why): in reality, i think this command should be removed. Its
// messing with the internal state of bitswap. You should cancel wants
// by killing the command that caused the want.
bs.CancelWants(ks, 0)
},
}
@ -107,6 +111,11 @@ Print out all blocks currently on the bitswap wantlist for the local peer.`,
res.SetError(err, cmds.ErrNormal)
return
}
if pid == nd.Identity {
res.SetOutput(&KeyList{bs.GetWantlist()})
return
}
res.SetOutput(&KeyList{bs.WantlistForPeer(pid)})
} else {
res.SetOutput(&KeyList{bs.GetWantlist()})

View File

@ -27,7 +27,7 @@ import (
node "gx/ipfs/QmPAKbSsgEX5B6fpmxa61jXYnoWzZr5sNafd3qgPiSH8Uv/go-ipld-format"
humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
multibase "gx/ipfs/QmcxkxTVuURV2Ptse8TvkqH5BQDwV62X1x19JqqvbBzwUM/go-multibase"
multibase "gx/ipfs/Qme4T6BE4sQxg7ZouamF5M7Tx1ZFTqzcns7BkyQPXpoT99/go-multibase"
)
const (

View File

@ -7,6 +7,7 @@ import (
"errors"
"math"
"sync"
"sync/atomic"
"time"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
@ -17,13 +18,12 @@ import (
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
flags "github.com/ipfs/go-ipfs/flags"
"github.com/ipfs/go-ipfs/thirdparty/delay"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
)
@ -99,6 +99,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
newBlocks: make(chan *cid.Cid, HasBlockBufferSize),
provideKeys: make(chan *cid.Cid, provideKeysBufferSize),
wm: NewWantManager(ctx, network),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
@ -152,17 +153,29 @@ type Bitswap struct {
process process.Process
// Counters for various statistics
counterLk sync.Mutex
blocksRecvd int
dupBlocksRecvd int
dupDataRecvd uint64
blocksSent int
dataSent uint64
dataRecvd uint64
counterLk sync.Mutex
counters *counters
// Metrics interface metrics
dupMetric metrics.Histogram
allMetric metrics.Histogram
// Sessions
sessions []*Session
sessLk sync.Mutex
sessID uint64
sessIDLk sync.Mutex
}
type counters struct {
blocksRecvd uint64
dupBlocksRecvd uint64
dupDataRecvd uint64
blocksSent uint64
dataSent uint64
dataRecvd uint64
messagesRecvd uint64
}
type blockRequest struct {
@ -173,45 +186,7 @@ type blockRequest struct {
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
if k == nil {
log.Error("nil cid in GetBlock")
return nil, blockstore.ErrNotFound
}
// Any async work initiated by this function must end when this function
// returns. To ensure this, derive a new context. Note that it is okay to
// listen on parent in this scope, but NOT okay to pass |parent| to
// functions called by this one. Otherwise those functions won't return
// when this context's cancel func is executed. This is difficult to
// enforce. May this comment keep you safe.
ctx, cancelFunc := context.WithCancel(parent)
// TODO: this request ID should come in from a higher layer so we can track
// across multiple 'GetBlock' invocations
ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest"))
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
defer log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
defer cancelFunc()
promise, err := bs.GetBlocks(ctx, []*cid.Cid{k})
if err != nil {
return nil, err
}
select {
case block, ok := <-promise:
if !ok {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return nil, errors.New("promise channel was closed")
}
}
return block, nil
case <-parent.Done():
return nil, parent.Err()
}
return getBlock(parent, k, bs.GetBlocks)
}
func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid {
@ -251,7 +226,9 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
}
bs.wm.WantBlocks(ctx, keys)
mses := bs.getNextSessionID()
bs.wm.WantBlocks(ctx, keys, nil, mses)
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
@ -273,7 +250,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
defer close(out)
defer func() {
// can't just defer this call on its own, arguments are resolved *when* the defer is created
bs.CancelWants(remaining.Keys())
bs.CancelWants(remaining.Keys(), mses)
}()
for {
select {
@ -282,6 +259,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
return
}
bs.CancelWants([]*cid.Cid{blk.Cid()}, mses)
remaining.Remove(blk.Cid())
select {
case out <- blk:
@ -302,9 +280,19 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
}
}
func (bs *Bitswap) getNextSessionID() uint64 {
bs.sessIDLk.Lock()
defer bs.sessIDLk.Unlock()
bs.sessID++
return bs.sessID
}
// CancelWant removes a given key from the wantlist
func (bs *Bitswap) CancelWants(cids []*cid.Cid) {
bs.wm.CancelWants(cids)
func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) {
if len(cids) == 0 {
return
}
bs.wm.CancelWants(context.Background(), cids, nil, ses)
}
// HasBlock announces the existance of a block to this bitswap service. The
@ -329,6 +317,10 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
// it now as it requires more thought and isnt causing immediate problems.
bs.notifications.Publish(blk)
for _, s := range bs.SessionsForBlock(blk.Cid()) {
s.receiveBlockFrom("", blk)
}
bs.engine.AddBlock(blk)
select {
@ -340,7 +332,23 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return nil
}
// SessionsForBlock returns a slice of all sessions that may be interested in the given cid
func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session {
bs.sessLk.Lock()
defer bs.sessLk.Unlock()
var out []*Session
for _, s := range bs.sessions {
if s.interestedIn(c) {
out = append(out, s)
}
}
return out
}
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
atomic.AddUint64(&bs.counters.messagesRecvd, 1)
// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
bs.engine.MessageReceived(p, incoming)
@ -362,12 +370,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
}
keys = append(keys, block.Cid())
}
bs.wm.CancelWants(keys)
wg := sync.WaitGroup{}
for _, block := range iblocks {
wg.Add(1)
go func(b blocks.Block) {
go func(b blocks.Block) { // TODO: this probably doesnt need to be a goroutine...
defer wg.Done()
bs.updateReceiveCounters(b)
@ -375,7 +382,15 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
k := b.Cid()
log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
for _, ses := range bs.SessionsForBlock(k) {
ses.receiveBlockFrom(p, b)
bs.CancelWants([]*cid.Cid{k}, ses.id)
}
log.Debugf("got block %s from %s", b, p)
// TODO: rework this to not call 'HasBlock'. 'HasBlock' is really
// designed to be called when blocks are coming in from non-bitswap
// places (like the user manually adding data)
if err := bs.HasBlock(b); err != nil {
log.Warningf("ReceiveMessage HasBlock error: %s", err)
}
@ -401,12 +416,13 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
bs.counterLk.Lock()
defer bs.counterLk.Unlock()
c := bs.counters
bs.blocksRecvd++
bs.dataRecvd += uint64(len(b.RawData()))
c.blocksRecvd++
c.dataRecvd += uint64(len(b.RawData()))
if has {
bs.dupBlocksRecvd++
bs.dupDataRecvd += uint64(blkLen)
c.dupBlocksRecvd++
c.dupDataRecvd += uint64(blkLen)
}
}

View File

@ -291,7 +291,7 @@ func TestEmptyKey(t *testing.T) {
}
}
func assertStat(st *Stat, sblks, rblks int, sdata, rdata uint64) error {
func assertStat(st *Stat, sblks, rblks, sdata, rdata uint64) error {
if sblks != st.BlocksSent {
return fmt.Errorf("mismatch in blocks sent: %d vs %d", sblks, st.BlocksSent)
}
@ -318,7 +318,7 @@ func TestBasicBitswap(t *testing.T) {
t.Log("Test a one node trying to get one block from another")
instances := sg.Instances(2)
instances := sg.Instances(3)
blocks := bg.Blocks(1)
err := instances[0].Exchange.HasBlock(blocks[0])
if err != nil {
@ -332,6 +332,15 @@ func TestBasicBitswap(t *testing.T) {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 25)
wl := instances[2].Exchange.WantlistForPeer(instances[1].Peer)
if len(wl) != 0 {
t.Fatal("should have no items in other peers wantlist")
}
if len(instances[1].Exchange.GetWantlist()) != 0 {
t.Fatal("shouldnt have anything in wantlist")
}
st0, err := instances[0].Exchange.Stat()
if err != nil {
t.Fatal(err)
@ -370,6 +379,9 @@ func TestDoubleGet(t *testing.T) {
instances := sg.Instances(2)
blocks := bg.Blocks(1)
// NOTE: A race condition can happen here where these GetBlocks requests go
// through before the peers even get connected. This is okay, bitswap
// *should* be able to handle this.
ctx1, cancel1 := context.WithCancel(context.Background())
blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []*cid.Cid{blocks[0].Cid()})
if err != nil {
@ -385,7 +397,7 @@ func TestDoubleGet(t *testing.T) {
}
// ensure both requests make it into the wantlist at the same time
time.Sleep(time.Millisecond * 100)
time.Sleep(time.Millisecond * 20)
cancel1()
_, ok := <-blkch1
@ -405,6 +417,14 @@ func TestDoubleGet(t *testing.T) {
}
t.Log(blk)
case <-time.After(time.Second * 5):
p1wl := instances[0].Exchange.WantlistForPeer(instances[1].Peer)
if len(p1wl) != 1 {
t.Logf("wantlist view didnt have 1 item (had %d)", len(p1wl))
} else if !p1wl[0].Equals(blocks[0].Cid()) {
t.Logf("had 1 item, it was wrong: %s %s", blocks[0].Cid(), p1wl[0])
} else {
t.Log("had correct wantlist, somehow")
}
t.Fatal("timed out waiting on block")
}

View File

@ -2,10 +2,10 @@
package decision
import (
"context"
"sync"
"time"
context "context"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
@ -105,13 +105,10 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
}
func (e *Engine) WantlistForPeer(p peer.ID) (out []*wl.Entry) {
e.lock.Lock()
partner, ok := e.ledgerMap[p]
if ok {
out = partner.wantList.SortedEntries()
}
e.lock.Unlock()
return out
partner := e.findOrCreate(p)
partner.lk.Lock()
defer partner.lk.Unlock()
return partner.wantList.SortedEntries()
}
func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {

100
exchange/bitswap/get.go Normal file
View File

@ -0,0 +1,100 @@
package bitswap
import (
"context"
"errors"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
)
type getBlocksFunc func(context.Context, []*cid.Cid) (<-chan blocks.Block, error)
func getBlock(p context.Context, k *cid.Cid, gb getBlocksFunc) (blocks.Block, error) {
if k == nil {
log.Error("nil cid in GetBlock")
return nil, blockstore.ErrNotFound
}
// Any async work initiated by this function must end when this function
// returns. To ensure this, derive a new context. Note that it is okay to
// listen on parent in this scope, but NOT okay to pass |parent| to
// functions called by this one. Otherwise those functions won't return
// when this context's cancel func is executed. This is difficult to
// enforce. May this comment keep you safe.
ctx, cancel := context.WithCancel(p)
defer cancel()
promise, err := gb(ctx, []*cid.Cid{k})
if err != nil {
return nil, err
}
select {
case block, ok := <-promise:
if !ok {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return nil, errors.New("promise channel was closed")
}
}
return block, nil
case <-p.Done():
return nil, p.Err()
}
}
type wantFunc func(context.Context, []*cid.Cid)
func getBlocksImpl(ctx context.Context, keys []*cid.Cid, notif notifications.PubSub, want wantFunc, cwants func([]*cid.Cid)) (<-chan blocks.Block, error) {
if len(keys) == 0 {
out := make(chan blocks.Block)
close(out)
return out, nil
}
remaining := cid.NewSet()
promise := notif.Subscribe(ctx, keys...)
for _, k := range keys {
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
remaining.Add(k)
}
want(ctx, keys)
out := make(chan blocks.Block)
go handleIncoming(ctx, remaining, promise, out, cwants)
return out, nil
}
func handleIncoming(ctx context.Context, remaining *cid.Set, in <-chan blocks.Block, out chan blocks.Block, cfun func([]*cid.Cid)) {
ctx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
close(out)
// can't just defer this call on its own, arguments are resolved *when* the defer is created
cfun(remaining.Keys())
}()
for {
select {
case blk, ok := <-in:
if !ok {
return
}
remaining.Remove(blk.Cid())
select {
case out <- blk:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}

309
exchange/bitswap/session.go Normal file
View File

@ -0,0 +1,309 @@
package bitswap
import (
"context"
"time"
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
)
const activeWantsLimit = 16
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
// info to, and who to request blocks from
type Session struct {
ctx context.Context
tofetch *cidQueue
activePeers map[peer.ID]struct{}
activePeersArr []peer.ID
bs *Bitswap
incoming chan blkRecv
newReqs chan []*cid.Cid
cancelKeys chan []*cid.Cid
interestReqs chan interestReq
interest *lru.Cache
liveWants map[string]time.Time
tick *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
notif notifications.PubSub
uuid logging.Loggable
id uint64
}
// NewSession creates a new bitswap session whose lifetime is bounded by the
// given context
func (bs *Bitswap) NewSession(ctx context.Context) *Session {
s := &Session{
activePeers: make(map[peer.ID]struct{}),
liveWants: make(map[string]time.Time),
newReqs: make(chan []*cid.Cid),
cancelKeys: make(chan []*cid.Cid),
tofetch: newCidQueue(),
interestReqs: make(chan interestReq),
ctx: ctx,
bs: bs,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
id: bs.getNextSessionID(),
}
cache, _ := lru.New(2048)
s.interest = cache
bs.sessLk.Lock()
bs.sessions = append(bs.sessions, s)
bs.sessLk.Unlock()
go s.run(ctx)
return s
}
type blkRecv struct {
from peer.ID
blk blocks.Block
}
func (s *Session) receiveBlockFrom(from peer.ID, blk blocks.Block) {
s.incoming <- blkRecv{from: from, blk: blk}
}
type interestReq struct {
c *cid.Cid
resp chan bool
}
// TODO: PERF: this is using a channel to guard a map access against race
// conditions. This is definitely much slower than a mutex, though its unclear
// if it will actually induce any noticeable slowness. This is implemented this
// way to avoid adding a more complex set of mutexes around the liveWants map.
// note that in the average case (where this session *is* interested in the
// block we received) this function will not be called, as the cid will likely
// still be in the interest cache.
func (s *Session) isLiveWant(c *cid.Cid) bool {
resp := make(chan bool)
s.interestReqs <- interestReq{
c: c,
resp: resp,
}
return <-resp
}
func (s *Session) interestedIn(c *cid.Cid) bool {
return s.interest.Contains(c.KeyString()) || s.isLiveWant(c)
}
const provSearchDelay = time.Second * 10
func (s *Session) addActivePeer(p peer.ID) {
if _, ok := s.activePeers[p]; !ok {
s.activePeers[p] = struct{}{}
s.activePeersArr = append(s.activePeersArr, p)
}
}
func (s *Session) resetTick() {
if s.latTotal == 0 {
s.tick.Reset(provSearchDelay)
} else {
avLat := s.latTotal / time.Duration(s.fetchcnt)
s.tick.Reset(s.baseTickDelay + (3 * avLat))
}
}
func (s *Session) run(ctx context.Context) {
s.tick = time.NewTimer(provSearchDelay)
newpeers := make(chan peer.ID, 16)
for {
select {
case blk := <-s.incoming:
s.tick.Stop()
s.addActivePeer(blk.from)
s.receiveBlock(ctx, blk.blk)
s.resetTick()
case keys := <-s.newReqs:
for _, k := range keys {
s.interest.Add(k.KeyString(), nil)
}
if len(s.liveWants) < activeWantsLimit {
toadd := activeWantsLimit - len(s.liveWants)
if toadd > len(keys) {
toadd = len(keys)
}
now := keys[:toadd]
keys = keys[toadd:]
s.wantBlocks(ctx, now)
}
for _, k := range keys {
s.tofetch.Push(k)
}
case keys := <-s.cancelKeys:
s.cancel(keys)
case <-s.tick.C:
var live []*cid.Cid
for c := range s.liveWants {
cs, _ := cid.Cast([]byte(c))
live = append(live, cs)
s.liveWants[c] = time.Now()
}
// Broadcast these keys to everyone we're connected to
s.bs.wm.WantBlocks(ctx, live, nil, s.id)
if len(live) > 0 {
go func(k *cid.Cid) {
// TODO: have a task queue setup for this to:
// - rate limit
// - manage timeouts
// - ensure two 'findprovs' calls for the same block don't run concurrently
// - share peers between sessions based on interest set
for p := range s.bs.network.FindProvidersAsync(ctx, k, 10) {
newpeers <- p
}
}(live[0])
}
s.resetTick()
case p := <-newpeers:
s.addActivePeer(p)
case lwchk := <-s.interestReqs:
lwchk.resp <- s.cidIsWanted(lwchk.c)
case <-ctx.Done():
s.tick.Stop()
return
}
}
}
func (s *Session) cidIsWanted(c *cid.Cid) bool {
_, ok := s.liveWants[c.KeyString()]
if !ok {
ok = s.tofetch.Has(c)
}
return ok
}
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
c := blk.Cid()
if s.cidIsWanted(c) {
ks := c.KeyString()
tval, ok := s.liveWants[ks]
if ok {
s.latTotal += time.Since(tval)
delete(s.liveWants, ks)
} else {
s.tofetch.Remove(c)
}
s.fetchcnt++
s.notif.Publish(blk)
if next := s.tofetch.Pop(); next != nil {
s.wantBlocks(ctx, []*cid.Cid{next})
}
}
}
func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) {
for _, c := range ks {
s.liveWants[c.KeyString()] = time.Now()
}
s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id)
}
func (s *Session) cancel(keys []*cid.Cid) {
for _, c := range keys {
s.tofetch.Remove(c)
}
}
func (s *Session) cancelWants(keys []*cid.Cid) {
s.cancelKeys <- keys
}
func (s *Session) fetch(ctx context.Context, keys []*cid.Cid) {
select {
case s.newReqs <- keys:
case <-ctx.Done():
}
}
// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) {
ctx = logging.ContextWithLoggable(ctx, s.uuid)
return getBlocksImpl(ctx, keys, s.notif, s.fetch, s.cancelWants)
}
// GetBlock fetches a single block
func (s *Session) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
return getBlock(parent, k, s.GetBlocks)
}
type cidQueue struct {
elems []*cid.Cid
eset *cid.Set
}
func newCidQueue() *cidQueue {
return &cidQueue{eset: cid.NewSet()}
}
func (cq *cidQueue) Pop() *cid.Cid {
for {
if len(cq.elems) == 0 {
return nil
}
out := cq.elems[0]
cq.elems = cq.elems[1:]
if cq.eset.Has(out) {
cq.eset.Remove(out)
return out
}
}
}
func (cq *cidQueue) Push(c *cid.Cid) {
if cq.eset.Visit(c) {
cq.elems = append(cq.elems, c)
}
}
func (cq *cidQueue) Remove(c *cid.Cid) {
cq.eset.Remove(c)
}
func (cq *cidQueue) Has(c *cid.Cid) bool {
return cq.eset.Has(c)
}
func (cq *cidQueue) Len() int {
return cq.eset.Len()
}

View File

@ -0,0 +1,244 @@
package bitswap
import (
"context"
"fmt"
"testing"
"time"
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
)
func TestBasicSessions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
block := bgen.Next()
inst := sesgen.Instances(2)
a := inst[0]
b := inst[1]
if err := b.Blockstore().Put(block); err != nil {
t.Fatal(err)
}
sesa := a.Exchange.NewSession(ctx)
blkout, err := sesa.GetBlock(ctx, block.Cid())
if err != nil {
t.Fatal(err)
}
if !blkout.Cid().Equals(block.Cid()) {
t.Fatal("got wrong block")
}
}
func assertBlockLists(got, exp []blocks.Block) error {
if len(got) != len(exp) {
return fmt.Errorf("got wrong number of blocks, %d != %d", len(got), len(exp))
}
h := cid.NewSet()
for _, b := range got {
h.Add(b.Cid())
}
for _, b := range exp {
if !h.Has(b.Cid()) {
return fmt.Errorf("didnt have: %s", b.Cid())
}
}
return nil
}
func TestSessionBetweenPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
inst := sesgen.Instances(10)
blks := bgen.Blocks(101)
if err := inst[0].Blockstore().PutMany(blks); err != nil {
t.Fatal(err)
}
var cids []*cid.Cid
for _, blk := range blks {
cids = append(cids, blk.Cid())
}
ses := inst[1].Exchange.NewSession(ctx)
if _, err := ses.GetBlock(ctx, cids[0]); err != nil {
t.Fatal(err)
}
blks = blks[1:]
cids = cids[1:]
for i := 0; i < 10; i++ {
ch, err := ses.GetBlocks(ctx, cids[i*10:(i+1)*10])
if err != nil {
t.Fatal(err)
}
var got []blocks.Block
for b := range ch {
got = append(got, b)
}
if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil {
t.Fatal(err)
}
}
for _, is := range inst[2:] {
if is.Exchange.counters.messagesRecvd > 2 {
t.Fatal("uninvolved nodes should only receive two messages", is.Exchange.counters.messagesRecvd)
}
}
}
func TestSessionSplitFetch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
inst := sesgen.Instances(11)
blks := bgen.Blocks(100)
for i := 0; i < 10; i++ {
if err := inst[i].Blockstore().PutMany(blks[i*10 : (i+1)*10]); err != nil {
t.Fatal(err)
}
}
var cids []*cid.Cid
for _, blk := range blks {
cids = append(cids, blk.Cid())
}
ses := inst[10].Exchange.NewSession(ctx)
ses.baseTickDelay = time.Millisecond * 10
for i := 0; i < 10; i++ {
ch, err := ses.GetBlocks(ctx, cids[i*10:(i+1)*10])
if err != nil {
t.Fatal(err)
}
var got []blocks.Block
for b := range ch {
got = append(got, b)
}
if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil {
t.Fatal(err)
}
}
}
func TestInterestCacheOverflow(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
blks := bgen.Blocks(2049)
inst := sesgen.Instances(2)
a := inst[0]
b := inst[1]
ses := a.Exchange.NewSession(ctx)
zeroch, err := ses.GetBlocks(ctx, []*cid.Cid{blks[0].Cid()})
if err != nil {
t.Fatal(err)
}
var restcids []*cid.Cid
for _, blk := range blks[1:] {
restcids = append(restcids, blk.Cid())
}
restch, err := ses.GetBlocks(ctx, restcids)
if err != nil {
t.Fatal(err)
}
// wait to ensure that all the above cids were added to the sessions cache
time.Sleep(time.Millisecond * 50)
if err := b.Exchange.HasBlock(blks[0]); err != nil {
t.Fatal(err)
}
select {
case blk, ok := <-zeroch:
if ok && blk.Cid().Equals(blks[0].Cid()) {
// success!
} else {
t.Fatal("failed to get the block")
}
case <-restch:
t.Fatal("should not get anything on restch")
case <-time.After(time.Second * 5):
t.Fatal("timed out waiting for block")
}
}
func TestPutAfterSessionCacheEvict(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
blks := bgen.Blocks(2500)
inst := sesgen.Instances(1)
a := inst[0]
ses := a.Exchange.NewSession(ctx)
var allcids []*cid.Cid
for _, blk := range blks[1:] {
allcids = append(allcids, blk.Cid())
}
blkch, err := ses.GetBlocks(ctx, allcids)
if err != nil {
t.Fatal(err)
}
// wait to ensure that all the above cids were added to the sessions cache
time.Sleep(time.Millisecond * 50)
if err := a.Exchange.HasBlock(blks[17]); err != nil {
t.Fatal(err)
}
select {
case <-blkch:
case <-time.After(time.Millisecond * 50):
t.Fatal("timed out waiting for block")
}
}

View File

@ -10,11 +10,11 @@ type Stat struct {
ProvideBufLen int
Wantlist []*cid.Cid
Peers []string
BlocksReceived int
BlocksReceived uint64
DataReceived uint64
BlocksSent int
BlocksSent uint64
DataSent uint64
DupBlksReceived int
DupBlksReceived uint64
DupDataReceived uint64
}
@ -23,12 +23,13 @@ func (bs *Bitswap) Stat() (*Stat, error) {
st.ProvideBufLen = len(bs.newBlocks)
st.Wantlist = bs.GetWantlist()
bs.counterLk.Lock()
st.BlocksReceived = bs.blocksRecvd
st.DupBlksReceived = bs.dupBlocksRecvd
st.DupDataReceived = bs.dupDataRecvd
st.BlocksSent = bs.blocksSent
st.DataSent = bs.dataSent
st.DataReceived = bs.dataRecvd
c := bs.counters
st.BlocksReceived = c.blocksRecvd
st.DupBlksReceived = c.dupBlocksRecvd
st.DupDataReceived = c.dupDataRecvd
st.BlocksSent = c.blocksSent
st.DataSent = c.dataSent
st.DataReceived = c.dataRecvd
bs.counterLk.Unlock()
for _, p := range bs.engine.Peers() {

View File

@ -47,7 +47,7 @@ func (g *SessionGenerator) Next() Instance {
if err != nil {
panic("FIXME") // TODO change signature
}
return Session(g.ctx, g.net, p)
return MkSession(g.ctx, g.net, p)
}
func (g *SessionGenerator) Instances(n int) []Instance {
@ -86,7 +86,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
func Session(ctx context.Context, net tn.Network, p testutil.Identity) Instance {
func MkSession(ctx context.Context, net tn.Network, p testutil.Identity) Instance {
bsdelay := delay.Fixed(0)
adapter := net.Adapter(p)

View File

@ -10,8 +10,8 @@ import (
)
type ThreadSafe struct {
lk sync.RWMutex
Wantlist Wantlist
lk sync.RWMutex
set map[string]*Entry
}
// not threadsafe
@ -23,7 +23,16 @@ type Entry struct {
Cid *cid.Cid
Priority int
RefCnt int
SesTrk map[uint64]struct{}
}
// NewRefEntry creates a new reference tracked wantlist entry
func NewRefEntry(c *cid.Cid, p int) *Entry {
return &Entry{
Cid: c,
Priority: p,
SesTrk: make(map[uint64]struct{}),
}
}
type entrySlice []*Entry
@ -34,7 +43,7 @@ func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priorit
func NewThreadSafe() *ThreadSafe {
return &ThreadSafe{
Wantlist: *New(),
set: make(map[string]*Entry),
}
}
@ -44,46 +53,101 @@ func New() *Wantlist {
}
}
func (w *ThreadSafe) Add(k *cid.Cid, priority int) bool {
// Add adds the given cid to the wantlist with the specified priority, governed
// by the session ID 'ses'. if a cid is added under multiple session IDs, then
// it must be removed by each of those sessions before it is no longer 'in the
// wantlist'. Calls to Add are idempotent given the same arguments. Subsequent
// calls with different values for priority will not update the priority
// TODO: think through priority changes here
// Add returns true if the cid did not exist in the wantlist before this call
// (even if it was under a different session)
func (w *ThreadSafe) Add(c *cid.Cid, priority int, ses uint64) bool {
w.lk.Lock()
defer w.lk.Unlock()
return w.Wantlist.Add(k, priority)
k := c.KeyString()
if e, ok := w.set[k]; ok {
e.SesTrk[ses] = struct{}{}
return false
}
w.set[k] = &Entry{
Cid: c,
Priority: priority,
SesTrk: map[uint64]struct{}{ses: struct{}{}},
}
return true
}
func (w *ThreadSafe) AddEntry(e *Entry) bool {
// AddEntry adds given Entry to the wantlist. For more information see Add method.
func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool {
w.lk.Lock()
defer w.lk.Unlock()
return w.Wantlist.AddEntry(e)
k := e.Cid.KeyString()
if ex, ok := w.set[k]; ok {
ex.SesTrk[ses] = struct{}{}
return false
}
w.set[k] = e
e.SesTrk[ses] = struct{}{}
return true
}
func (w *ThreadSafe) Remove(k *cid.Cid) bool {
// Remove removes the given cid from being tracked by the given session.
// 'true' is returned if this call to Remove removed the final session ID
// tracking the cid. (meaning true will be returned iff this call caused the
// value of 'Contains(c)' to change from true to false)
func (w *ThreadSafe) Remove(c *cid.Cid, ses uint64) bool {
w.lk.Lock()
defer w.lk.Unlock()
return w.Wantlist.Remove(k)
k := c.KeyString()
e, ok := w.set[k]
if !ok {
return false
}
delete(e.SesTrk, ses)
if len(e.SesTrk) == 0 {
delete(w.set, k)
return true
}
return false
}
// Contains returns true if the given cid is in the wantlist tracked by one or
// more sessions
func (w *ThreadSafe) Contains(k *cid.Cid) (*Entry, bool) {
w.lk.RLock()
defer w.lk.RUnlock()
return w.Wantlist.Contains(k)
e, ok := w.set[k.KeyString()]
return e, ok
}
func (w *ThreadSafe) Entries() []*Entry {
w.lk.RLock()
defer w.lk.RUnlock()
return w.Wantlist.Entries()
var es entrySlice
for _, e := range w.set {
es = append(es, e)
}
return es
}
func (w *ThreadSafe) SortedEntries() []*Entry {
w.lk.RLock()
defer w.lk.RUnlock()
return w.Wantlist.SortedEntries()
var es entrySlice
for _, e := range w.set {
es = append(es, e)
}
sort.Sort(es)
return es
}
func (w *ThreadSafe) Len() int {
w.lk.RLock()
defer w.lk.RUnlock()
return w.Wantlist.Len()
return len(w.set)
}
func (w *Wantlist) Len() int {
@ -92,15 +156,13 @@ func (w *Wantlist) Len() int {
func (w *Wantlist) Add(c *cid.Cid, priority int) bool {
k := c.KeyString()
if e, ok := w.set[k]; ok {
e.RefCnt++
if _, ok := w.set[k]; ok {
return false
}
w.set[k] = &Entry{
Cid: c,
Priority: priority,
RefCnt: 1,
}
return true
@ -108,8 +170,7 @@ func (w *Wantlist) Add(c *cid.Cid, priority int) bool {
func (w *Wantlist) AddEntry(e *Entry) bool {
k := e.Cid.KeyString()
if ex, ok := w.set[k]; ok {
ex.RefCnt++
if _, ok := w.set[k]; ok {
return false
}
w.set[k] = e
@ -118,17 +179,13 @@ func (w *Wantlist) AddEntry(e *Entry) bool {
func (w *Wantlist) Remove(c *cid.Cid) bool {
k := c.KeyString()
e, ok := w.set[k]
_, ok := w.set[k]
if !ok {
return false
}
e.RefCnt--
if e.RefCnt <= 0 {
delete(w.set, k)
return true
}
return false
delete(w.set, k)
return true
}
func (w *Wantlist) Contains(k *cid.Cid) (*Entry, bool) {

View File

@ -0,0 +1,104 @@
package wantlist
import (
"testing"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
)
var testcids []*cid.Cid
func init() {
strs := []string{
"QmQL8LqkEgYXaDHdNYCG2mmpow7Sp8Z8Kt3QS688vyBeC7",
"QmcBDsdjgSXU7BP4A4V8LJCXENE5xVwnhrhRGVTJr9YCVj",
"QmQakgd2wDxc3uUF4orGdEm28zUT9Mmimp5pyPG2SFS9Gj",
}
for _, s := range strs {
c, err := cid.Decode(s)
if err != nil {
panic(err)
}
testcids = append(testcids, c)
}
}
type wli interface {
Contains(*cid.Cid) (*Entry, bool)
}
func assertHasCid(t *testing.T, w wli, c *cid.Cid) {
e, ok := w.Contains(c)
if !ok {
t.Fatal("expected to have ", c)
}
if !e.Cid.Equals(c) {
t.Fatal("returned entry had wrong cid value")
}
}
func assertNotHasCid(t *testing.T, w wli, c *cid.Cid) {
_, ok := w.Contains(c)
if ok {
t.Fatal("expected not to have ", c)
}
}
func TestBasicWantlist(t *testing.T) {
wl := New()
if !wl.Add(testcids[0], 5) {
t.Fatal("expected true")
}
assertHasCid(t, wl, testcids[0])
if !wl.Add(testcids[1], 4) {
t.Fatal("expected true")
}
assertHasCid(t, wl, testcids[0])
assertHasCid(t, wl, testcids[1])
if wl.Len() != 2 {
t.Fatal("should have had two items")
}
if wl.Add(testcids[1], 4) {
t.Fatal("add shouldnt report success on second add")
}
assertHasCid(t, wl, testcids[0])
assertHasCid(t, wl, testcids[1])
if wl.Len() != 2 {
t.Fatal("should have had two items")
}
if !wl.Remove(testcids[0]) {
t.Fatal("should have gotten true")
}
assertHasCid(t, wl, testcids[1])
if _, has := wl.Contains(testcids[0]); has {
t.Fatal("shouldnt have this cid")
}
}
func TestSesRefWantlist(t *testing.T) {
wl := NewThreadSafe()
if !wl.Add(testcids[0], 5, 1) {
t.Fatal("should have added")
}
assertHasCid(t, wl, testcids[0])
if wl.Remove(testcids[0], 2) {
t.Fatal("shouldnt have removed")
}
assertHasCid(t, wl, testcids[0])
if wl.Add(testcids[0], 5, 1) {
t.Fatal("shouldnt have added")
}
assertHasCid(t, wl, testcids[0])
if !wl.Remove(testcids[0], 1) {
t.Fatal("should have removed")
}
assertNotHasCid(t, wl, testcids[0])
}

View File

@ -17,7 +17,7 @@ import (
type WantManager struct {
// sync channels for Run loop
incoming chan []*bsmsg.Entry
incoming chan *wantSet
connect chan peer.ID // notification channel for new peers connecting
disconnect chan peer.ID // notification channel for peers disconnecting
peerReqs chan chan []peer.ID // channel to request connected peers on
@ -25,6 +25,7 @@ type WantManager struct {
// synchronized by Run loop, only touch inside there
peers map[peer.ID]*msgQueue
wl *wantlist.ThreadSafe
bcwl *wantlist.ThreadSafe
network bsnet.BitSwapNetwork
ctx context.Context
@ -41,12 +42,13 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana
sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
" this bitswap").Histogram(metricsBuckets)
return &WantManager{
incoming: make(chan []*bsmsg.Entry, 10),
incoming: make(chan *wantSet, 10),
connect: make(chan peer.ID, 10),
disconnect: make(chan peer.ID, 10),
peerReqs: make(chan chan []peer.ID),
peers: make(map[peer.ID]*msgQueue),
wl: wantlist.NewThreadSafe(),
bcwl: wantlist.NewThreadSafe(),
network: network,
ctx: ctx,
cancel: cancel,
@ -61,6 +63,7 @@ type msgQueue struct {
outlk sync.Mutex
out bsmsg.BitSwapMessage
network bsnet.BitSwapNetwork
wl *wantlist.ThreadSafe
sender bsnet.MessageSender
@ -70,30 +73,33 @@ type msgQueue struct {
done chan struct{}
}
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid) {
// WantBlocks adds the given cids to the wantlist, tracked by the given session
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) {
log.Infof("want blocks: %s", ks)
pm.addEntries(ctx, ks, false)
pm.addEntries(ctx, ks, peers, false, ses)
}
func (pm *WantManager) CancelWants(ks []*cid.Cid) {
log.Infof("cancel wants: %s", ks)
pm.addEntries(context.TODO(), ks, true)
// CancelWants removes the given cids from the wantlist, tracked by the given session
func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) {
pm.addEntries(context.Background(), ks, peers, true, ses)
}
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel bool) {
type wantSet struct {
entries []*bsmsg.Entry
targets []peer.ID
from uint64
}
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, targets []peer.ID, cancel bool, ses uint64) {
var entries []*bsmsg.Entry
for i, k := range ks {
entries = append(entries, &bsmsg.Entry{
Cancel: cancel,
Entry: &wantlist.Entry{
Cid: k,
Priority: kMaxPriority - i,
RefCnt: 1,
},
Entry: wantlist.NewRefEntry(k, kMaxPriority-i),
})
}
select {
case pm.incoming <- entries:
case pm.incoming <- &wantSet{entries: entries, targets: targets, from: ses}:
case <-pm.ctx.Done():
case <-ctx.Done():
}
@ -132,7 +138,10 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
// new peer, we will want to give them our full wantlist
fullwantlist := bsmsg.New(true)
for _, e := range pm.wl.Entries() {
for _, e := range pm.bcwl.Entries() {
for k := range e.SesTrk {
mq.wl.AddEntry(e, k)
}
fullwantlist.AddEntry(e.Cid, e.Priority)
}
mq.out = fullwantlist
@ -278,43 +287,47 @@ func (pm *WantManager) Run() {
defer tock.Stop()
for {
select {
case entries := <-pm.incoming:
case ws := <-pm.incoming:
// is this a broadcast or not?
brdc := len(ws.targets) == 0
// add changes to our wantlist
var filtered []*bsmsg.Entry
for _, e := range entries {
for _, e := range ws.entries {
if e.Cancel {
if pm.wl.Remove(e.Cid) {
if brdc {
pm.bcwl.Remove(e.Cid, ws.from)
}
if pm.wl.Remove(e.Cid, ws.from) {
pm.wantlistGauge.Dec()
filtered = append(filtered, e)
}
} else {
if pm.wl.AddEntry(e.Entry) {
if brdc {
pm.bcwl.AddEntry(e.Entry, ws.from)
}
if pm.wl.AddEntry(e.Entry, ws.from) {
pm.wantlistGauge.Inc()
filtered = append(filtered, e)
}
}
}
// broadcast those wantlist changes
for _, p := range pm.peers {
p.addMessage(filtered)
if len(ws.targets) == 0 {
for _, p := range pm.peers {
p.addMessage(ws.entries, ws.from)
}
} else {
for _, t := range ws.targets {
p, ok := pm.peers[t]
if !ok {
log.Warning("tried sending wantlist change to non-partner peer")
continue
}
p.addMessage(ws.entries, ws.from)
}
}
case <-tock.C:
// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
var es []*bsmsg.Entry
for _, e := range pm.wl.Entries() {
es = append(es, &bsmsg.Entry{Entry: e})
}
for _, p := range pm.peers {
p.outlk.Lock()
p.out = bsmsg.New(true)
p.outlk.Unlock()
p.addMessage(es)
}
case p := <-pm.connect:
pm.startPeerHandler(p)
case p := <-pm.disconnect:
@ -335,16 +348,21 @@ func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
return &msgQueue{
done: make(chan struct{}),
work: make(chan struct{}, 1),
wl: wantlist.NewThreadSafe(),
network: wm.network,
p: p,
refcnt: 1,
}
}
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry, ses uint64) {
var work bool
mq.outlk.Lock()
defer func() {
mq.outlk.Unlock()
if !work {
return
}
select {
case mq.work <- struct{}{}:
default:
@ -361,9 +379,15 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
// one passed in
for _, e := range entries {
if e.Cancel {
mq.out.Cancel(e.Cid)
if mq.wl.Remove(e.Cid, ses) {
work = true
mq.out.Cancel(e.Cid)
}
} else {
mq.out.AddEntry(e.Cid, e.Priority)
if mq.wl.Add(e.Cid, e.Priority, ses) {
work = true
mq.out.AddEntry(e.Cid, e.Priority)
}
}
}
}

View File

@ -49,7 +49,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
idmap := logging.LoggableMap{"ID": id}
defer log.Info("bitswap task worker shutting down...")
defer log.Debug("bitswap task worker shutting down...")
for {
log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
select {
@ -73,8 +73,8 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
bs.wm.SendBlock(ctx, envelope)
bs.counterLk.Lock()
bs.blocksSent++
bs.dataSent += uint64(len(envelope.Block.RawData()))
bs.counters.blocksSent++
bs.counters.dataSent += uint64(len(envelope.Block.RawData()))
bs.counterLk.Unlock()
case <-ctx.Done():
return

View File

@ -13,10 +13,7 @@ import (
// Any type that implements exchange.Interface may be used as an IPFS block
// exchange protocol.
type Interface interface { // type Exchanger interface
// GetBlock returns the block associated with a given key.
GetBlock(context.Context, *cid.Cid) (blocks.Block, error)
GetBlocks(context.Context, []*cid.Cid) (<-chan blocks.Block, error)
Fetcher
// TODO Should callers be concerned with whether the block was made
// available on the network?
@ -26,3 +23,10 @@ type Interface interface { // type Exchanger interface
io.Closer
}
// Fetcher is an object that can be used to retrieve blocks
type Fetcher interface {
// GetBlock returns the block associated with a given key.
GetBlock(context.Context, *cid.Cid) (blocks.Block, error)
GetBlocks(context.Context, []*cid.Cid) (<-chan blocks.Block, error)
}

View File

@ -155,17 +155,39 @@ func GetLinksDirect(serv node.NodeGetter) GetLinks {
return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
node, err := serv.Get(ctx, c)
if err != nil {
if err == bserv.ErrNotFound {
err = ErrNotFound
}
return nil, err
}
return node.Links(), nil
}
}
type sesGetter struct {
bs *bserv.Session
}
func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
blk, err := sg.bs.GetBlock(ctx, c)
if err != nil {
return nil, err
}
return decodeBlock(blk)
}
// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
var ng node.NodeGetter = serv
ds, ok := serv.(*dagService)
if ok {
ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
}
v, _ := ctx.Value("progress").(*ProgressTracker)
if v == nil {
return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, cid.NewSet().Visit)
return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit)
}
set := cid.NewSet()
visit := func(c *cid.Cid) bool {
@ -176,7 +198,7 @@ func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
return false
}
}
return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, visit)
return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit)
}
// FindLinks searches this nodes links for the given key,