From 4c95eb15b20cb0c0da1de5cda8a4c367dcff4c88 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Fri, 12 Sep 2014 17:58:12 -0700 Subject: [PATCH] refactor(net:message) add NetMessage interface * Design Goal: reduce coupling * NB: Slices hold references to an underlying array, and if you assign one slice to another, both refer to the same array. If a function takes a slice argument, changes it makes to the elements of the slice will be visible to the caller, analogous to passing a pointer to the underlying array. --- net/message/message.go | 41 ++++++++++++++++++++++++++----------- net/mux/mux.go | 12 +++++------ net/mux/mux_test.go | 34 +++++++++++++++--------------- net/service/request.go | 4 ++-- net/service/service.go | 20 +++++++++--------- net/service/service_test.go | 20 +++++++++--------- 6 files changed, 74 insertions(+), 57 deletions(-) diff --git a/net/message/message.go b/net/message/message.go index e97fbf921..8874dcf34 100644 --- a/net/message/message.go +++ b/net/message/message.go @@ -6,38 +6,55 @@ import ( proto "code.google.com/p/goprotobuf/proto" ) -// Message represents a packet of information sent to or received from a +type NetMessage interface { + Peer() *peer.Peer + Data() []byte +} + +func New(p *peer.Peer, data []byte) NetMessage { + return &message{peer: p, data: data} +} + +// message represents a packet of information sent to or received from a // particular Peer. -type Message struct { +type message struct { // To or from, depending on direction. - Peer *peer.Peer + peer *peer.Peer // Opaque data - Data []byte + data []byte +} + +func (m *message) Peer() *peer.Peer { + return m.peer +} + +func (m *message) Data() []byte { + return m.data } // FromObject creates a message from a protobuf-marshallable message. -func FromObject(p *peer.Peer, data proto.Message) (*Message, error) { +func FromObject(p *peer.Peer, data proto.Message) (*message, error) { bytes, err := proto.Marshal(data) if err != nil { return nil, err } - return &Message{ - Peer: p, - Data: bytes, + return &message{ + peer: p, + data: bytes, }, nil } // Pipe objects represent a bi-directional message channel. type Pipe struct { - Incoming chan *Message - Outgoing chan *Message + Incoming chan NetMessage + Outgoing chan NetMessage } // NewPipe constructs a pipe with channels of a given buffer size. func NewPipe(bufsize int) *Pipe { return &Pipe{ - Incoming: make(chan *Message, bufsize), - Outgoing: make(chan *Message, bufsize), + Incoming: make(chan NetMessage, bufsize), + Outgoing: make(chan NetMessage, bufsize), } } diff --git a/net/mux/mux.go b/net/mux/mux.go index cd3c2f807..98ab10f59 100644 --- a/net/mux/mux.go +++ b/net/mux/mux.go @@ -87,15 +87,15 @@ func (m *Muxer) handleIncomingMessages(ctx context.Context) { } // handleIncomingMessage routes message to the appropriate protocol. -func (m *Muxer) handleIncomingMessage(ctx context.Context, m1 *msg.Message) { +func (m *Muxer) handleIncomingMessage(ctx context.Context, m1 msg.NetMessage) { - data, pid, err := unwrapData(m1.Data) + data, pid, err := unwrapData(m1.Data()) if err != nil { u.PErr("muxer de-serializing error: %v\n", err) return } - m2 := &msg.Message{Peer: m1.Peer, Data: data} + m2 := msg.New(m1.Peer(), data) proto, found := m.Protocols[pid] if !found { u.PErr("muxer unknown protocol %v\n", pid) @@ -125,14 +125,14 @@ func (m *Muxer) handleOutgoingMessages(ctx context.Context, pid ProtocolID, prot } // handleOutgoingMessage wraps out a message and sends it out the -func (m *Muxer) handleOutgoingMessage(ctx context.Context, pid ProtocolID, m1 *msg.Message) { - data, err := wrapData(m1.Data, pid) +func (m *Muxer) handleOutgoingMessage(ctx context.Context, pid ProtocolID, m1 msg.NetMessage) { + data, err := wrapData(m1.Data(), pid) if err != nil { u.PErr("muxer serializing error: %v\n", err) return } - m2 := &msg.Message{Peer: m1.Peer, Data: data} + m2 := msg.New(m1.Peer(), data) select { case m.GetPipe().Outgoing <- m2: case <-ctx.Done(): diff --git a/net/mux/mux_test.go b/net/mux/mux_test.go index 4446952dc..3e03f7667 100644 --- a/net/mux/mux_test.go +++ b/net/mux/mux_test.go @@ -32,14 +32,14 @@ func newPeer(t *testing.T, id string) *peer.Peer { return &peer.Peer{ID: peer.ID(mh)} } -func testMsg(t *testing.T, m *msg.Message, data []byte) { - if !bytes.Equal(data, m.Data) { - t.Errorf("Data does not match: %v != %v", data, m.Data) +func testMsg(t *testing.T, m msg.NetMessage, data []byte) { + if !bytes.Equal(data, m.Data()) { + t.Errorf("Data does not match: %v != %v", data, m.Data()) } } -func testWrappedMsg(t *testing.T, m *msg.Message, pid ProtocolID, data []byte) { - data2, pid2, err := unwrapData(m.Data) +func testWrappedMsg(t *testing.T, m msg.NetMessage, pid ProtocolID, data []byte) { + data2, pid2, err := unwrapData(m.Data()) if err != nil { t.Error(err) } @@ -76,7 +76,7 @@ func TestSimpleMuxer(t *testing.T) { // test outgoing p1 for _, s := range []string{"foo", "bar", "baz"} { - p1.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)} + p1.Outgoing <- msg.New(peer1, []byte(s)) testWrappedMsg(t, <-mux1.Outgoing, pid1, []byte(s)) } @@ -86,13 +86,13 @@ func TestSimpleMuxer(t *testing.T) { if err != nil { t.Error(err) } - mux1.Incoming <- &msg.Message{Peer: peer1, Data: d} + mux1.Incoming <- msg.New(peer1, d) testMsg(t, <-p1.Incoming, []byte(s)) } // test outgoing p2 for _, s := range []string{"foo", "bar", "baz"} { - p2.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)} + p2.Outgoing <- msg.New(peer1, []byte(s)) testWrappedMsg(t, <-mux1.Outgoing, pid2, []byte(s)) } @@ -102,7 +102,7 @@ func TestSimpleMuxer(t *testing.T) { if err != nil { t.Error(err) } - mux1.Incoming <- &msg.Message{Peer: peer1, Data: d} + mux1.Incoming <- msg.New(peer1, d) testMsg(t, <-p2.Incoming, []byte(s)) } } @@ -139,7 +139,7 @@ func TestSimultMuxer(t *testing.T) { for i := 0; i < size; i++ { <-limiter s := fmt.Sprintf("proto %v out %v", pid, i) - m := &msg.Message{Peer: peer1, Data: []byte(s)} + m := msg.New(peer1, []byte(s)) mux1.Protocols[pid].GetPipe().Outgoing <- m counts[pid][0][0]++ u.DOut("sent %v\n", s) @@ -156,7 +156,7 @@ func TestSimultMuxer(t *testing.T) { t.Error(err) } - m := &msg.Message{Peer: peer1, Data: d} + m := msg.New(peer1, d) mux1.Incoming <- m counts[pid][1][0]++ u.DOut("sent %v\n", s) @@ -167,7 +167,7 @@ func TestSimultMuxer(t *testing.T) { for { select { case m := <-mux1.Outgoing: - data, pid, err := unwrapData(m.Data) + data, pid, err := unwrapData(m.Data()) if err != nil { t.Error(err) } @@ -186,7 +186,7 @@ func TestSimultMuxer(t *testing.T) { select { case m := <-mux1.Protocols[pid].GetPipe().Incoming: counts[pid][0][1]++ - u.DOut("got %v\n", string(m.Data)) + u.DOut("got %v\n", string(m.Data())) case <-ctx.Done(): return } @@ -239,7 +239,7 @@ func TestStopping(t *testing.T) { // test outgoing p1 for _, s := range []string{"foo", "bar", "baz"} { - p1.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)} + p1.Outgoing <- msg.New(peer1, []byte(s)) testWrappedMsg(t, <-mux1.Outgoing, pid1, []byte(s)) } @@ -249,7 +249,7 @@ func TestStopping(t *testing.T) { if err != nil { t.Error(err) } - mux1.Incoming <- &msg.Message{Peer: peer1, Data: d} + mux1.Incoming <- msg.New(peer1, d) testMsg(t, <-p1.Incoming, []byte(s)) } @@ -260,7 +260,7 @@ func TestStopping(t *testing.T) { // test outgoing p1 for _, s := range []string{"foo", "bar", "baz"} { - p1.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)} + p1.Outgoing <- msg.New(peer1, []byte(s)) select { case <-mux1.Outgoing: t.Error("should not have received anything.") @@ -274,7 +274,7 @@ func TestStopping(t *testing.T) { if err != nil { t.Error(err) } - mux1.Incoming <- &msg.Message{Peer: peer1, Data: d} + mux1.Incoming <- msg.New(peer1, d) select { case <-p1.Incoming: t.Error("should not have received anything.") diff --git a/net/service/request.go b/net/service/request.go index b29be79a3..aeb739801 100644 --- a/net/service/request.go +++ b/net/service/request.go @@ -75,7 +75,7 @@ type Request struct { PeerID peer.ID // Response is the channel of incoming responses. - Response chan *msg.Message + Response chan msg.NetMessage } // NewRequest creates a request for given peer.ID @@ -88,7 +88,7 @@ func NewRequest(pid peer.ID) (*Request, error) { return &Request{ ID: id, PeerID: pid, - Response: make(chan *msg.Message, 1), + Response: make(chan msg.NetMessage, 1), }, nil } diff --git a/net/service/service.go b/net/service/service.go index 4e67ca9dc..33510caa0 100644 --- a/net/service/service.go +++ b/net/service/service.go @@ -16,7 +16,7 @@ type Handler interface { // HandleMessage receives an incoming message, and potentially returns // a response message to send back. - HandleMessage(context.Context, *msg.Message) (*msg.Message, error) + HandleMessage(context.Context, msg.NetMessage) (msg.NetMessage, error) } // Service is a networking component that protocols can use to multiplex @@ -69,16 +69,16 @@ func (s *Service) Stop() { } // SendMessage sends a message out -func (s *Service) SendMessage(ctx context.Context, m *msg.Message, rid RequestID) error { +func (s *Service) SendMessage(ctx context.Context, m msg.NetMessage, rid RequestID) error { // serialize ServiceMessage wrapper - data, err := wrapData(m.Data, rid) + data, err := wrapData(m.Data(), rid) if err != nil { return err } // send message - m2 := &msg.Message{Peer: m.Peer, Data: data} + m2 := msg.New(m.Peer(), data) select { case s.Outgoing <- m2: case <-ctx.Done(): @@ -89,10 +89,10 @@ func (s *Service) SendMessage(ctx context.Context, m *msg.Message, rid RequestID } // SendRequest sends a request message out and awaits a response. -func (s *Service) SendRequest(ctx context.Context, m *msg.Message) (*msg.Message, error) { +func (s *Service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) { // create a request - r, err := NewRequest(m.Peer.ID) + r, err := NewRequest(m.Peer().ID) if err != nil { return nil, err } @@ -145,14 +145,14 @@ func (s *Service) handleIncomingMessages(ctx context.Context) { } } -func (s *Service) handleIncomingMessage(ctx context.Context, m *msg.Message) { +func (s *Service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) { // unwrap the incoming message - data, rid, err := unwrapData(m.Data) + data, rid, err := unwrapData(m.Data()) if err != nil { u.PErr("de-serializing error: %v\n", err) } - m2 := &msg.Message{Peer: m.Peer, Data: data} + m2 := msg.New(m.Peer(), data) // if it's a request (or has no RequestID), handle it if rid == nil || rid.IsRequest() { @@ -177,7 +177,7 @@ func (s *Service) handleIncomingMessage(ctx context.Context, m *msg.Message) { u.PErr("RequestID should identify a response here.\n") } - key := RequestKey(m.Peer.ID, RequestID(rid)) + key := RequestKey(m.Peer().ID, RequestID(rid)) s.RequestsLock.RLock() r, found := s.Requests[key] s.RequestsLock.RUnlock() diff --git a/net/service/service_test.go b/net/service/service_test.go index 138e61763..fe73b6a8f 100644 --- a/net/service/service_test.go +++ b/net/service/service_test.go @@ -15,15 +15,15 @@ import ( // ReverseHandler reverses all Data it receives and sends it back. type ReverseHandler struct{} -func (t *ReverseHandler) HandleMessage(ctx context.Context, m *msg.Message) ( - *msg.Message, error) { +func (t *ReverseHandler) HandleMessage(ctx context.Context, m msg.NetMessage) ( + msg.NetMessage, error) { - d := m.Data + d := m.Data() for i, j := 0, len(d)-1; i < j; i, j = i+1, j-1 { d[i], d[j] = d[j], d[i] } - return &msg.Message{Peer: m.Peer, Data: d}, nil + return msg.New(m.Peer(), d), nil } func newPeer(t *testing.T, id string) *peer.Peer { @@ -47,11 +47,11 @@ func TestServiceHandler(t *testing.T) { t.Error(err) } - m1 := &msg.Message{Peer: peer1, Data: d} + m1 := msg.New(peer1, d) s.Incoming <- m1 m2 := <-s.Outgoing - d, rid, err := unwrapData(m2.Data) + d, rid, err := unwrapData(m2.Data()) if err != nil { t.Error(err) } @@ -85,14 +85,14 @@ func TestServiceRequest(t *testing.T) { } }() - m1 := &msg.Message{Peer: peer1, Data: []byte("beep")} + m1 := msg.New(peer1, []byte("beep")) m2, err := s1.SendRequest(ctx, m1) if err != nil { t.Error(err) } - if !bytes.Equal(m2.Data, []byte("peeb")) { - t.Errorf("service handler data incorrect: %v != %v", m2.Data, "oof") + if !bytes.Equal(m2.Data(), []byte("peeb")) { + t.Errorf("service handler data incorrect: %v != %v", m2.Data(), "oof") } } @@ -117,7 +117,7 @@ func TestServiceRequestTimeout(t *testing.T) { } }() - m1 := &msg.Message{Peer: peer1, Data: []byte("beep")} + m1 := msg.New(peer1, []byte("beep")) m2, err := s1.SendRequest(ctx, m1) if err == nil || m2 != nil { t.Error("should've timed out")