mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-10 18:57:57 +08:00
implement bitswap roundWorker
make vendor
This commit is contained in:
parent
12b296ee1a
commit
5b6a5e807f
2
Godeps/Godeps.json
generated
2
Godeps/Godeps.json
generated
@ -1,6 +1,6 @@
|
||||
{
|
||||
"ImportPath": "github.com/jbenet/go-ipfs",
|
||||
"GoVersion": "go1.3",
|
||||
"GoVersion": "devel +ffe33f1f1f17 Tue Nov 25 15:41:33 2014 +1100",
|
||||
"Packages": [
|
||||
"./..."
|
||||
],
|
||||
|
||||
@ -15,6 +15,7 @@ import (
|
||||
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
|
||||
notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
|
||||
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
|
||||
wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
|
||||
@ -29,6 +30,8 @@ const maxProvidersPerRequest = 3
|
||||
const providerRequestTimeout = time.Second * 10
|
||||
const hasBlockTimeout = time.Second * 15
|
||||
|
||||
const roundTime = time.Second / 2
|
||||
|
||||
// New initializes a BitSwap instance that communicates over the
|
||||
// provided BitSwapNetwork. This function registers the returned instance as
|
||||
// the network delegate.
|
||||
@ -41,6 +44,7 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout
|
||||
notif := notifications.New()
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
cancelFunc()
|
||||
notif.Shutdown()
|
||||
}()
|
||||
|
||||
@ -51,11 +55,12 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout
|
||||
strategy: strategy.New(nice),
|
||||
routing: routing,
|
||||
sender: network,
|
||||
wantlist: u.NewKeySet(),
|
||||
wantlist: wl.NewWantlist(),
|
||||
batchRequests: make(chan []u.Key, 32),
|
||||
}
|
||||
network.SetDelegate(bs)
|
||||
go bs.loop(ctx)
|
||||
go bs.roundWorker(ctx)
|
||||
|
||||
return bs
|
||||
}
|
||||
@ -85,7 +90,7 @@ type bitswap struct {
|
||||
// TODO(brian): save the strategy's state to the datastore
|
||||
strategy strategy.Strategy
|
||||
|
||||
wantlist u.KeySet
|
||||
wantlist *wl.Wantlist
|
||||
|
||||
// cancelFunc signals cancellation to the bitswap event loop
|
||||
cancelFunc func()
|
||||
@ -166,8 +171,8 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
|
||||
panic("Cant send wantlist to nil peerchan")
|
||||
}
|
||||
message := bsmsg.New()
|
||||
for _, wanted := range bs.wantlist.Keys() {
|
||||
message.AddWanted(wanted)
|
||||
for _, wanted := range bs.wantlist.Entries() {
|
||||
message.AddEntry(wanted.Value, wanted.Priority, false)
|
||||
}
|
||||
for peerToQuery := range peers {
|
||||
log.Debug("sending query to: %s", peerToQuery)
|
||||
@ -195,9 +200,9 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
|
||||
return nil
|
||||
}
|
||||
|
||||
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) {
|
||||
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wantlist) {
|
||||
wg := sync.WaitGroup{}
|
||||
for _, k := range ks {
|
||||
for _, e := range wantlist.Entries() {
|
||||
wg.Add(1)
|
||||
go func(k u.Key) {
|
||||
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
|
||||
@ -208,11 +213,44 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) {
|
||||
log.Errorf("error sending wantlist: %s", err)
|
||||
}
|
||||
wg.Done()
|
||||
}(k)
|
||||
}(e.Value)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (bs *bitswap) roundWorker(ctx context.Context) {
|
||||
roundTicker := time.NewTicker(roundTime)
|
||||
bandwidthPerRound := 500000
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-roundTicker.C:
|
||||
alloc, err := bs.strategy.GetAllocation(bandwidthPerRound, bs.blockstore)
|
||||
if err != nil {
|
||||
log.Critical("%s", err)
|
||||
}
|
||||
//log.Errorf("Allocation: %v", alloc)
|
||||
bs.processStrategyAllocation(ctx, alloc)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *bitswap) processStrategyAllocation(ctx context.Context, alloc []*strategy.Task) {
|
||||
for _, t := range alloc {
|
||||
for _, block := range t.Blocks {
|
||||
message := bsmsg.New()
|
||||
message.AddBlock(block)
|
||||
for _, wanted := range bs.wantlist.Entries() {
|
||||
message.AddEntry(wanted.Value, wanted.Priority, false)
|
||||
}
|
||||
if err := bs.send(ctx, t.Peer, message); err != nil {
|
||||
log.Errorf("Message Send Failed: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO ensure only one active request per key
|
||||
func (bs *bitswap) loop(parent context.Context) {
|
||||
|
||||
@ -228,7 +266,7 @@ func (bs *bitswap) loop(parent context.Context) {
|
||||
select {
|
||||
case <-broadcastSignal.C:
|
||||
// Resend unfulfilled wantlist keys
|
||||
bs.sendWantlistToProviders(ctx, bs.wantlist.Keys())
|
||||
bs.sendWantlistToProviders(ctx, bs.wantlist)
|
||||
case ks := <-bs.batchRequests:
|
||||
// TODO: implement batching on len(ks) > X for some X
|
||||
// i.e. if given 20 keys, fetch first five, then next
|
||||
@ -239,7 +277,7 @@ func (bs *bitswap) loop(parent context.Context) {
|
||||
continue
|
||||
}
|
||||
for _, k := range ks {
|
||||
bs.wantlist.Add(k)
|
||||
bs.wantlist.Add(k, 1)
|
||||
}
|
||||
// NB: send want list to providers for the first peer in this list.
|
||||
// the assumption is made that the providers of the first key in
|
||||
@ -277,45 +315,41 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Record message bytes in ledger
|
||||
// TODO: this is bad, and could be easily abused.
|
||||
// Should only track *useful* messages in ledger
|
||||
// This call records changes to wantlists, blocks received,
|
||||
// and number of bytes transfered.
|
||||
bs.strategy.MessageReceived(p, incoming)
|
||||
// TODO: this is bad, and could be easily abused.
|
||||
// Should only track *useful* messages in ledger
|
||||
|
||||
var blkeys []u.Key
|
||||
for _, block := range incoming.Blocks() {
|
||||
blkeys = append(blkeys, block.Key())
|
||||
if err := bs.HasBlock(ctx, block); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, key := range incoming.Wantlist() {
|
||||
if bs.strategy.ShouldSendBlockToPeer(key, p) {
|
||||
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
|
||||
continue
|
||||
} else {
|
||||
// Create a separate message to send this block in
|
||||
blkmsg := bsmsg.New()
|
||||
|
||||
// TODO: only send this the first time
|
||||
// no sense in sending our wantlist to the
|
||||
// same peer multiple times
|
||||
for _, k := range bs.wantlist.Keys() {
|
||||
blkmsg.AddWanted(k)
|
||||
}
|
||||
|
||||
blkmsg.AddBlock(block)
|
||||
bs.send(ctx, p, blkmsg)
|
||||
bs.strategy.BlockSentToPeer(block.Key(), p)
|
||||
}
|
||||
}
|
||||
if len(blkeys) > 0 {
|
||||
bs.cancelBlocks(ctx, blkeys)
|
||||
}
|
||||
|
||||
// TODO: consider changing this function to not return anything
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
|
||||
message := bsmsg.New()
|
||||
message.SetFull(false)
|
||||
for _, k := range bkeys {
|
||||
message.AddEntry(k, 0, true)
|
||||
}
|
||||
for _, p := range bs.strategy.Peers() {
|
||||
err := bs.send(ctx, p, message)
|
||||
if err != nil {
|
||||
log.Errorf("Error sending message: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *bitswap) ReceiveError(err error) {
|
||||
log.Errorf("Bitswap ReceiveError: %s", err)
|
||||
// TODO log the network error
|
||||
@ -337,8 +371,8 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block)
|
||||
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
|
||||
message := bsmsg.New()
|
||||
message.AddBlock(block)
|
||||
for _, wanted := range bs.wantlist.Keys() {
|
||||
message.AddWanted(wanted)
|
||||
for _, wanted := range bs.wantlist.Entries() {
|
||||
message.AddEntry(wanted.Value, wanted.Priority, false)
|
||||
}
|
||||
if err := bs.send(ctx, p, message); err != nil {
|
||||
return err
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
|
||||
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
|
||||
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
delay "github.com/jbenet/go-ipfs/util/delay"
|
||||
testutil "github.com/jbenet/go-ipfs/util/testutil"
|
||||
)
|
||||
@ -25,6 +26,7 @@ func TestClose(t *testing.T) {
|
||||
vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
|
||||
rout := mockrouting.NewServer()
|
||||
sesgen := NewSessionGenerator(vnet, rout)
|
||||
defer sesgen.Stop()
|
||||
bgen := blocksutil.NewBlockGenerator()
|
||||
|
||||
block := bgen.Next()
|
||||
@ -39,6 +41,7 @@ func TestGetBlockTimeout(t *testing.T) {
|
||||
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
|
||||
rs := mockrouting.NewServer()
|
||||
g := NewSessionGenerator(net, rs)
|
||||
defer g.Stop()
|
||||
|
||||
self := g.Next()
|
||||
|
||||
@ -56,11 +59,13 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
|
||||
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
|
||||
rs := mockrouting.NewServer()
|
||||
g := NewSessionGenerator(net, rs)
|
||||
defer g.Stop()
|
||||
|
||||
block := blocks.NewBlock([]byte("block"))
|
||||
rs.Client(testutil.NewPeerWithIDString("testing")).Provide(context.Background(), block.Key()) // but not on network
|
||||
|
||||
solo := g.Next()
|
||||
defer solo.Exchange.Close()
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
|
||||
_, err := solo.Exchange.GetBlock(ctx, block.Key())
|
||||
@ -78,8 +83,10 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
|
||||
rs := mockrouting.NewServer()
|
||||
block := blocks.NewBlock([]byte("block"))
|
||||
g := NewSessionGenerator(net, rs)
|
||||
defer g.Stop()
|
||||
|
||||
hasBlock := g.Next()
|
||||
defer hasBlock.Exchange.Close()
|
||||
|
||||
if err := hasBlock.Blockstore().Put(block); err != nil {
|
||||
t.Fatal(err)
|
||||
@ -89,6 +96,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
|
||||
}
|
||||
|
||||
wantsBlock := g.Next()
|
||||
defer wantsBlock.Exchange.Close()
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Second)
|
||||
received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
|
||||
@ -107,7 +115,7 @@ func TestLargeSwarm(t *testing.T) {
|
||||
t.SkipNow()
|
||||
}
|
||||
t.Parallel()
|
||||
numInstances := 5
|
||||
numInstances := 500
|
||||
numBlocks := 2
|
||||
PerformDistributionTest(t, numInstances, numBlocks)
|
||||
}
|
||||
@ -129,6 +137,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
|
||||
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
|
||||
rs := mockrouting.NewServer()
|
||||
sg := NewSessionGenerator(net, rs)
|
||||
defer sg.Stop()
|
||||
bg := blocksutil.NewBlockGenerator()
|
||||
|
||||
t.Log("Test a few nodes trying to get one file with a lot of blocks")
|
||||
@ -138,24 +147,29 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
|
||||
|
||||
t.Log("Give the blocks to the first instance")
|
||||
|
||||
var blkeys []u.Key
|
||||
first := instances[0]
|
||||
for _, b := range blocks {
|
||||
first.Blockstore().Put(b)
|
||||
blkeys = append(blkeys, b.Key())
|
||||
first.Exchange.HasBlock(context.Background(), b)
|
||||
rs.Client(first.Peer).Provide(context.Background(), b.Key())
|
||||
}
|
||||
|
||||
t.Log("Distribute!")
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
for _, inst := range instances {
|
||||
for _, b := range blocks {
|
||||
wg.Add(1)
|
||||
// NB: executing getOrFail concurrently puts tremendous pressure on
|
||||
// the goroutine scheduler
|
||||
getOrFail(inst, b, t, &wg)
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(inst Instance) {
|
||||
defer wg.Done()
|
||||
outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _ = range outch {
|
||||
}
|
||||
}(inst)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
@ -189,6 +203,7 @@ func TestSendToWantingPeer(t *testing.T) {
|
||||
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
|
||||
rs := mockrouting.NewServer()
|
||||
sg := NewSessionGenerator(net, rs)
|
||||
defer sg.Stop()
|
||||
bg := blocksutil.NewBlockGenerator()
|
||||
|
||||
me := sg.Next()
|
||||
@ -201,7 +216,7 @@ func TestSendToWantingPeer(t *testing.T) {
|
||||
|
||||
alpha := bg.Next()
|
||||
|
||||
const timeout = 100 * time.Millisecond // FIXME don't depend on time
|
||||
const timeout = 1000 * time.Millisecond // FIXME don't depend on time
|
||||
|
||||
t.Logf("Peer %v attempts to get %v. NB: not available\n", w.Peer, alpha.Key())
|
||||
ctx, _ := context.WithTimeout(context.Background(), timeout)
|
||||
@ -246,3 +261,33 @@ func TestSendToWantingPeer(t *testing.T) {
|
||||
t.Fatal("Expected to receive alpha from me")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBasicBitswap(t *testing.T) {
|
||||
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
|
||||
rs := mockrouting.NewServer()
|
||||
sg := NewSessionGenerator(net, rs)
|
||||
bg := blocksutil.NewBlockGenerator()
|
||||
|
||||
t.Log("Test a few nodes trying to get one file with a lot of blocks")
|
||||
|
||||
instances := sg.Instances(2)
|
||||
blocks := bg.Blocks(1)
|
||||
err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
|
||||
blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Log(blk)
|
||||
for _, inst := range instances {
|
||||
err := inst.Exchange.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,16 +21,16 @@ var _ = proto.Marshal
|
||||
var _ = math.Inf
|
||||
|
||||
type Message struct {
|
||||
Wantlist []string `protobuf:"bytes,1,rep,name=wantlist" json:"wantlist,omitempty"`
|
||||
Blocks [][]byte `protobuf:"bytes,2,rep,name=blocks" json:"blocks,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
Wantlist *Message_Wantlist `protobuf:"bytes,1,opt,name=wantlist" json:"wantlist,omitempty"`
|
||||
Blocks [][]byte `protobuf:"bytes,2,rep,name=blocks" json:"blocks,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Message) Reset() { *m = Message{} }
|
||||
func (m *Message) String() string { return proto.CompactTextString(m) }
|
||||
func (*Message) ProtoMessage() {}
|
||||
|
||||
func (m *Message) GetWantlist() []string {
|
||||
func (m *Message) GetWantlist() *Message_Wantlist {
|
||||
if m != nil {
|
||||
return m.Wantlist
|
||||
}
|
||||
@ -44,5 +44,61 @@ func (m *Message) GetBlocks() [][]byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
type Message_Wantlist struct {
|
||||
Entries []*Message_Wantlist_Entry `protobuf:"bytes,1,rep,name=entries" json:"entries,omitempty"`
|
||||
Full *bool `protobuf:"varint,2,opt,name=full" json:"full,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Message_Wantlist) Reset() { *m = Message_Wantlist{} }
|
||||
func (m *Message_Wantlist) String() string { return proto.CompactTextString(m) }
|
||||
func (*Message_Wantlist) ProtoMessage() {}
|
||||
|
||||
func (m *Message_Wantlist) GetEntries() []*Message_Wantlist_Entry {
|
||||
if m != nil {
|
||||
return m.Entries
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Message_Wantlist) GetFull() bool {
|
||||
if m != nil && m.Full != nil {
|
||||
return *m.Full
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type Message_Wantlist_Entry struct {
|
||||
Block *string `protobuf:"bytes,1,opt,name=block" json:"block,omitempty"`
|
||||
Priority *int32 `protobuf:"varint,2,opt,name=priority" json:"priority,omitempty"`
|
||||
Cancel *bool `protobuf:"varint,3,opt,name=cancel" json:"cancel,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Message_Wantlist_Entry) Reset() { *m = Message_Wantlist_Entry{} }
|
||||
func (m *Message_Wantlist_Entry) String() string { return proto.CompactTextString(m) }
|
||||
func (*Message_Wantlist_Entry) ProtoMessage() {}
|
||||
|
||||
func (m *Message_Wantlist_Entry) GetBlock() string {
|
||||
if m != nil && m.Block != nil {
|
||||
return *m.Block
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *Message_Wantlist_Entry) GetPriority() int32 {
|
||||
if m != nil && m.Priority != nil {
|
||||
return *m.Priority
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Message_Wantlist_Entry) GetCancel() bool {
|
||||
if m != nil && m.Cancel != nil {
|
||||
return *m.Cancel
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func init() {
|
||||
}
|
||||
|
||||
@ -1,6 +1,19 @@
|
||||
package bitswap.message.pb;
|
||||
|
||||
message Message {
|
||||
repeated string wantlist = 1;
|
||||
repeated bytes blocks = 2;
|
||||
|
||||
message Wantlist {
|
||||
|
||||
message Entry {
|
||||
optional string block = 1; // the block key
|
||||
optional int32 priority = 2; // the priority (normalized). default to 1
|
||||
optional bool cancel = 3; // whether this revokes an entry
|
||||
}
|
||||
|
||||
repeated Entry entries = 1; // a list of wantlist entries
|
||||
optional bool full = 2; // whether this is the full wantlist. default to false
|
||||
}
|
||||
|
||||
optional Wantlist wantlist = 1;
|
||||
repeated bytes blocks = 2;
|
||||
}
|
||||
|
||||
@ -9,6 +9,7 @@ import (
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
|
||||
ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
|
||||
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||
)
|
||||
|
||||
// TODO move message.go into the bitswap package
|
||||
@ -17,21 +18,21 @@ import (
|
||||
type BitSwapMessage interface {
|
||||
// Wantlist returns a slice of unique keys that represent data wanted by
|
||||
// the sender.
|
||||
Wantlist() []u.Key
|
||||
Wantlist() []*Entry
|
||||
|
||||
// Blocks returns a slice of unique blocks
|
||||
Blocks() []*blocks.Block
|
||||
|
||||
// AddWanted adds the key to the Wantlist.
|
||||
//
|
||||
// Insertion order determines priority. That is, earlier insertions are
|
||||
// deemed higher priority than keys inserted later.
|
||||
//
|
||||
// t = 0, msg.AddWanted(A)
|
||||
// t = 1, msg.AddWanted(B)
|
||||
//
|
||||
// implies Priority(A) > Priority(B)
|
||||
AddWanted(u.Key)
|
||||
// AddEntry adds an entry to the Wantlist.
|
||||
AddEntry(u.Key, int, bool)
|
||||
|
||||
// Sets whether or not the contained wantlist represents the entire wantlist
|
||||
// true = full wantlist
|
||||
// false = wantlist 'patch'
|
||||
// default: true
|
||||
SetFull(bool)
|
||||
|
||||
Full() bool
|
||||
|
||||
AddBlock(*blocks.Block)
|
||||
Exportable
|
||||
@ -43,23 +44,30 @@ type Exportable interface {
|
||||
}
|
||||
|
||||
type impl struct {
|
||||
existsInWantlist map[u.Key]struct{} // map to detect duplicates
|
||||
wantlist []u.Key // slice to preserve ordering
|
||||
blocks map[u.Key]*blocks.Block // map to detect duplicates
|
||||
full bool
|
||||
wantlist map[u.Key]*Entry
|
||||
blocks map[u.Key]*blocks.Block // map to detect duplicates
|
||||
}
|
||||
|
||||
func New() BitSwapMessage {
|
||||
return &impl{
|
||||
blocks: make(map[u.Key]*blocks.Block),
|
||||
existsInWantlist: make(map[u.Key]struct{}),
|
||||
wantlist: make([]u.Key, 0),
|
||||
blocks: make(map[u.Key]*blocks.Block),
|
||||
wantlist: make(map[u.Key]*Entry),
|
||||
full: true,
|
||||
}
|
||||
}
|
||||
|
||||
type Entry struct {
|
||||
Key u.Key
|
||||
Priority int
|
||||
Cancel bool
|
||||
}
|
||||
|
||||
func newMessageFromProto(pbm pb.Message) BitSwapMessage {
|
||||
m := New()
|
||||
for _, s := range pbm.GetWantlist() {
|
||||
m.AddWanted(u.Key(s))
|
||||
m.SetFull(pbm.GetWantlist().GetFull())
|
||||
for _, e := range pbm.GetWantlist().GetEntries() {
|
||||
m.AddEntry(u.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel())
|
||||
}
|
||||
for _, d := range pbm.GetBlocks() {
|
||||
b := blocks.NewBlock(d)
|
||||
@ -68,8 +76,20 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage {
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *impl) Wantlist() []u.Key {
|
||||
return m.wantlist
|
||||
func (m *impl) SetFull(full bool) {
|
||||
m.full = full
|
||||
}
|
||||
|
||||
func (m *impl) Full() bool {
|
||||
return m.full
|
||||
}
|
||||
|
||||
func (m *impl) Wantlist() []*Entry {
|
||||
var out []*Entry
|
||||
for _, e := range m.wantlist {
|
||||
out = append(out, e)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (m *impl) Blocks() []*blocks.Block {
|
||||
@ -80,13 +100,18 @@ func (m *impl) Blocks() []*blocks.Block {
|
||||
return bs
|
||||
}
|
||||
|
||||
func (m *impl) AddWanted(k u.Key) {
|
||||
_, exists := m.existsInWantlist[k]
|
||||
func (m *impl) AddEntry(k u.Key, priority int, cancel bool) {
|
||||
e, exists := m.wantlist[k]
|
||||
if exists {
|
||||
return
|
||||
e.Priority = priority
|
||||
e.Cancel = cancel
|
||||
} else {
|
||||
m.wantlist[k] = &Entry{
|
||||
Key: k,
|
||||
Priority: priority,
|
||||
Cancel: cancel,
|
||||
}
|
||||
}
|
||||
m.existsInWantlist[k] = struct{}{}
|
||||
m.wantlist = append(m.wantlist, k)
|
||||
}
|
||||
|
||||
func (m *impl) AddBlock(b *blocks.Block) {
|
||||
@ -106,14 +131,19 @@ func FromNet(r io.Reader) (BitSwapMessage, error) {
|
||||
}
|
||||
|
||||
func (m *impl) ToProto() *pb.Message {
|
||||
pb := new(pb.Message)
|
||||
for _, k := range m.Wantlist() {
|
||||
pb.Wantlist = append(pb.Wantlist, string(k))
|
||||
pbm := new(pb.Message)
|
||||
pbm.Wantlist = new(pb.Message_Wantlist)
|
||||
for _, e := range m.wantlist {
|
||||
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
|
||||
Block: proto.String(string(e.Key)),
|
||||
Priority: proto.Int32(int32(e.Priority)),
|
||||
Cancel: &e.Cancel,
|
||||
})
|
||||
}
|
||||
for _, b := range m.Blocks() {
|
||||
pb.Blocks = append(pb.Blocks, b.Data)
|
||||
pbm.Blocks = append(pbm.Blocks, b.Data)
|
||||
}
|
||||
return pb
|
||||
return pbm
|
||||
}
|
||||
|
||||
func (m *impl) ToNet(w io.Writer) error {
|
||||
|
||||
@ -4,6 +4,8 @@ import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||
|
||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||
pb "github.com/jbenet/go-ipfs/exchange/bitswap/message/internal/pb"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
@ -12,22 +14,26 @@ import (
|
||||
func TestAppendWanted(t *testing.T) {
|
||||
const str = "foo"
|
||||
m := New()
|
||||
m.AddWanted(u.Key(str))
|
||||
m.AddEntry(u.Key(str), 1, false)
|
||||
|
||||
if !contains(m.ToProto().GetWantlist(), str) {
|
||||
if !wantlistContains(m.ToProto().GetWantlist(), str) {
|
||||
t.Fail()
|
||||
}
|
||||
m.ToProto().GetWantlist().GetEntries()
|
||||
}
|
||||
|
||||
func TestNewMessageFromProto(t *testing.T) {
|
||||
const str = "a_key"
|
||||
protoMessage := new(pb.Message)
|
||||
protoMessage.Wantlist = []string{string(str)}
|
||||
if !contains(protoMessage.Wantlist, str) {
|
||||
protoMessage.Wantlist = new(pb.Message_Wantlist)
|
||||
protoMessage.Wantlist.Entries = []*pb.Message_Wantlist_Entry{
|
||||
&pb.Message_Wantlist_Entry{Block: proto.String(str)},
|
||||
}
|
||||
if !wantlistContains(protoMessage.Wantlist, str) {
|
||||
t.Fail()
|
||||
}
|
||||
m := newMessageFromProto(*protoMessage)
|
||||
if !contains(m.ToProto().GetWantlist(), str) {
|
||||
if !wantlistContains(m.ToProto().GetWantlist(), str) {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
@ -57,7 +63,7 @@ func TestWantlist(t *testing.T) {
|
||||
keystrs := []string{"foo", "bar", "baz", "bat"}
|
||||
m := New()
|
||||
for _, s := range keystrs {
|
||||
m.AddWanted(u.Key(s))
|
||||
m.AddEntry(u.Key(s), 1, false)
|
||||
}
|
||||
exported := m.Wantlist()
|
||||
|
||||
@ -65,12 +71,12 @@ func TestWantlist(t *testing.T) {
|
||||
present := false
|
||||
for _, s := range keystrs {
|
||||
|
||||
if s == string(k) {
|
||||
if s == string(k.Key) {
|
||||
present = true
|
||||
}
|
||||
}
|
||||
if !present {
|
||||
t.Logf("%v isn't in original list", string(k))
|
||||
t.Logf("%v isn't in original list", k.Key)
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
@ -80,19 +86,19 @@ func TestCopyProtoByValue(t *testing.T) {
|
||||
const str = "foo"
|
||||
m := New()
|
||||
protoBeforeAppend := m.ToProto()
|
||||
m.AddWanted(u.Key(str))
|
||||
if contains(protoBeforeAppend.GetWantlist(), str) {
|
||||
m.AddEntry(u.Key(str), 1, false)
|
||||
if wantlistContains(protoBeforeAppend.GetWantlist(), str) {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestToNetFromNetPreservesWantList(t *testing.T) {
|
||||
original := New()
|
||||
original.AddWanted(u.Key("M"))
|
||||
original.AddWanted(u.Key("B"))
|
||||
original.AddWanted(u.Key("D"))
|
||||
original.AddWanted(u.Key("T"))
|
||||
original.AddWanted(u.Key("F"))
|
||||
original.AddEntry(u.Key("M"), 1, false)
|
||||
original.AddEntry(u.Key("B"), 1, false)
|
||||
original.AddEntry(u.Key("D"), 1, false)
|
||||
original.AddEntry(u.Key("T"), 1, false)
|
||||
original.AddEntry(u.Key("F"), 1, false)
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err := original.ToNet(&buf); err != nil {
|
||||
@ -106,11 +112,11 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
|
||||
|
||||
keys := make(map[u.Key]bool)
|
||||
for _, k := range copied.Wantlist() {
|
||||
keys[k] = true
|
||||
keys[k.Key] = true
|
||||
}
|
||||
|
||||
for _, k := range original.Wantlist() {
|
||||
if _, ok := keys[k]; !ok {
|
||||
if _, ok := keys[k.Key]; !ok {
|
||||
t.Fatalf("Key Missing: \"%v\"", k)
|
||||
}
|
||||
}
|
||||
@ -146,9 +152,18 @@ func TestToAndFromNetMessage(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func contains(s []string, x string) bool {
|
||||
for _, a := range s {
|
||||
if a == x {
|
||||
func wantlistContains(wantlist *pb.Message_Wantlist, x string) bool {
|
||||
for _, e := range wantlist.GetEntries() {
|
||||
if e.GetBlock() == x {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func contains(strs []string, x string) bool {
|
||||
for _, s := range strs {
|
||||
if s == x {
|
||||
return true
|
||||
}
|
||||
}
|
||||
@ -159,8 +174,8 @@ func TestDuplicates(t *testing.T) {
|
||||
b := blocks.NewBlock([]byte("foo"))
|
||||
msg := New()
|
||||
|
||||
msg.AddWanted(b.Key())
|
||||
msg.AddWanted(b.Key())
|
||||
msg.AddEntry(b.Key(), 1, false)
|
||||
msg.AddEntry(b.Key(), 1, false)
|
||||
if len(msg.Wantlist()) != 1 {
|
||||
t.Fatal("Duplicate in BitSwapMessage")
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@ package strategy
|
||||
import (
|
||||
"time"
|
||||
|
||||
bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
@ -34,6 +35,8 @@ type Strategy interface {
|
||||
|
||||
BlockSentToPeer(u.Key, peer.Peer)
|
||||
|
||||
GetAllocation(int, bstore.Blockstore) ([]*Task, error)
|
||||
|
||||
// Values determining bitswap behavioural patterns
|
||||
GetBatchSize() int
|
||||
GetRebroadcastDelay() time.Duration
|
||||
|
||||
@ -3,6 +3,7 @@ package strategy
|
||||
import (
|
||||
"time"
|
||||
|
||||
wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
@ -13,7 +14,7 @@ type keySet map[u.Key]struct{}
|
||||
|
||||
func newLedger(p peer.Peer, strategy strategyFunc) *ledger {
|
||||
return &ledger{
|
||||
wantList: keySet{},
|
||||
wantList: wl.NewWantlist(),
|
||||
Strategy: strategy,
|
||||
Partner: p,
|
||||
sentToPeer: make(map[u.Key]time.Time),
|
||||
@ -39,7 +40,7 @@ type ledger struct {
|
||||
exchangeCount uint64
|
||||
|
||||
// wantList is a (bounded, small) set of keys that Partner desires.
|
||||
wantList keySet
|
||||
wantList *wl.Wantlist
|
||||
|
||||
// sentToPeer is a set of keys to ensure we dont send duplicate blocks
|
||||
// to a given peer
|
||||
@ -65,14 +66,17 @@ func (l *ledger) ReceivedBytes(n int) {
|
||||
}
|
||||
|
||||
// TODO: this needs to be different. We need timeouts.
|
||||
func (l *ledger) Wants(k u.Key) {
|
||||
func (l *ledger) Wants(k u.Key, priority int) {
|
||||
log.Debugf("peer %s wants %s", l.Partner, k)
|
||||
l.wantList[k] = struct{}{}
|
||||
l.wantList.Add(k, priority)
|
||||
}
|
||||
|
||||
func (l *ledger) CancelWant(k u.Key) {
|
||||
l.wantList.Remove(k)
|
||||
}
|
||||
|
||||
func (l *ledger) WantListContains(k u.Key) bool {
|
||||
_, ok := l.wantList[k]
|
||||
return ok
|
||||
return l.wantList.Contains(k)
|
||||
}
|
||||
|
||||
func (l *ledger) ExchangeCount() uint64 {
|
||||
|
||||
@ -5,7 +5,10 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||
bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
||||
wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
@ -77,6 +80,60 @@ func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool {
|
||||
return ledger.ShouldSend()
|
||||
}
|
||||
|
||||
type Task struct {
|
||||
Peer peer.Peer
|
||||
Blocks []*blocks.Block
|
||||
}
|
||||
|
||||
func (s *strategist) GetAllocation(bandwidth int, bs bstore.Blockstore) ([]*Task, error) {
|
||||
var tasks []*Task
|
||||
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
var partners []peer.Peer
|
||||
for _, ledger := range s.ledgerMap {
|
||||
if ledger.ShouldSend() {
|
||||
partners = append(partners, ledger.Partner)
|
||||
}
|
||||
}
|
||||
if len(partners) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
bandwidthPerPeer := bandwidth / len(partners)
|
||||
for _, p := range partners {
|
||||
blksForPeer, err := s.getSendableBlocks(s.ledger(p).wantList, bs, bandwidthPerPeer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tasks = append(tasks, &Task{
|
||||
Peer: p,
|
||||
Blocks: blksForPeer,
|
||||
})
|
||||
}
|
||||
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
func (s *strategist) getSendableBlocks(wantlist *wl.Wantlist, bs bstore.Blockstore, bw int) ([]*blocks.Block, error) {
|
||||
var outblocks []*blocks.Block
|
||||
for _, e := range wantlist.Entries() {
|
||||
block, err := bs.Get(e.Value)
|
||||
if err == u.ErrNotFound {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
outblocks = append(outblocks, block)
|
||||
bw -= len(block.Data)
|
||||
if bw <= 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return outblocks, nil
|
||||
}
|
||||
|
||||
func (s *strategist) BlockSentToPeer(k u.Key, p peer.Peer) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
@ -106,8 +163,15 @@ func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error
|
||||
return errors.New("Strategy received nil message")
|
||||
}
|
||||
l := s.ledger(p)
|
||||
for _, key := range m.Wantlist() {
|
||||
l.Wants(key)
|
||||
if m.Full() {
|
||||
l.wantList = wl.NewWantlist()
|
||||
}
|
||||
for _, e := range m.Wantlist() {
|
||||
if e.Cancel {
|
||||
l.CancelWant(e.Key)
|
||||
} else {
|
||||
l.Wants(e.Key, e.Priority)
|
||||
}
|
||||
}
|
||||
for _, block := range m.Blocks() {
|
||||
// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
|
||||
@ -165,5 +229,5 @@ func (s *strategist) GetBatchSize() int {
|
||||
}
|
||||
|
||||
func (s *strategist) GetRebroadcastDelay() time.Duration {
|
||||
return time.Second * 5
|
||||
return time.Second * 10
|
||||
}
|
||||
|
||||
@ -61,7 +61,7 @@ func TestBlockRecordedAsWantedAfterMessageReceived(t *testing.T) {
|
||||
block := blocks.NewBlock([]byte("data wanted by beggar"))
|
||||
|
||||
messageFromBeggarToChooser := message.New()
|
||||
messageFromBeggarToChooser.AddWanted(block.Key())
|
||||
messageFromBeggarToChooser.AddEntry(block.Key(), 1, false)
|
||||
|
||||
chooser.MessageReceived(beggar.Peer, messageFromBeggarToChooser)
|
||||
// for this test, doesn't matter if you record that beggar sent
|
||||
|
||||
Loading…
Reference in New Issue
Block a user