From 05b80afc3576e72f2fa471833f5851f615ea441a Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 20 Aug 2014 16:51:03 -0700 Subject: [PATCH] fix swarm message type code, i beleive it works well now --- routing/dht/dht.go | 25 +++++++------- routing/dht/ext_test.go | 4 +-- routing/dht/routing.go | 6 ++-- swarm/interface.go | 5 ++- swarm/mes_wrapper.pb.go | 11 +++--- swarm/mes_wrapper.proto | 3 +- swarm/swarm.go | 75 +++++++++++++++++++---------------------- swarm/swarm_test.go | 9 +++-- swarm/wrapper.go | 24 +++++++++++++ 9 files changed, 93 insertions(+), 69 deletions(-) create mode 100644 swarm/wrapper.go diff --git a/routing/dht/dht.go b/routing/dht/dht.go index b0a2a0481..4c0751aa5 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -28,6 +28,7 @@ type IpfsDHT struct { routingTables []*kb.RoutingTable network swarm.Network + netChan *swarm.Chan // Local peer (yourself) self *peer.Peer @@ -55,6 +56,7 @@ type IpfsDHT struct { func NewDHT(p *peer.Peer, net swarm.Network) *IpfsDHT { dht := new(IpfsDHT) dht.network = net + dht.netChan = net.GetChannel(swarm.PBWrapper_DHT_MESSAGE) dht.datastore = ds.NewMapDatastore() dht.self = p dht.providers = NewProviderManager() @@ -101,10 +103,9 @@ func (dht *IpfsDHT) handleMessages() { u.DOut("Begin message handling routine\n") errs := dht.network.GetErrChan() - dhtmes := dht.network.GetChannel(swarm.PBWrapper_DHT_MESSAGE) for { select { - case mes, ok := <-dhtmes: + case mes, ok := <-dht.netChan.Incoming: if !ok { u.DOut("handleMessages closing, bad recv on incoming\n") return @@ -165,7 +166,7 @@ func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) er } mes := swarm.NewMessage(p, pmes.ToProtobuf()) - dht.network.Send(mes) + dht.netChan.Outgoing <- mes return nil } @@ -225,7 +226,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) { out: mes := swarm.NewMessage(p, resp.ToProtobuf()) - dht.network.Send(mes) + dht.netChan.Outgoing <- mes } // Store a value in this peer local storage @@ -247,7 +248,7 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) { ID: pmes.GetId(), } - dht.network.Send(swarm.NewMessage(p, resp.ToProtobuf())) + dht.netChan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf()) } func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) { @@ -258,7 +259,7 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) { } defer func() { mes := swarm.NewMessage(p, resp.ToProtobuf()) - dht.network.Send(mes) + dht.netChan.Outgoing <- mes }() level := pmes.GetValue()[0] u.DOut("handleFindPeer: searching for '%s'\n", peer.ID(pmes.GetKey()).Pretty()) @@ -310,7 +311,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) { } mes := swarm.NewMessage(p, resp.ToProtobuf()) - dht.network.Send(mes) + dht.netChan.Outgoing <- mes } type providerInfo struct { @@ -336,7 +337,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) { for _, ps := range seq { mes := swarm.NewMessage(ps, pmes) - dht.network.Send(mes) + dht.netChan.Outgoing <- mes } buf := new(bytes.Buffer) @@ -372,7 +373,7 @@ out: } mes := swarm.NewMessage(p, resp.ToProtobuf()) - dht.network.Send(mes) + dht.netChan.Outgoing <- mes } func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Duration, level int) ([]byte, []*peer.Peer, error) { @@ -429,7 +430,7 @@ func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duratio mes := swarm.NewMessage(p, pmes.ToProtobuf()) t := time.Now() - dht.network.Send(mes) + dht.netChan.Outgoing <- mes // Wait for either the response or a timeout timeup := time.After(timeout) @@ -545,7 +546,7 @@ func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Durati mes := swarm.NewMessage(p, pmes.ToProtobuf()) listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute) t := time.Now() - dht.network.Send(mes) + dht.netChan.Outgoing <- mes after := time.After(timeout) select { case <-after: @@ -581,7 +582,7 @@ func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, time mes := swarm.NewMessage(p, pmes.ToProtobuf()) listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute) - dht.network.Send(mes) + dht.netChan.Outgoing <- mes after := time.After(timeout) select { case <-after: diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index 79cfd27bc..6e034c69d 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -70,8 +70,8 @@ func (f *fauxNet) GetErrChan() chan error { return f.Chan.Errors } -func (f *fauxNet) GetChannel(t swarm.PBWrapper_MessageType) chan *swarm.Message { - return f.Chan.Incoming +func (f *fauxNet) GetChannel(t swarm.PBWrapper_MessageType) *swarm.Chan { + return f.Chan } func (f *fauxNet) Connect(addr *ma.Multiaddr) (*peer.Peer, error) { diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 3a4ebd33d..3c4ad9e00 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -240,7 +240,7 @@ func (dht *IpfsDHT) Provide(key u.Key) error { for _, p := range peers { mes := swarm.NewMessage(p, pbmes) - dht.network.Send(mes) + dht.netChan.Outgoing <- mes } return nil } @@ -352,7 +352,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { before := time.Now() responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute) - dht.network.Send(mes) + dht.netChan.Outgoing <- mes tout := time.After(timeout) select { @@ -385,7 +385,7 @@ func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) { pbmes := pmes.ToProtobuf() for _, p := range targets { mes := swarm.NewMessage(p, pbmes) - dht.network.Send(mes) + dht.netChan.Outgoing <- mes } var out []*diagInfo diff --git a/swarm/interface.go b/swarm/interface.go index 3bfcd233b..c9655a91c 100644 --- a/swarm/interface.go +++ b/swarm/interface.go @@ -8,14 +8,13 @@ import ( ) type Network interface { - Send(*Message) - Error(error) Find(u.Key) *peer.Peer Listen() error ConnectNew(*ma.Multiaddr) (*peer.Peer, error) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) + Error(error) GetErrChan() chan error - GetChannel(PBWrapper_MessageType) chan *Message + GetChannel(PBWrapper_MessageType) *Chan Close() Drop(*peer.Peer) error } diff --git a/swarm/mes_wrapper.pb.go b/swarm/mes_wrapper.pb.go index 7d51ec0de..45856634a 100644 --- a/swarm/mes_wrapper.pb.go +++ b/swarm/mes_wrapper.pb.go @@ -23,14 +23,17 @@ var _ = math.Inf type PBWrapper_MessageType int32 const ( - PBWrapper_DHT_MESSAGE PBWrapper_MessageType = 0 + PBWrapper_TEST PBWrapper_MessageType = 0 + PBWrapper_DHT_MESSAGE PBWrapper_MessageType = 1 ) var PBWrapper_MessageType_name = map[int32]string{ - 0: "DHT_MESSAGE", + 0: "TEST", + 1: "DHT_MESSAGE", } var PBWrapper_MessageType_value = map[string]int32{ - "DHT_MESSAGE": 0, + "TEST": 0, + "DHT_MESSAGE": 1, } func (x PBWrapper_MessageType) Enum() *PBWrapper_MessageType { @@ -64,7 +67,7 @@ func (m *PBWrapper) GetType() PBWrapper_MessageType { if m != nil && m.Type != nil { return *m.Type } - return PBWrapper_DHT_MESSAGE + return PBWrapper_TEST } func (m *PBWrapper) GetMessage() []byte { diff --git a/swarm/mes_wrapper.proto b/swarm/mes_wrapper.proto index 64490b0fb..5690380e5 100644 --- a/swarm/mes_wrapper.proto +++ b/swarm/mes_wrapper.proto @@ -2,7 +2,8 @@ package swarm; message PBWrapper { enum MessageType { - DHT_MESSAGE = 0; + TEST = 0; + DHT_MESSAGE = 1; } required MessageType Type = 1; diff --git a/swarm/swarm.go b/swarm/swarm.go index 926aa8910..cc57dbefb 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -38,7 +38,7 @@ func NewMessage(p *peer.Peer, data proto.Message) *Message { } } -// Chan is a swam channel, which provides duplex communication and errors. +// Chan is a swarm channel, which provides duplex communication and errors. type Chan struct { Outgoing chan *Message Incoming chan *Message @@ -84,7 +84,7 @@ type Swarm struct { conns ConnMap connsLock sync.RWMutex - filterChans map[PBWrapper_MessageType]chan *Message + filterChans map[PBWrapper_MessageType]*Chan toFilter chan *Message newFilters chan *newFilterInfo @@ -98,7 +98,7 @@ func NewSwarm(local *peer.Peer) *Swarm { Chan: NewChan(10), conns: ConnMap{}, local: local, - filterChans: make(map[PBWrapper_MessageType]chan *Message), + filterChans: make(map[PBWrapper_MessageType]*Chan), toFilter: make(chan *Message, 32), newFilters: make(chan *newFilterInfo), } @@ -233,6 +233,8 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error, bool) { return conn, nil, false } +// StartConn adds the passed in connection to its peerMap and starts +// the fanIn routine for that connection func (s *Swarm) StartConn(conn *Conn) error { if conn == nil { return errors.New("Tried to start nil connection.") @@ -275,14 +277,8 @@ func (s *Swarm) fanOut() { continue } - wrapped, err := Wrap(msg.Data, PBWrapper_DHT_MESSAGE) - if err != nil { - s.Error(err) - continue - } - // queue it in the connection's buffer - conn.Outgoing.MsgChan <- wrapped + conn.Outgoing.MsgChan <- msg.Data } } } @@ -320,7 +316,7 @@ out: type newFilterInfo struct { Type PBWrapper_MessageType - resp chan chan *Message + resp chan *Chan } func (s *Swarm) routeMessages() { @@ -342,15 +338,36 @@ func (s *Swarm) routeMessages() { } mes.Data = wrapper.GetMessage() - ch <- mes + ch.Incoming <- mes case gchan := <-s.newFilters: - nch := make(chan *Message) - s.filterChans[gchan.Type] = nch + nch, ok := s.filterChans[gchan.Type] + if !ok { + nch = NewChan(16) + s.filterChans[gchan.Type] = nch + go s.muxChan(nch, gchan.Type) + } gchan.resp <- nch } } } +func (s *Swarm) muxChan(ch *Chan, typ PBWrapper_MessageType) { + for { + select { + case <-ch.Close: + return + case mes := <-ch.Outgoing: + data, err := Wrap(mes.Data, typ) + if err != nil { + u.PErr("muxChan error: %s\n", err) + continue + } + mes.Data = data + s.Chan.Outgoing <- mes + } + } +} + func (s *Swarm) Find(key u.Key) *peer.Peer { s.connsLock.RLock() defer s.connsLock.RUnlock() @@ -386,6 +403,7 @@ func (s *Swarm) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error return conn.Peer, err } +// Handle performing a handshake on a new connection and ensuring proper forward communication func (s *Swarm) handleDialedCon(conn *Conn) error { err := ident.Handshake(s.local, conn.Peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan) if err != nil { @@ -440,10 +458,6 @@ func (s *Swarm) Drop(p *peer.Peer) error { return conn.Close() } -func (s *Swarm) Send(mes *Message) { - s.Chan.Outgoing <- mes -} - func (s *Swarm) Error(e error) { s.Chan.Errors <- e } @@ -452,31 +466,10 @@ func (s *Swarm) GetErrChan() chan error { return s.Chan.Errors } -func Wrap(data []byte, typ PBWrapper_MessageType) ([]byte, error) { - wrapper := new(PBWrapper) - wrapper.Message = data - wrapper.Type = &typ - b, err := proto.Marshal(wrapper) - if err != nil { - return nil, err - } - return b, nil -} - -func Unwrap(data []byte) (*PBWrapper, error) { - mes := new(PBWrapper) - err := proto.Unmarshal(data, mes) - if err != nil { - return nil, err - } - - return mes, nil -} - -func (s *Swarm) GetChannel(typ PBWrapper_MessageType) chan *Message { +func (s *Swarm) GetChannel(typ PBWrapper_MessageType) *Chan { nfi := &newFilterInfo{ Type: typ, - resp: make(chan chan *Message), + resp: make(chan *Chan), } s.newFilters <- nfi diff --git a/swarm/swarm_test.go b/swarm/swarm_test.go index 2760e9a80..4991e0f64 100644 --- a/swarm/swarm_test.go +++ b/swarm/swarm_test.go @@ -38,7 +38,8 @@ func pong(c net.Conn, peer *peer.Peer) { fmt.Printf("error: didn't receive ping: '%v'\n", b.GetMessage()) return } - data, err = Wrap([]byte("pong"), PBWrapper_DHT_MESSAGE) + + data, err = Wrap([]byte("pong"), PBWrapper_TEST) if err != nil { fmt.Printf("error %v\n", err) return @@ -63,6 +64,7 @@ func TestSwarm(t *testing.T) { "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33": "/ip4/127.0.0.1/tcp/4567", } + recv := swarm.GetChannel(PBWrapper_TEST) for k, n := range peerNames { peer, err := setupPeer(k, n) if err != nil { @@ -96,13 +98,14 @@ func TestSwarm(t *testing.T) { MsgNum := 1000 for k := 0; k < MsgNum; k++ { for _, p := range peers { - swarm.Chan.Outgoing <- &Message{Peer: p, Data: []byte("ping")} + recv.Outgoing <- &Message{Peer: p, Data: []byte("ping")} } } got := map[u.Key]int{} + for k := 0; k < (MsgNum * len(peers)); k++ { - msg := <-swarm.Chan.Incoming + msg := <-recv.Incoming if string(msg.Data) != "pong" { t.Error("unexpected conn output", msg.Data) } diff --git a/swarm/wrapper.go b/swarm/wrapper.go new file mode 100644 index 000000000..52ffc7765 --- /dev/null +++ b/swarm/wrapper.go @@ -0,0 +1,24 @@ +package swarm + +import "code.google.com/p/goprotobuf/proto" + +func Wrap(data []byte, typ PBWrapper_MessageType) ([]byte, error) { + wrapper := new(PBWrapper) + wrapper.Message = data + wrapper.Type = &typ + b, err := proto.Marshal(wrapper) + if err != nil { + return nil, err + } + return b, nil +} + +func Unwrap(data []byte) (*PBWrapper, error) { + mes := new(PBWrapper) + err := proto.Unmarshal(data, mes) + if err != nil { + return nil, err + } + + return mes, nil +}