diff --git a/net/mux/Makefile b/net/mux/internal/pb/Makefile similarity index 100% rename from net/mux/Makefile rename to net/mux/internal/pb/Makefile diff --git a/net/mux/mux.pb.go b/net/mux/internal/pb/mux.pb.go similarity index 100% rename from net/mux/mux.pb.go rename to net/mux/internal/pb/mux.pb.go diff --git a/net/mux/mux.proto b/net/mux/internal/pb/mux.proto similarity index 100% rename from net/mux/mux.proto rename to net/mux/internal/pb/mux.proto diff --git a/net/mux/mux.go b/net/mux/mux.go index ab325ecd5..e61ac4231 100644 --- a/net/mux/mux.go +++ b/net/mux/mux.go @@ -5,6 +5,7 @@ import ( "sync" msg "github.com/jbenet/go-ipfs/net/message" + pb "github.com/jbenet/go-ipfs/net/mux/internal/pb" u "github.com/jbenet/go-ipfs/util" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -13,6 +14,12 @@ import ( var log = u.Logger("muxer") +var ( + ProtocolID_Routing = pb.ProtocolID_Routing + ProtocolID_Exchange = pb.ProtocolID_Exchange + ProtocolID_Diagnostic = pb.ProtocolID_Diagnostic +) + // Protocol objects produce + consume raw data. They are added to the Muxer // with a ProtocolID, which is added to outgoing payloads. Muxer properly // encapsulates and decapsulates when interfacing with its Protocols. The @@ -22,7 +29,7 @@ type Protocol interface { } // ProtocolMap maps ProtocolIDs to Protocols. -type ProtocolMap map[ProtocolID]Protocol +type ProtocolMap map[pb.ProtocolID]Protocol // Muxer is a simple multiplexor that reads + writes to Incoming and Outgoing // channels. It multiplexes various protocols, wrapping and unwrapping data @@ -107,7 +114,7 @@ func (m *Muxer) Stop() { } // AddProtocol adds a Protocol with given ProtocolID to the Muxer. -func (m *Muxer) AddProtocol(p Protocol, pid ProtocolID) error { +func (m *Muxer) AddProtocol(p Protocol, pid pb.ProtocolID) error { if _, found := m.Protocols[pid]; found { return errors.New("Another protocol already using this ProtocolID") } @@ -170,7 +177,7 @@ func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) { // handleOutgoingMessages consumes the messages on the proto.Outgoing channel, // wraps them and sends them out. -func (m *Muxer) handleOutgoingMessages(pid ProtocolID, proto Protocol) { +func (m *Muxer) handleOutgoingMessages(pid pb.ProtocolID, proto Protocol) { defer m.wg.Done() for { @@ -188,7 +195,7 @@ func (m *Muxer) handleOutgoingMessages(pid ProtocolID, proto Protocol) { } // handleOutgoingMessage wraps out a message and sends it out the -func (m *Muxer) handleOutgoingMessage(pid ProtocolID, m1 msg.NetMessage) { +func (m *Muxer) handleOutgoingMessage(pid pb.ProtocolID, m1 msg.NetMessage) { data, err := wrapData(m1.Data(), pid) if err != nil { log.Error("muxer serializing error: %v", err) @@ -208,9 +215,9 @@ func (m *Muxer) handleOutgoingMessage(pid ProtocolID, m1 msg.NetMessage) { } } -func wrapData(data []byte, pid ProtocolID) ([]byte, error) { +func wrapData(data []byte, pid pb.ProtocolID) ([]byte, error) { // Marshal - pbm := new(PBProtocolMessage) + pbm := new(pb.PBProtocolMessage) pbm.ProtocolID = &pid pbm.Data = data b, err := proto.Marshal(pbm) @@ -221,9 +228,9 @@ func wrapData(data []byte, pid ProtocolID) ([]byte, error) { return b, nil } -func unwrapData(data []byte) ([]byte, ProtocolID, error) { +func unwrapData(data []byte) ([]byte, pb.ProtocolID, error) { // Unmarshal - pbm := new(PBProtocolMessage) + pbm := new(pb.PBProtocolMessage) err := proto.Unmarshal(data, pbm) if err != nil { return nil, 0, err diff --git a/net/mux/mux_test.go b/net/mux/mux_test.go index 027a6b22d..72187893b 100644 --- a/net/mux/mux_test.go +++ b/net/mux/mux_test.go @@ -8,6 +8,7 @@ import ( mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" msg "github.com/jbenet/go-ipfs/net/message" + pb "github.com/jbenet/go-ipfs/net/mux/internal/pb" peer "github.com/jbenet/go-ipfs/peer" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -37,7 +38,7 @@ func testMsg(t *testing.T, m msg.NetMessage, data []byte) { } } -func testWrappedMsg(t *testing.T, m msg.NetMessage, pid ProtocolID, data []byte) { +func testWrappedMsg(t *testing.T, m msg.NetMessage, pid pb.ProtocolID, data []byte) { data2, pid2, err := unwrapData(m.Data()) if err != nil { t.Error(err) @@ -57,8 +58,8 @@ func TestSimpleMuxer(t *testing.T) { // setup p1 := &TestProtocol{Pipe: msg.NewPipe(10)} p2 := &TestProtocol{Pipe: msg.NewPipe(10)} - pid1 := ProtocolID_Test - pid2 := ProtocolID_Routing + pid1 := pb.ProtocolID_Test + pid2 := pb.ProtocolID_Routing mux1 := NewMuxer(ProtocolMap{ pid1: p1, pid2: p2, @@ -108,8 +109,8 @@ func TestSimultMuxer(t *testing.T) { // setup p1 := &TestProtocol{Pipe: msg.NewPipe(10)} p2 := &TestProtocol{Pipe: msg.NewPipe(10)} - pid1 := ProtocolID_Test - pid2 := ProtocolID_Identify + pid1 := pb.ProtocolID_Test + pid2 := pb.ProtocolID_Identify mux1 := NewMuxer(ProtocolMap{ pid1: p1, pid2: p2, @@ -127,7 +128,7 @@ func TestSimultMuxer(t *testing.T) { counts := [2][2][2]int{} // run producers at every end sending incrementing messages - produceOut := func(pid ProtocolID, size int) { + produceOut := func(pid pb.ProtocolID, size int) { limiter := time.Tick(speed) for i := 0; i < size; i++ { <-limiter @@ -139,7 +140,7 @@ func TestSimultMuxer(t *testing.T) { } } - produceIn := func(pid ProtocolID, size int) { + produceIn := func(pid pb.ProtocolID, size int) { limiter := time.Tick(speed) for i := 0; i < size; i++ { <-limiter @@ -175,7 +176,7 @@ func TestSimultMuxer(t *testing.T) { } } - consumeIn := func(pid ProtocolID) { + consumeIn := func(pid pb.ProtocolID) { for { select { case m := <-mux1.Protocols[pid].GetPipe().Incoming: @@ -217,8 +218,8 @@ func TestStopping(t *testing.T) { // setup p1 := &TestProtocol{Pipe: msg.NewPipe(10)} p2 := &TestProtocol{Pipe: msg.NewPipe(10)} - pid1 := ProtocolID_Test - pid2 := ProtocolID_Identify + pid1 := pb.ProtocolID_Test + pid2 := pb.ProtocolID_Identify mux1 := NewMuxer(ProtocolMap{ pid1: p1, pid2: p2,