From fcff5a5c96541f0c26621c087cbf8940da01a4e5 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 28 Aug 2014 16:48:00 -0700 Subject: [PATCH] rework bitswap to reflect discussion on PR #32 --- bitswap/bitswap.go | 167 +++++++++++++++++++---------------- bitswap/ledger.go | 15 ++-- bitswap/message.go | 41 --------- bitswap/message.pb.go | 96 +++----------------- bitswap/message.proto | 14 +-- bitswap/strategy.go | 8 +- blockservice/blockservice.go | 2 +- routing/dht/Message.go | 2 +- routing/dht/messages.pb.go | 27 +++--- routing/dht/messages.proto | 2 +- swarm/mes_listener.go | 26 +++--- 11 files changed, 149 insertions(+), 251 deletions(-) delete mode 100644 bitswap/message.go diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index e643e3bf4..d71bc347a 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -52,6 +52,8 @@ type BitSwap struct { // wantList is the set of keys we want values for. a map for fast lookups. wantList KeySet + strategy StrategyFunc + haltChan chan struct{} } @@ -87,15 +89,11 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( go func() { for p := range provs_ch { go func(pr *peer.Peer) { - ledger := bs.GetLedger(pr) blk, err := bs.getBlock(k, pr, tleft) if err != nil { u.PErr("getBlock returned: %v\n", err) return } - // NOTE: this credits everyone who sends us a block, - // even if we dont use it - ledger.ReceivedBytes(uint64(len(blk))) select { case valchan <- blk: default: @@ -115,30 +113,18 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) { u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty()) - // - mes := new(PBMessage) - mes.Id = proto.Uint64(swarm.GenerateMessageID()) - mes.Key = proto.String(string(k)) - typ := PBMessage_GET_BLOCK - mes.Type = &typ - // + + pmes := new(PBMessage) + pmes.Wantlist = []string{string(k)} after := time.After(timeout) - resp := bs.listener.Listen(mes.GetId(), 1, timeout) - smes := swarm.NewMessage(p, mes) + resp := bs.listener.Listen(string(k), 1, timeout) + smes := swarm.NewMessage(p, pmes) bs.meschan.Outgoing <- smes select { case resp_mes := <-resp: - pmes := new(PBMessage) - err := proto.Unmarshal(resp_mes.Data, pmes) - if err != nil { - return nil, err - } - if pmes.GetSuccess() { - return pmes.GetValue(), nil - } - return nil, u.ErrNotFound + return resp_mes.Data, nil case <-after: u.PErr("getBlock for '%s' timed out.\n", k) return nil, u.ErrTimeout @@ -147,8 +133,26 @@ func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byt // HaveBlock announces the existance of a block to BitSwap, potentially sending // it to peers (Partners) whose WantLists include it. -func (bs *BitSwap) HaveBlock(k u.Key) error { - return bs.routing.Provide(k) +func (bs *BitSwap) HaveBlock(blk *blocks.Block) error { + go func() { + for _, ledger := range bs.partners { + if _, ok := ledger.WantList[blk.Key()]; ok { + //send block to node + if ledger.ShouldSend() { + bs.SendBlock(ledger.Partner, blk) + } + } + } + }() + return bs.routing.Provide(blk.Key()) +} + +func (bs *BitSwap) SendBlock(p *peer.Peer, b *blocks.Block) { + pmes := new(PBMessage) + pmes.Blocks = [][]byte{b.Data} + + swarm_mes := swarm.NewMessage(p, pmes) + bs.meschan.Outgoing <- swarm_mes } func (bs *BitSwap) handleMessages() { @@ -161,18 +165,21 @@ func (bs *BitSwap) handleMessages() { u.PErr("%v\n", err) continue } - if pmes.GetResponse() { - bs.listener.Respond(pmes.GetId(), mes) - continue + if pmes.Blocks != nil { + for _, blkData := range pmes.Blocks { + blk, err := blocks.NewBlock(blkData) + if err != nil { + u.PErr("%v\n", err) + continue + } + go bs.blockReceive(mes.Peer, blk) + } } - switch pmes.GetType() { - case PBMessage_GET_BLOCK: - go bs.handleGetBlock(mes.Peer, pmes) - case PBMessage_WANT_BLOCK: - go bs.handleWantBlock(mes.Peer, pmes) - default: - u.PErr("Invalid message type.\n") + if pmes.Wantlist != nil { + for _, want := range pmes.Wantlist { + go bs.peerWantsBlock(mes.Peer, want) + } } case <-bs.haltChan: return @@ -180,50 +187,57 @@ func (bs *BitSwap) handleMessages() { } } -func (bs *BitSwap) handleWantBlock(p *peer.Peer, pmes *PBMessage) { - wants := pmes.GetWantlist() +// peerWantsBlock will check if we have the block in question, +// and then if we do, check the ledger for whether or not we should send it. +func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) { + u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), u.Key(want).Pretty()) ledg := bs.GetLedger(p) - for _, s := range wants { - // TODO: this needs to be different. We need timeouts. - ledg.WantList[u.Key(s)] = struct{}{} + + dsk := ds.NewKey(want) + blk_i, err := bs.datastore.Get(dsk) + if err != nil { + if err == ds.ErrNotFound { + // TODO: this needs to be different. We need timeouts. + ledg.WantList[u.Key(want)] = struct{}{} + } + u.PErr("datastore get error: %v\n", err) + return + } + + blk, ok := blk_i.([]byte) + if !ok { + u.PErr("data conversion error.\n") + return + } + + if ledg.ShouldSend() { + u.DOut("Sending block to peer.\n") + bblk, err := blocks.NewBlock(blk) + if err != nil { + u.PErr("newBlock error: %v\n", err) + return + } + bs.SendBlock(p, bblk) + ledg.SentBytes(len(blk)) } } -func (bs *BitSwap) handleGetBlock(p *peer.Peer, pmes *PBMessage) { - u.DOut("handleGetBlock.\n") - ledger := bs.GetLedger(p) - - u.DOut("finding [%s] in datastore.\n", u.Key(pmes.GetKey()).Pretty()) - idata, err := bs.datastore.Get(ds.NewKey(pmes.GetKey())) +func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) { + u.DOut("blockReceive: %s\n", blk.Key().Pretty()) + err := bs.datastore.Put(ds.NewKey(string(blk.Key())), blk.Data) if err != nil { - u.PErr("handleGetBlock datastore returned: %v\n", err) - if err == ds.ErrNotFound { - return - } + u.PErr("blockReceive error: %v\n", err) return } - u.DOut("found value!\n") - data, ok := idata.([]byte) - if !ok { - u.PErr("Failed casting data from datastore.") - return + mes := &swarm.Message{ + Peer: p, + Data: blk.Data, } + bs.listener.Respond(string(blk.Key()), mes) - if ledger.ShouldSend() { - u.DOut("Sending value back!\n") - resp := &Message{ - Value: data, - Response: true, - ID: pmes.GetId(), - Type: PBMessage_GET_BLOCK, - Success: true, - } - bs.meschan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf()) - ledger.SentBytes(uint64(len(data))) - } else { - u.DOut("Ledger decided not to send anything...\n") - } + ledger := bs.GetLedger(p) + ledger.ReceivedBytes(len(blk.Data)) } func (bs *BitSwap) GetLedger(p *peer.Peer) *Ledger { @@ -240,16 +254,14 @@ func (bs *BitSwap) GetLedger(p *peer.Peer) *Ledger { } func (bs *BitSwap) SendWantList(wl KeySet) error { - mes := Message{ - ID: swarm.GenerateMessageID(), - Type: PBMessage_WANT_BLOCK, - WantList: bs.wantList, + pmes := new(PBMessage) + for k, _ := range wl { + pmes.Wantlist = append(pmes.Wantlist, string(k)) } - pbmes := mes.ToProtobuf() // Lets just ping everybody all at once for _, ledger := range bs.partners { - bs.meschan.Outgoing <- swarm.NewMessage(ledger.Partner, pbmes) + bs.meschan.Outgoing <- swarm.NewMessage(ledger.Partner, pmes) } return nil @@ -258,3 +270,10 @@ func (bs *BitSwap) SendWantList(wl KeySet) error { func (bs *BitSwap) Halt() { bs.haltChan <- struct{}{} } + +func (bs *BitSwap) SetStrategy(sf StrategyFunc) { + bs.strategy = sf + for _, ledg := range bs.partners { + ledg.Strategy = sf + } +} diff --git a/bitswap/ledger.go b/bitswap/ledger.go index f94001771..a0f23b8d4 100644 --- a/bitswap/ledger.go +++ b/bitswap/ledger.go @@ -22,6 +22,9 @@ type Ledger struct { // LastExchange is the time of the last data exchange. LastExchange time.Time + // Number of exchanges with this peer + ExchangeCount uint64 + // WantList is a (bounded, small) set of keys that Partner desires. WantList KeySet @@ -32,15 +35,17 @@ type Ledger struct { type LedgerMap map[u.Key]*Ledger func (l *Ledger) ShouldSend() bool { - return l.Strategy(l.Accounting) + return l.Strategy(l) } -func (l *Ledger) SentBytes(n uint64) { +func (l *Ledger) SentBytes(n int) { + l.ExchangeCount++ l.LastExchange = time.Now() - l.Accounting.BytesSent += n + l.Accounting.BytesSent += uint64(n) } -func (l *Ledger) ReceivedBytes(n uint64) { +func (l *Ledger) ReceivedBytes(n int) { + l.ExchangeCount++ l.LastExchange = time.Now() - l.Accounting.BytesRecv += n + l.Accounting.BytesRecv += uint64(n) } diff --git a/bitswap/message.go b/bitswap/message.go deleted file mode 100644 index 0211ae71e..000000000 --- a/bitswap/message.go +++ /dev/null @@ -1,41 +0,0 @@ -package bitswap - -import ( - "code.google.com/p/goprotobuf/proto" - u "github.com/jbenet/go-ipfs/util" -) - -type Message struct { - Type PBMessage_MessageType - ID uint64 - Response bool - Key u.Key - Value []byte - Success bool - WantList KeySet -} - -func (m *Message) ToProtobuf() *PBMessage { - pmes := new(PBMessage) - pmes.Id = &m.ID - pmes.Type = &m.Type - if m.Response { - pmes.Response = proto.Bool(true) - } - - if m.Success { - pmes.Success = proto.Bool(true) - } - - if m.WantList != nil { - var swant []string - for k, _ := range m.WantList { - swant = append(swant, string(k)) - } - pmes.Wantlist = swant - } - - pmes.Key = proto.String(string(m.Key)) - pmes.Value = m.Value - return pmes -} diff --git a/bitswap/message.pb.go b/bitswap/message.pb.go index e187b9bea..5f15d98f5 100644 --- a/bitswap/message.pb.go +++ b/bitswap/message.pb.go @@ -20,96 +20,16 @@ import math "math" var _ = proto.Marshal var _ = math.Inf -type PBMessage_MessageType int32 - -const ( - PBMessage_GET_BLOCK PBMessage_MessageType = 0 - PBMessage_WANT_BLOCK PBMessage_MessageType = 1 -) - -var PBMessage_MessageType_name = map[int32]string{ - 0: "GET_BLOCK", - 1: "WANT_BLOCK", -} -var PBMessage_MessageType_value = map[string]int32{ - "GET_BLOCK": 0, - "WANT_BLOCK": 1, -} - -func (x PBMessage_MessageType) Enum() *PBMessage_MessageType { - p := new(PBMessage_MessageType) - *p = x - return p -} -func (x PBMessage_MessageType) String() string { - return proto.EnumName(PBMessage_MessageType_name, int32(x)) -} -func (x *PBMessage_MessageType) UnmarshalJSON(data []byte) error { - value, err := proto.UnmarshalJSONEnum(PBMessage_MessageType_value, data, "PBMessage_MessageType") - if err != nil { - return err - } - *x = PBMessage_MessageType(value) - return nil -} - type PBMessage struct { - Type *PBMessage_MessageType `protobuf:"varint,1,req,enum=bitswap.PBMessage_MessageType" json:"Type,omitempty"` - Id *uint64 `protobuf:"varint,2,req,name=id" json:"id,omitempty"` - Key *string `protobuf:"bytes,3,req,name=key" json:"key,omitempty"` - Value []byte `protobuf:"bytes,4,opt,name=value" json:"value,omitempty"` - Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"` - Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"` - Wantlist []string `protobuf:"bytes,7,rep,name=wantlist" json:"wantlist,omitempty"` - XXX_unrecognized []byte `json:"-"` + Wantlist []string `protobuf:"bytes,1,rep,name=wantlist" json:"wantlist,omitempty"` + Blocks [][]byte `protobuf:"bytes,2,rep,name=blocks" json:"blocks,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *PBMessage) Reset() { *m = PBMessage{} } func (m *PBMessage) String() string { return proto.CompactTextString(m) } func (*PBMessage) ProtoMessage() {} -func (m *PBMessage) GetType() PBMessage_MessageType { - if m != nil && m.Type != nil { - return *m.Type - } - return PBMessage_GET_BLOCK -} - -func (m *PBMessage) GetId() uint64 { - if m != nil && m.Id != nil { - return *m.Id - } - return 0 -} - -func (m *PBMessage) GetKey() string { - if m != nil && m.Key != nil { - return *m.Key - } - return "" -} - -func (m *PBMessage) GetValue() []byte { - if m != nil { - return m.Value - } - return nil -} - -func (m *PBMessage) GetResponse() bool { - if m != nil && m.Response != nil { - return *m.Response - } - return false -} - -func (m *PBMessage) GetSuccess() bool { - if m != nil && m.Success != nil { - return *m.Success - } - return false -} - func (m *PBMessage) GetWantlist() []string { if m != nil { return m.Wantlist @@ -117,6 +37,12 @@ func (m *PBMessage) GetWantlist() []string { return nil } -func init() { - proto.RegisterEnum("bitswap.PBMessage_MessageType", PBMessage_MessageType_name, PBMessage_MessageType_value) +func (m *PBMessage) GetBlocks() [][]byte { + if m != nil { + return m.Blocks + } + return nil +} + +func init() { } diff --git a/bitswap/message.proto b/bitswap/message.proto index a99ed7ebc..b025ac3c3 100644 --- a/bitswap/message.proto +++ b/bitswap/message.proto @@ -1,16 +1,6 @@ package bitswap; message PBMessage { - enum MessageType { - GET_BLOCK = 0; - WANT_BLOCK = 1; - } - - required MessageType Type = 1; - required uint64 id = 2; - required string key = 3; - optional bytes value = 4; - optional bool response = 5; - optional bool success = 6; - repeated string wantlist = 7; + repeated string wantlist = 1; + repeated bytes blocks = 2; } diff --git a/bitswap/strategy.go b/bitswap/strategy.go index 6e33de7fe..c216a35c3 100644 --- a/bitswap/strategy.go +++ b/bitswap/strategy.go @@ -5,13 +5,13 @@ import ( "math/rand" ) -type StrategyFunc func(debtRatio) bool +type StrategyFunc func(*Ledger) bool -func StandardStrategy(db debtRatio) bool { - return rand.Float64() <= probabilitySend(db.Value()) +func StandardStrategy(l *Ledger) bool { + return rand.Float64() <= probabilitySend(l.Accounting.Value()) } -func YesManStrategy(db debtRatio) bool { +func YesManStrategy(l *Ledger) bool { return true } diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index eb59151cd..a2924d73f 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -40,7 +40,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { return k, err } if s.Remote != nil { - err = s.Remote.HaveBlock(b.Key()) + err = s.Remote.HaveBlock(b) } return k, err } diff --git a/routing/dht/Message.go b/routing/dht/Message.go index 20c311d80..4163fb485 100644 --- a/routing/dht/Message.go +++ b/routing/dht/Message.go @@ -11,7 +11,7 @@ type Message struct { Key string Value []byte Response bool - ID uint64 + ID string Success bool Peers []*peer.Peer } diff --git a/routing/dht/messages.pb.go b/routing/dht/messages.pb.go index 7c337d306..a2452dc28 100644 --- a/routing/dht/messages.pb.go +++ b/routing/dht/messages.pb.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-gogo. +// Code generated by protoc-gen-go. // source: messages.proto // DO NOT EDIT! @@ -13,7 +13,7 @@ It has these top-level messages: */ package dht -import proto "code.google.com/p/gogoprotobuf/proto" +import proto "code.google.com/p/goprotobuf/proto" import math "math" // Reference imports to suppress errors if they are not otherwise used. @@ -69,17 +69,14 @@ func (x *PBDHTMessage_MessageType) UnmarshalJSON(data []byte) error { } type PBDHTMessage struct { - Type *PBDHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.PBDHTMessage_MessageType" json:"type,omitempty"` - Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` - Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` - // Unique ID of this message, used to match queries with responses - Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"` - // Signals whether or not this message is a response to another message - Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"` - Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"` - // Used for returning peers from queries (normally, peers closer to X) - Peers []*PBDHTMessage_PBPeer `protobuf:"bytes,7,rep,name=peers" json:"peers,omitempty"` - XXX_unrecognized []byte `json:"-"` + Type *PBDHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.PBDHTMessage_MessageType" json:"type,omitempty"` + Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` + Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` + Id *string `protobuf:"bytes,4,req,name=id" json:"id,omitempty"` + Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"` + Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"` + Peers []*PBDHTMessage_PBPeer `protobuf:"bytes,7,rep,name=peers" json:"peers,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *PBDHTMessage) Reset() { *m = PBDHTMessage{} } @@ -107,11 +104,11 @@ func (m *PBDHTMessage) GetValue() []byte { return nil } -func (m *PBDHTMessage) GetId() uint64 { +func (m *PBDHTMessage) GetId() string { if m != nil && m.Id != nil { return *m.Id } - return 0 + return "" } func (m *PBDHTMessage) GetResponse() bool { diff --git a/routing/dht/messages.proto b/routing/dht/messages.proto index 4d4e8c61f..c2c5cc30d 100644 --- a/routing/dht/messages.proto +++ b/routing/dht/messages.proto @@ -23,7 +23,7 @@ message PBDHTMessage { optional bytes value = 3; // Unique ID of this message, used to match queries with responses - required uint64 id = 4; + required string id = 4; // Signals whether or not this message is a response to another message optional bool response = 5; diff --git a/swarm/mes_listener.go b/swarm/mes_listener.go index 712d1999f..97cabe810 100644 --- a/swarm/mes_listener.go +++ b/swarm/mes_listener.go @@ -1,7 +1,7 @@ package swarm import ( - "math/rand" + crand "crypto/rand" "sync" "time" @@ -9,16 +9,18 @@ import ( ) type MessageListener struct { - listeners map[uint64]*listenInfo + listeners map[string]*listenInfo haltchan chan struct{} - unlist chan uint64 + unlist chan string nlist chan *listenInfo send chan *respMes } // GenerateMessageID creates and returns a new message ID -func GenerateMessageID() uint64 { - return (uint64(rand.Uint32()) << 32) | uint64(rand.Uint32()) +func GenerateMessageID() string { + buf := make([]byte, 16) + crand.Read(buf) + return string(buf) } // The listen info struct holds information about a message that is being waited for @@ -38,21 +40,21 @@ type listenInfo struct { closed bool - id uint64 + id string } func NewMessageListener() *MessageListener { ml := new(MessageListener) ml.haltchan = make(chan struct{}) - ml.listeners = make(map[uint64]*listenInfo) + ml.listeners = make(map[string]*listenInfo) ml.nlist = make(chan *listenInfo, 16) ml.send = make(chan *respMes, 16) - ml.unlist = make(chan uint64, 16) + ml.unlist = make(chan string, 16) go ml.run() return ml } -func (ml *MessageListener) Listen(id uint64, count int, timeout time.Duration) <-chan *Message { +func (ml *MessageListener) Listen(id string, count int, timeout time.Duration) <-chan *Message { li := new(listenInfo) li.count = count li.eol = time.Now().Add(timeout) @@ -62,16 +64,16 @@ func (ml *MessageListener) Listen(id uint64, count int, timeout time.Duration) < return li.resp } -func (ml *MessageListener) Unlisten(id uint64) { +func (ml *MessageListener) Unlisten(id string) { ml.unlist <- id } type respMes struct { - id uint64 + id string mes *Message } -func (ml *MessageListener) Respond(id uint64, mes *Message) { +func (ml *MessageListener) Respond(id string, mes *Message) { ml.send <- &respMes{ id: id, mes: mes,