diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 38b0ec272..84f4b4c60 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -1,6 +1,6 @@ { "ImportPath": "github.com/jbenet/go-ipfs", - "GoVersion": "go1.3", + "GoVersion": "devel +ffe33f1f1f17 Tue Nov 25 15:41:33 2014 +1100", "Packages": [ "./..." ], diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 64f293528..1e0e86b61 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -15,6 +15,7 @@ import ( bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications" strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy" + wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" eventlog "github.com/jbenet/go-ipfs/util/eventlog" @@ -29,6 +30,8 @@ const maxProvidersPerRequest = 3 const providerRequestTimeout = time.Second * 10 const hasBlockTimeout = time.Second * 15 +const roundTime = time.Second / 2 + // New initializes a BitSwap instance that communicates over the // provided BitSwapNetwork. This function registers the returned instance as // the network delegate. @@ -41,6 +44,7 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout notif := notifications.New() go func() { <-ctx.Done() + cancelFunc() notif.Shutdown() }() @@ -51,11 +55,12 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout strategy: strategy.New(nice), routing: routing, sender: network, - wantlist: u.NewKeySet(), + wantlist: wl.NewWantlist(), batchRequests: make(chan []u.Key, 32), } network.SetDelegate(bs) go bs.loop(ctx) + go bs.roundWorker(ctx) return bs } @@ -85,7 +90,7 @@ type bitswap struct { // TODO(brian): save the strategy's state to the datastore strategy strategy.Strategy - wantlist u.KeySet + wantlist *wl.Wantlist // cancelFunc signals cancellation to the bitswap event loop cancelFunc func() @@ -166,8 +171,8 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e panic("Cant send wantlist to nil peerchan") } message := bsmsg.New() - for _, wanted := range bs.wantlist.Keys() { - message.AddWanted(wanted) + for _, wanted := range bs.wantlist.Entries() { + message.AddEntry(wanted.Value, wanted.Priority, false) } for peerToQuery := range peers { log.Debug("sending query to: %s", peerToQuery) @@ -195,9 +200,9 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e return nil } -func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) { +func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wantlist) { wg := sync.WaitGroup{} - for _, k := range ks { + for _, e := range wantlist.Entries() { wg.Add(1) go func(k u.Key) { child, _ := context.WithTimeout(ctx, providerRequestTimeout) @@ -208,11 +213,44 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) { log.Errorf("error sending wantlist: %s", err) } wg.Done() - }(k) + }(e.Value) } wg.Wait() } +func (bs *bitswap) roundWorker(ctx context.Context) { + roundTicker := time.NewTicker(roundTime) + bandwidthPerRound := 500000 + for { + select { + case <-ctx.Done(): + return + case <-roundTicker.C: + alloc, err := bs.strategy.GetAllocation(bandwidthPerRound, bs.blockstore) + if err != nil { + log.Critical("%s", err) + } + //log.Errorf("Allocation: %v", alloc) + bs.processStrategyAllocation(ctx, alloc) + } + } +} + +func (bs *bitswap) processStrategyAllocation(ctx context.Context, alloc []*strategy.Task) { + for _, t := range alloc { + for _, block := range t.Blocks { + message := bsmsg.New() + message.AddBlock(block) + for _, wanted := range bs.wantlist.Entries() { + message.AddEntry(wanted.Value, wanted.Priority, false) + } + if err := bs.send(ctx, t.Peer, message); err != nil { + log.Errorf("Message Send Failed: %s", err) + } + } + } +} + // TODO ensure only one active request per key func (bs *bitswap) loop(parent context.Context) { @@ -228,7 +266,7 @@ func (bs *bitswap) loop(parent context.Context) { select { case <-broadcastSignal.C: // Resend unfulfilled wantlist keys - bs.sendWantlistToProviders(ctx, bs.wantlist.Keys()) + bs.sendWantlistToProviders(ctx, bs.wantlist) case ks := <-bs.batchRequests: // TODO: implement batching on len(ks) > X for some X // i.e. if given 20 keys, fetch first five, then next @@ -239,7 +277,7 @@ func (bs *bitswap) loop(parent context.Context) { continue } for _, k := range ks { - bs.wantlist.Add(k) + bs.wantlist.Add(k, 1) } // NB: send want list to providers for the first peer in this list. // the assumption is made that the providers of the first key in @@ -277,45 +315,41 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm return nil, nil } - // Record message bytes in ledger - // TODO: this is bad, and could be easily abused. - // Should only track *useful* messages in ledger // This call records changes to wantlists, blocks received, // and number of bytes transfered. bs.strategy.MessageReceived(p, incoming) + // TODO: this is bad, and could be easily abused. + // Should only track *useful* messages in ledger + var blkeys []u.Key for _, block := range incoming.Blocks() { + blkeys = append(blkeys, block.Key()) if err := bs.HasBlock(ctx, block); err != nil { log.Error(err) } } - - for _, key := range incoming.Wantlist() { - if bs.strategy.ShouldSendBlockToPeer(key, p) { - if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil { - continue - } else { - // Create a separate message to send this block in - blkmsg := bsmsg.New() - - // TODO: only send this the first time - // no sense in sending our wantlist to the - // same peer multiple times - for _, k := range bs.wantlist.Keys() { - blkmsg.AddWanted(k) - } - - blkmsg.AddBlock(block) - bs.send(ctx, p, blkmsg) - bs.strategy.BlockSentToPeer(block.Key(), p) - } - } + if len(blkeys) > 0 { + bs.cancelBlocks(ctx, blkeys) } // TODO: consider changing this function to not return anything return nil, nil } +func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { + message := bsmsg.New() + message.SetFull(false) + for _, k := range bkeys { + message.AddEntry(k, 0, true) + } + for _, p := range bs.strategy.Peers() { + err := bs.send(ctx, p, message) + if err != nil { + log.Errorf("Error sending message: %s", err) + } + } +} + func (bs *bitswap) ReceiveError(err error) { log.Errorf("Bitswap ReceiveError: %s", err) // TODO log the network error @@ -337,8 +371,8 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block) if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) { message := bsmsg.New() message.AddBlock(block) - for _, wanted := range bs.wantlist.Keys() { - message.AddWanted(wanted) + for _, wanted := range bs.wantlist.Entries() { + message.AddEntry(wanted.Value, wanted.Priority, false) } if err := bs.send(ctx, p, message); err != nil { return err diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index d58ff596a..0e72883cc 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -11,6 +11,7 @@ import ( blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" mockrouting "github.com/jbenet/go-ipfs/routing/mock" + u "github.com/jbenet/go-ipfs/util" delay "github.com/jbenet/go-ipfs/util/delay" testutil "github.com/jbenet/go-ipfs/util/testutil" ) @@ -25,6 +26,7 @@ func TestClose(t *testing.T) { vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) rout := mockrouting.NewServer() sesgen := NewSessionGenerator(vnet, rout) + defer sesgen.Stop() bgen := blocksutil.NewBlockGenerator() block := bgen.Next() @@ -39,6 +41,7 @@ func TestGetBlockTimeout(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) rs := mockrouting.NewServer() g := NewSessionGenerator(net, rs) + defer g.Stop() self := g.Next() @@ -56,11 +59,13 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) rs := mockrouting.NewServer() g := NewSessionGenerator(net, rs) + defer g.Stop() block := blocks.NewBlock([]byte("block")) rs.Client(testutil.NewPeerWithIDString("testing")).Provide(context.Background(), block.Key()) // but not on network solo := g.Next() + defer solo.Exchange.Close() ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond) _, err := solo.Exchange.GetBlock(ctx, block.Key()) @@ -78,8 +83,10 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { rs := mockrouting.NewServer() block := blocks.NewBlock([]byte("block")) g := NewSessionGenerator(net, rs) + defer g.Stop() hasBlock := g.Next() + defer hasBlock.Exchange.Close() if err := hasBlock.Blockstore().Put(block); err != nil { t.Fatal(err) @@ -89,6 +96,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { } wantsBlock := g.Next() + defer wantsBlock.Exchange.Close() ctx, _ := context.WithTimeout(context.Background(), time.Second) received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key()) @@ -107,7 +115,7 @@ func TestLargeSwarm(t *testing.T) { t.SkipNow() } t.Parallel() - numInstances := 5 + numInstances := 500 numBlocks := 2 PerformDistributionTest(t, numInstances, numBlocks) } @@ -129,6 +137,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) rs := mockrouting.NewServer() sg := NewSessionGenerator(net, rs) + defer sg.Stop() bg := blocksutil.NewBlockGenerator() t.Log("Test a few nodes trying to get one file with a lot of blocks") @@ -138,24 +147,29 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { t.Log("Give the blocks to the first instance") + var blkeys []u.Key first := instances[0] for _, b := range blocks { first.Blockstore().Put(b) + blkeys = append(blkeys, b.Key()) first.Exchange.HasBlock(context.Background(), b) rs.Client(first.Peer).Provide(context.Background(), b.Key()) } t.Log("Distribute!") - var wg sync.WaitGroup - + wg := sync.WaitGroup{} for _, inst := range instances { - for _, b := range blocks { - wg.Add(1) - // NB: executing getOrFail concurrently puts tremendous pressure on - // the goroutine scheduler - getOrFail(inst, b, t, &wg) - } + wg.Add(1) + go func(inst Instance) { + defer wg.Done() + outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys) + if err != nil { + t.Fatal(err) + } + for _ = range outch { + } + }(inst) } wg.Wait() @@ -189,6 +203,7 @@ func TestSendToWantingPeer(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) rs := mockrouting.NewServer() sg := NewSessionGenerator(net, rs) + defer sg.Stop() bg := blocksutil.NewBlockGenerator() me := sg.Next() @@ -201,7 +216,7 @@ func TestSendToWantingPeer(t *testing.T) { alpha := bg.Next() - const timeout = 100 * time.Millisecond // FIXME don't depend on time + const timeout = 1000 * time.Millisecond // FIXME don't depend on time t.Logf("Peer %v attempts to get %v. NB: not available\n", w.Peer, alpha.Key()) ctx, _ := context.WithTimeout(context.Background(), timeout) @@ -246,3 +261,33 @@ func TestSendToWantingPeer(t *testing.T) { t.Fatal("Expected to receive alpha from me") } } + +func TestBasicBitswap(t *testing.T) { + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + rs := mockrouting.NewServer() + sg := NewSessionGenerator(net, rs) + bg := blocksutil.NewBlockGenerator() + + t.Log("Test a few nodes trying to get one file with a lot of blocks") + + instances := sg.Instances(2) + blocks := bg.Blocks(1) + err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0]) + if err != nil { + t.Fatal(err) + } + + ctx, _ := context.WithTimeout(context.TODO(), time.Second*5) + blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key()) + if err != nil { + t.Fatal(err) + } + + t.Log(blk) + for _, inst := range instances { + err := inst.Exchange.Close() + if err != nil { + t.Fatal(err) + } + } +} diff --git a/exchange/bitswap/message/internal/pb/message.pb.go b/exchange/bitswap/message/internal/pb/message.pb.go index f6f8a9bbc..4ddfc56f7 100644 --- a/exchange/bitswap/message/internal/pb/message.pb.go +++ b/exchange/bitswap/message/internal/pb/message.pb.go @@ -21,16 +21,16 @@ var _ = proto.Marshal var _ = math.Inf type Message struct { - Wantlist []string `protobuf:"bytes,1,rep,name=wantlist" json:"wantlist,omitempty"` - Blocks [][]byte `protobuf:"bytes,2,rep,name=blocks" json:"blocks,omitempty"` - XXX_unrecognized []byte `json:"-"` + Wantlist *Message_Wantlist `protobuf:"bytes,1,opt,name=wantlist" json:"wantlist,omitempty"` + Blocks [][]byte `protobuf:"bytes,2,rep,name=blocks" json:"blocks,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *Message) Reset() { *m = Message{} } func (m *Message) String() string { return proto.CompactTextString(m) } func (*Message) ProtoMessage() {} -func (m *Message) GetWantlist() []string { +func (m *Message) GetWantlist() *Message_Wantlist { if m != nil { return m.Wantlist } @@ -44,5 +44,61 @@ func (m *Message) GetBlocks() [][]byte { return nil } +type Message_Wantlist struct { + Entries []*Message_Wantlist_Entry `protobuf:"bytes,1,rep,name=entries" json:"entries,omitempty"` + Full *bool `protobuf:"varint,2,opt,name=full" json:"full,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Message_Wantlist) Reset() { *m = Message_Wantlist{} } +func (m *Message_Wantlist) String() string { return proto.CompactTextString(m) } +func (*Message_Wantlist) ProtoMessage() {} + +func (m *Message_Wantlist) GetEntries() []*Message_Wantlist_Entry { + if m != nil { + return m.Entries + } + return nil +} + +func (m *Message_Wantlist) GetFull() bool { + if m != nil && m.Full != nil { + return *m.Full + } + return false +} + +type Message_Wantlist_Entry struct { + Block *string `protobuf:"bytes,1,opt,name=block" json:"block,omitempty"` + Priority *int32 `protobuf:"varint,2,opt,name=priority" json:"priority,omitempty"` + Cancel *bool `protobuf:"varint,3,opt,name=cancel" json:"cancel,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Message_Wantlist_Entry) Reset() { *m = Message_Wantlist_Entry{} } +func (m *Message_Wantlist_Entry) String() string { return proto.CompactTextString(m) } +func (*Message_Wantlist_Entry) ProtoMessage() {} + +func (m *Message_Wantlist_Entry) GetBlock() string { + if m != nil && m.Block != nil { + return *m.Block + } + return "" +} + +func (m *Message_Wantlist_Entry) GetPriority() int32 { + if m != nil && m.Priority != nil { + return *m.Priority + } + return 0 +} + +func (m *Message_Wantlist_Entry) GetCancel() bool { + if m != nil && m.Cancel != nil { + return *m.Cancel + } + return false +} + func init() { } diff --git a/exchange/bitswap/message/internal/pb/message.proto b/exchange/bitswap/message/internal/pb/message.proto index a8c6c7252..7c44f3a6b 100644 --- a/exchange/bitswap/message/internal/pb/message.proto +++ b/exchange/bitswap/message/internal/pb/message.proto @@ -1,6 +1,19 @@ package bitswap.message.pb; message Message { - repeated string wantlist = 1; - repeated bytes blocks = 2; + + message Wantlist { + + message Entry { + optional string block = 1; // the block key + optional int32 priority = 2; // the priority (normalized). default to 1 + optional bool cancel = 3; // whether this revokes an entry + } + + repeated Entry entries = 1; // a list of wantlist entries + optional bool full = 2; // whether this is the full wantlist. default to false + } + + optional Wantlist wantlist = 1; + repeated bytes blocks = 2; } diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index 62a39be91..288fc9da7 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -9,6 +9,7 @@ import ( u "github.com/jbenet/go-ipfs/util" ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io" + proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" ) // TODO move message.go into the bitswap package @@ -17,21 +18,21 @@ import ( type BitSwapMessage interface { // Wantlist returns a slice of unique keys that represent data wanted by // the sender. - Wantlist() []u.Key + Wantlist() []*Entry // Blocks returns a slice of unique blocks Blocks() []*blocks.Block - // AddWanted adds the key to the Wantlist. - // - // Insertion order determines priority. That is, earlier insertions are - // deemed higher priority than keys inserted later. - // - // t = 0, msg.AddWanted(A) - // t = 1, msg.AddWanted(B) - // - // implies Priority(A) > Priority(B) - AddWanted(u.Key) + // AddEntry adds an entry to the Wantlist. + AddEntry(u.Key, int, bool) + + // Sets whether or not the contained wantlist represents the entire wantlist + // true = full wantlist + // false = wantlist 'patch' + // default: true + SetFull(bool) + + Full() bool AddBlock(*blocks.Block) Exportable @@ -43,23 +44,30 @@ type Exportable interface { } type impl struct { - existsInWantlist map[u.Key]struct{} // map to detect duplicates - wantlist []u.Key // slice to preserve ordering - blocks map[u.Key]*blocks.Block // map to detect duplicates + full bool + wantlist map[u.Key]*Entry + blocks map[u.Key]*blocks.Block // map to detect duplicates } func New() BitSwapMessage { return &impl{ - blocks: make(map[u.Key]*blocks.Block), - existsInWantlist: make(map[u.Key]struct{}), - wantlist: make([]u.Key, 0), + blocks: make(map[u.Key]*blocks.Block), + wantlist: make(map[u.Key]*Entry), + full: true, } } +type Entry struct { + Key u.Key + Priority int + Cancel bool +} + func newMessageFromProto(pbm pb.Message) BitSwapMessage { m := New() - for _, s := range pbm.GetWantlist() { - m.AddWanted(u.Key(s)) + m.SetFull(pbm.GetWantlist().GetFull()) + for _, e := range pbm.GetWantlist().GetEntries() { + m.AddEntry(u.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel()) } for _, d := range pbm.GetBlocks() { b := blocks.NewBlock(d) @@ -68,8 +76,20 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage { return m } -func (m *impl) Wantlist() []u.Key { - return m.wantlist +func (m *impl) SetFull(full bool) { + m.full = full +} + +func (m *impl) Full() bool { + return m.full +} + +func (m *impl) Wantlist() []*Entry { + var out []*Entry + for _, e := range m.wantlist { + out = append(out, e) + } + return out } func (m *impl) Blocks() []*blocks.Block { @@ -80,13 +100,18 @@ func (m *impl) Blocks() []*blocks.Block { return bs } -func (m *impl) AddWanted(k u.Key) { - _, exists := m.existsInWantlist[k] +func (m *impl) AddEntry(k u.Key, priority int, cancel bool) { + e, exists := m.wantlist[k] if exists { - return + e.Priority = priority + e.Cancel = cancel + } else { + m.wantlist[k] = &Entry{ + Key: k, + Priority: priority, + Cancel: cancel, + } } - m.existsInWantlist[k] = struct{}{} - m.wantlist = append(m.wantlist, k) } func (m *impl) AddBlock(b *blocks.Block) { @@ -106,14 +131,19 @@ func FromNet(r io.Reader) (BitSwapMessage, error) { } func (m *impl) ToProto() *pb.Message { - pb := new(pb.Message) - for _, k := range m.Wantlist() { - pb.Wantlist = append(pb.Wantlist, string(k)) + pbm := new(pb.Message) + pbm.Wantlist = new(pb.Message_Wantlist) + for _, e := range m.wantlist { + pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{ + Block: proto.String(string(e.Key)), + Priority: proto.Int32(int32(e.Priority)), + Cancel: &e.Cancel, + }) } for _, b := range m.Blocks() { - pb.Blocks = append(pb.Blocks, b.Data) + pbm.Blocks = append(pbm.Blocks, b.Data) } - return pb + return pbm } func (m *impl) ToNet(w io.Writer) error { diff --git a/exchange/bitswap/message/message_test.go b/exchange/bitswap/message/message_test.go index 681b60a6f..29eb6eb4e 100644 --- a/exchange/bitswap/message/message_test.go +++ b/exchange/bitswap/message/message_test.go @@ -4,6 +4,8 @@ import ( "bytes" "testing" + proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" + blocks "github.com/jbenet/go-ipfs/blocks" pb "github.com/jbenet/go-ipfs/exchange/bitswap/message/internal/pb" u "github.com/jbenet/go-ipfs/util" @@ -12,22 +14,26 @@ import ( func TestAppendWanted(t *testing.T) { const str = "foo" m := New() - m.AddWanted(u.Key(str)) + m.AddEntry(u.Key(str), 1, false) - if !contains(m.ToProto().GetWantlist(), str) { + if !wantlistContains(m.ToProto().GetWantlist(), str) { t.Fail() } + m.ToProto().GetWantlist().GetEntries() } func TestNewMessageFromProto(t *testing.T) { const str = "a_key" protoMessage := new(pb.Message) - protoMessage.Wantlist = []string{string(str)} - if !contains(protoMessage.Wantlist, str) { + protoMessage.Wantlist = new(pb.Message_Wantlist) + protoMessage.Wantlist.Entries = []*pb.Message_Wantlist_Entry{ + &pb.Message_Wantlist_Entry{Block: proto.String(str)}, + } + if !wantlistContains(protoMessage.Wantlist, str) { t.Fail() } m := newMessageFromProto(*protoMessage) - if !contains(m.ToProto().GetWantlist(), str) { + if !wantlistContains(m.ToProto().GetWantlist(), str) { t.Fail() } } @@ -57,7 +63,7 @@ func TestWantlist(t *testing.T) { keystrs := []string{"foo", "bar", "baz", "bat"} m := New() for _, s := range keystrs { - m.AddWanted(u.Key(s)) + m.AddEntry(u.Key(s), 1, false) } exported := m.Wantlist() @@ -65,12 +71,12 @@ func TestWantlist(t *testing.T) { present := false for _, s := range keystrs { - if s == string(k) { + if s == string(k.Key) { present = true } } if !present { - t.Logf("%v isn't in original list", string(k)) + t.Logf("%v isn't in original list", k.Key) t.Fail() } } @@ -80,19 +86,19 @@ func TestCopyProtoByValue(t *testing.T) { const str = "foo" m := New() protoBeforeAppend := m.ToProto() - m.AddWanted(u.Key(str)) - if contains(protoBeforeAppend.GetWantlist(), str) { + m.AddEntry(u.Key(str), 1, false) + if wantlistContains(protoBeforeAppend.GetWantlist(), str) { t.Fail() } } func TestToNetFromNetPreservesWantList(t *testing.T) { original := New() - original.AddWanted(u.Key("M")) - original.AddWanted(u.Key("B")) - original.AddWanted(u.Key("D")) - original.AddWanted(u.Key("T")) - original.AddWanted(u.Key("F")) + original.AddEntry(u.Key("M"), 1, false) + original.AddEntry(u.Key("B"), 1, false) + original.AddEntry(u.Key("D"), 1, false) + original.AddEntry(u.Key("T"), 1, false) + original.AddEntry(u.Key("F"), 1, false) var buf bytes.Buffer if err := original.ToNet(&buf); err != nil { @@ -106,11 +112,11 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { keys := make(map[u.Key]bool) for _, k := range copied.Wantlist() { - keys[k] = true + keys[k.Key] = true } for _, k := range original.Wantlist() { - if _, ok := keys[k]; !ok { + if _, ok := keys[k.Key]; !ok { t.Fatalf("Key Missing: \"%v\"", k) } } @@ -146,9 +152,18 @@ func TestToAndFromNetMessage(t *testing.T) { } } -func contains(s []string, x string) bool { - for _, a := range s { - if a == x { +func wantlistContains(wantlist *pb.Message_Wantlist, x string) bool { + for _, e := range wantlist.GetEntries() { + if e.GetBlock() == x { + return true + } + } + return false +} + +func contains(strs []string, x string) bool { + for _, s := range strs { + if s == x { return true } } @@ -159,8 +174,8 @@ func TestDuplicates(t *testing.T) { b := blocks.NewBlock([]byte("foo")) msg := New() - msg.AddWanted(b.Key()) - msg.AddWanted(b.Key()) + msg.AddEntry(b.Key(), 1, false) + msg.AddEntry(b.Key(), 1, false) if len(msg.Wantlist()) != 1 { t.Fatal("Duplicate in BitSwapMessage") } diff --git a/exchange/bitswap/strategy/interface.go b/exchange/bitswap/strategy/interface.go index 58385f5b7..c74b58c42 100644 --- a/exchange/bitswap/strategy/interface.go +++ b/exchange/bitswap/strategy/interface.go @@ -3,6 +3,7 @@ package strategy import ( "time" + bstore "github.com/jbenet/go-ipfs/blocks/blockstore" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" @@ -34,6 +35,8 @@ type Strategy interface { BlockSentToPeer(u.Key, peer.Peer) + GetAllocation(int, bstore.Blockstore) ([]*Task, error) + // Values determining bitswap behavioural patterns GetBatchSize() int GetRebroadcastDelay() time.Duration diff --git a/exchange/bitswap/strategy/ledger.go b/exchange/bitswap/strategy/ledger.go index 84e92d035..7ce7b73d9 100644 --- a/exchange/bitswap/strategy/ledger.go +++ b/exchange/bitswap/strategy/ledger.go @@ -3,6 +3,7 @@ package strategy import ( "time" + wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" ) @@ -13,7 +14,7 @@ type keySet map[u.Key]struct{} func newLedger(p peer.Peer, strategy strategyFunc) *ledger { return &ledger{ - wantList: keySet{}, + wantList: wl.NewWantlist(), Strategy: strategy, Partner: p, sentToPeer: make(map[u.Key]time.Time), @@ -39,7 +40,7 @@ type ledger struct { exchangeCount uint64 // wantList is a (bounded, small) set of keys that Partner desires. - wantList keySet + wantList *wl.Wantlist // sentToPeer is a set of keys to ensure we dont send duplicate blocks // to a given peer @@ -65,14 +66,17 @@ func (l *ledger) ReceivedBytes(n int) { } // TODO: this needs to be different. We need timeouts. -func (l *ledger) Wants(k u.Key) { +func (l *ledger) Wants(k u.Key, priority int) { log.Debugf("peer %s wants %s", l.Partner, k) - l.wantList[k] = struct{}{} + l.wantList.Add(k, priority) +} + +func (l *ledger) CancelWant(k u.Key) { + l.wantList.Remove(k) } func (l *ledger) WantListContains(k u.Key) bool { - _, ok := l.wantList[k] - return ok + return l.wantList.Contains(k) } func (l *ledger) ExchangeCount() uint64 { diff --git a/exchange/bitswap/strategy/strategy.go b/exchange/bitswap/strategy/strategy.go index fe7414caa..b21a3b2b1 100644 --- a/exchange/bitswap/strategy/strategy.go +++ b/exchange/bitswap/strategy/strategy.go @@ -5,7 +5,10 @@ import ( "sync" "time" + blocks "github.com/jbenet/go-ipfs/blocks" + bstore "github.com/jbenet/go-ipfs/blocks/blockstore" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" + wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" ) @@ -77,6 +80,60 @@ func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool { return ledger.ShouldSend() } +type Task struct { + Peer peer.Peer + Blocks []*blocks.Block +} + +func (s *strategist) GetAllocation(bandwidth int, bs bstore.Blockstore) ([]*Task, error) { + var tasks []*Task + + s.lock.RLock() + defer s.lock.RUnlock() + var partners []peer.Peer + for _, ledger := range s.ledgerMap { + if ledger.ShouldSend() { + partners = append(partners, ledger.Partner) + } + } + if len(partners) == 0 { + return nil, nil + } + + bandwidthPerPeer := bandwidth / len(partners) + for _, p := range partners { + blksForPeer, err := s.getSendableBlocks(s.ledger(p).wantList, bs, bandwidthPerPeer) + if err != nil { + return nil, err + } + tasks = append(tasks, &Task{ + Peer: p, + Blocks: blksForPeer, + }) + } + + return tasks, nil +} + +func (s *strategist) getSendableBlocks(wantlist *wl.Wantlist, bs bstore.Blockstore, bw int) ([]*blocks.Block, error) { + var outblocks []*blocks.Block + for _, e := range wantlist.Entries() { + block, err := bs.Get(e.Value) + if err == u.ErrNotFound { + continue + } + if err != nil { + return nil, err + } + outblocks = append(outblocks, block) + bw -= len(block.Data) + if bw <= 0 { + break + } + } + return outblocks, nil +} + func (s *strategist) BlockSentToPeer(k u.Key, p peer.Peer) { s.lock.Lock() defer s.lock.Unlock() @@ -106,8 +163,15 @@ func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error return errors.New("Strategy received nil message") } l := s.ledger(p) - for _, key := range m.Wantlist() { - l.Wants(key) + if m.Full() { + l.wantList = wl.NewWantlist() + } + for _, e := range m.Wantlist() { + if e.Cancel { + l.CancelWant(e.Key) + } else { + l.Wants(e.Key, e.Priority) + } } for _, block := range m.Blocks() { // FIXME extract blocks.NumBytes(block) or block.NumBytes() method @@ -165,5 +229,5 @@ func (s *strategist) GetBatchSize() int { } func (s *strategist) GetRebroadcastDelay() time.Duration { - return time.Second * 5 + return time.Second * 10 } diff --git a/exchange/bitswap/strategy/strategy_test.go b/exchange/bitswap/strategy/strategy_test.go index e063dff68..687ea4d34 100644 --- a/exchange/bitswap/strategy/strategy_test.go +++ b/exchange/bitswap/strategy/strategy_test.go @@ -61,7 +61,7 @@ func TestBlockRecordedAsWantedAfterMessageReceived(t *testing.T) { block := blocks.NewBlock([]byte("data wanted by beggar")) messageFromBeggarToChooser := message.New() - messageFromBeggarToChooser.AddWanted(block.Key()) + messageFromBeggarToChooser.AddEntry(block.Key(), 1, false) chooser.MessageReceived(beggar.Peer, messageFromBeggarToChooser) // for this test, doesn't matter if you record that beggar sent