diff --git a/net/message/message.go b/net/message/message.go deleted file mode 100644 index b12de4281..000000000 --- a/net/message/message.go +++ /dev/null @@ -1,90 +0,0 @@ -package message - -import ( - "errors" - - peer "github.com/jbenet/go-ipfs/peer" - - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" - router "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-router" -) - -// ErrInvalidPayload is an error used in the router.HandlePacket implementations -var ErrInvalidPayload = errors.New("invalid packet: non-[]byte payload") - -// Packet is used inside the network package to represent a message -// flowing across the subsystems (Conn, Swarm, Mux, Service). -// implements router.Packet -type Packet struct { - Src router.Address // peer.ID or service string - Dst router.Address // peer.ID or service string - Data []byte // raw data - Context context.Context // context of the Packet. -} - -func (p *Packet) Destination() router.Address { - return p.Dst -} - -func (p *Packet) Payload() interface{} { - return p.Data -} - -func (p *Packet) Response(data []byte) Packet { - return Packet{ - Src: p.Dst, - Dst: p.Src, - Data: data, - Context: p.Context, - } -} - -// NetMessage is the interface for the message -type NetMessage interface { - Peer() peer.Peer - Data() []byte - Loggable() map[string]interface{} -} - -// New is the interface for constructing a new message. -func New(p peer.Peer, data []byte) NetMessage { - return &message{peer: p, data: data} -} - -// message represents a packet of information sent to or received from a -// particular Peer. -type message struct { - // To or from, depending on direction. - peer peer.Peer - - // Opaque data - data []byte -} - -func (m *message) Peer() peer.Peer { - return m.peer -} - -func (m *message) Data() []byte { - return m.data -} - -func (m *message) Loggable() map[string]interface{} { - return map[string]interface{}{ - "netMessage": map[string]interface{}{ - "recipient": m.Peer().Loggable(), - // TODO sizeBytes? bytes? lenBytes? - "size": len(m.Data()), - }, - } -} - -// FromObject creates a message from a protobuf-marshallable message. -func FromObject(p peer.Peer, data proto.Message) (NetMessage, error) { - bytes, err := proto.Marshal(data) - if err != nil { - return nil, err - } - return New(p, bytes), nil -} diff --git a/net/mux/internal/pb/Makefile b/net/mux/internal/pb/Makefile deleted file mode 100644 index 334feee74..000000000 --- a/net/mux/internal/pb/Makefile +++ /dev/null @@ -1,10 +0,0 @@ -PB = $(wildcard *.proto) -GO = $(PB:.proto=.pb.go) - -all: $(GO) - -%.pb.go: %.proto - protoc --gogo_out=. --proto_path=../../../../../../:/usr/local/opt/protobuf/include:. $< - -clean: - rm *.pb.go diff --git a/net/mux/internal/pb/mux.pb.go b/net/mux/internal/pb/mux.pb.go deleted file mode 100644 index bedb47a53..000000000 --- a/net/mux/internal/pb/mux.pb.go +++ /dev/null @@ -1,91 +0,0 @@ -// Code generated by protoc-gen-gogo. -// source: mux.proto -// DO NOT EDIT! - -/* -Package mux_pb is a generated protocol buffer package. - -It is generated from these files: - mux.proto - -It has these top-level messages: - PBProtocolMessage -*/ -package mux_pb - -import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto" -import math "math" - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = math.Inf - -type ProtocolID int32 - -const ( - ProtocolID_Test ProtocolID = 0 - ProtocolID_Identify ProtocolID = 1 - ProtocolID_Routing ProtocolID = 2 - ProtocolID_Exchange ProtocolID = 3 - ProtocolID_Diagnostic ProtocolID = 4 -) - -var ProtocolID_name = map[int32]string{ - 0: "Test", - 1: "Identify", - 2: "Routing", - 3: "Exchange", - 4: "Diagnostic", -} -var ProtocolID_value = map[string]int32{ - "Test": 0, - "Identify": 1, - "Routing": 2, - "Exchange": 3, - "Diagnostic": 4, -} - -func (x ProtocolID) Enum() *ProtocolID { - p := new(ProtocolID) - *p = x - return p -} -func (x ProtocolID) String() string { - return proto.EnumName(ProtocolID_name, int32(x)) -} -func (x *ProtocolID) UnmarshalJSON(data []byte) error { - value, err := proto.UnmarshalJSONEnum(ProtocolID_value, data, "ProtocolID") - if err != nil { - return err - } - *x = ProtocolID(value) - return nil -} - -type PBProtocolMessage struct { - ProtocolID *ProtocolID `protobuf:"varint,1,req,enum=mux.pb.ProtocolID" json:"ProtocolID,omitempty"` - Data []byte `protobuf:"bytes,2,req" json:"Data,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *PBProtocolMessage) Reset() { *m = PBProtocolMessage{} } -func (m *PBProtocolMessage) String() string { return proto.CompactTextString(m) } -func (*PBProtocolMessage) ProtoMessage() {} - -func (m *PBProtocolMessage) GetProtocolID() ProtocolID { - if m != nil && m.ProtocolID != nil { - return *m.ProtocolID - } - return ProtocolID_Test -} - -func (m *PBProtocolMessage) GetData() []byte { - if m != nil { - return m.Data - } - return nil -} - -func init() { - proto.RegisterEnum("mux.pb.ProtocolID", ProtocolID_name, ProtocolID_value) -} diff --git a/net/mux/internal/pb/mux.proto b/net/mux/internal/pb/mux.proto deleted file mode 100644 index 841718877..000000000 --- a/net/mux/internal/pb/mux.proto +++ /dev/null @@ -1,14 +0,0 @@ -package mux.pb; - -enum ProtocolID { - Test = 0; - Identify = 1; // setup - Routing = 2; // dht - Exchange = 3; // bitswap - Diagnostic = 4; -} - -message PBProtocolMessage { - required ProtocolID ProtocolID = 1; - required bytes Data = 2; -} diff --git a/net/mux/mux.go b/net/mux/mux.go deleted file mode 100644 index b835c0e9c..000000000 --- a/net/mux/mux.go +++ /dev/null @@ -1,217 +0,0 @@ -// package mux implements a protocol muxer. -package mux - -import ( - "errors" - "fmt" - "sync" - - msg "github.com/jbenet/go-ipfs/net/message" - pb "github.com/jbenet/go-ipfs/net/mux/internal/pb" - u "github.com/jbenet/go-ipfs/util" - - proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" - router "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-router" -) - -var log = u.Logger("muxer") - -// ProtocolIDs used to identify each protocol. -// These should probably be defined elsewhere. -var ( - ProtocolID_Routing = pb.ProtocolID_Routing - ProtocolID_Exchange = pb.ProtocolID_Exchange - ProtocolID_Diagnostic = pb.ProtocolID_Diagnostic -) - -// Protocol objects produce + consume raw data. They are added to the Muxer -// with a ProtocolID, which is added to outgoing payloads. Muxer properly -// encapsulates and decapsulates when interfacing with its Protocols. The -// Protocols do not encounter their ProtocolID. -type Protocol interface { - ProtocolID() pb.ProtocolID - - // Node is a router.Node, for message connectivity. - router.Node -} - -// ProtocolMap maps ProtocolIDs to Protocols. -type ProtocolMap map[pb.ProtocolID]Protocol - -// Muxer is a simple multiplexor that reads + writes to Incoming and Outgoing -// channels. It multiplexes various protocols, wrapping and unwrapping data -// with a ProtocolID. -// -// implements router.Node and router.Route -type Muxer struct { - local router.Address - uplink router.Node - - // Protocols are the multiplexed services. - Protocols ProtocolMap - mapLock sync.Mutex - - bwiLock sync.Mutex - bwIn uint64 - msgIn uint64 - - bwoLock sync.Mutex - bwOut uint64 - msgOut uint64 -} - -// NewMuxer constructs a muxer given a protocol map. -// uplink is a Node to send all outgoing traffic to. -func NewMuxer(local router.Address, uplink router.Node) *Muxer { - return &Muxer{ - local: local, - uplink: uplink, - Protocols: ProtocolMap{}, - } -} - -// GetMessageCounts return the in/out message count measured over this muxer. -func (m *Muxer) GetMessageCounts() (in uint64, out uint64) { - m.bwiLock.Lock() - in = m.msgIn - m.bwiLock.Unlock() - - m.bwoLock.Lock() - out = m.msgOut - m.bwoLock.Unlock() - return -} - -// GetBandwidthTotals return the in/out bandwidth measured over this muxer. -func (m *Muxer) GetBandwidthTotals() (in uint64, out uint64) { - m.bwiLock.Lock() - in = m.bwIn - m.bwiLock.Unlock() - - m.bwoLock.Lock() - out = m.bwOut - m.bwoLock.Unlock() - return -} - -// AddProtocol adds a Protocol with given ProtocolID to the Muxer. -func (m *Muxer) AddProtocol(p Protocol, pid pb.ProtocolID) error { - m.mapLock.Lock() - defer m.mapLock.Unlock() - - if _, found := m.Protocols[pid]; found { - return errors.New("Another protocol already using this ProtocolID") - } - - m.Protocols[pid] = p - return nil -} - -func (m *Muxer) Address() router.Address { - return m.local -} - -func (m *Muxer) HandlePacket(p router.Packet, from router.Node) error { - pkt, ok := p.(*msg.Packet) - if !ok { - return msg.ErrInvalidPayload - } - - if from == m.uplink { - return m.handleIncomingPacket(pkt, from) - } else { - return m.handleOutgoingPacket(pkt, from) - } -} - -// handleIncomingPacket routes message to the appropriate protocol. -func (m *Muxer) handleIncomingPacket(p *msg.Packet, _ router.Node) error { - - m.bwiLock.Lock() - // TODO: compensate for overhead - m.bwIn += uint64(len(p.Data)) - m.msgIn++ - m.bwiLock.Unlock() - - data, pid, err := unwrapData(p.Data) - if err != nil { - return fmt.Errorf("muxer de-serializing error: %v", err) - } - - // TODO: fix this when mpool is fixed. - // conn.ReleaseBuffer(m1.Data()) - - p.Data = data - - m.mapLock.Lock() - proto, found := m.Protocols[pid] - m.mapLock.Unlock() - - if !found { - return fmt.Errorf("muxer: unknown protocol %v", pid) - } - - log.Debugf("muxer: outgoing packet %d -> %s", proto.ProtocolID(), m.uplink.Address()) - return proto.HandlePacket(p, m) -} - -// handleOutgoingMessages sends out messages to the outside world -func (m *Muxer) handleOutgoingPacket(p *msg.Packet, from router.Node) error { - - var pid pb.ProtocolID - var proto Protocol - m.mapLock.Lock() - for pid2, proto2 := range m.Protocols { - if proto2 == from { - pid = pid2 - proto = proto2 - break - } - } - m.mapLock.Unlock() - - if proto == nil { - return errors.New("muxer: packet sent from unknown protocol") - } - - var err error - p.Data, err = wrapData(p.Data, pid) - if err != nil { - return fmt.Errorf("muxer serializing error: %v", err) - } - - m.bwoLock.Lock() - // TODO: compensate for overhead - // TODO(jbenet): switch this to a goroutine to prevent sync waiting. - m.bwOut += uint64(len(p.Data)) - m.msgOut++ - m.bwoLock.Unlock() - - // TODO: add multiple uplinks - log.Debugf("muxer: incoming packet %s -> %d", m.uplink.Address(), proto.ProtocolID()) - return m.uplink.HandlePacket(p, m) -} - -func wrapData(data []byte, pid pb.ProtocolID) ([]byte, error) { - // Marshal - pbm := new(pb.PBProtocolMessage) - pbm.ProtocolID = &pid - pbm.Data = data - b, err := proto.Marshal(pbm) - if err != nil { - return nil, err - } - - return b, nil -} - -func unwrapData(data []byte) ([]byte, pb.ProtocolID, error) { - // Unmarshal - pbm := new(pb.PBProtocolMessage) - err := proto.Unmarshal(data, pbm) - if err != nil { - return nil, 0, err - } - - return pbm.GetData(), pbm.GetProtocolID(), nil -} diff --git a/net/mux/mux_test.go b/net/mux/mux_test.go deleted file mode 100644 index 6cfd223d8..000000000 --- a/net/mux/mux_test.go +++ /dev/null @@ -1,137 +0,0 @@ -package mux - -import ( - "bytes" - "testing" - - mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" - msg "github.com/jbenet/go-ipfs/net/message" - pb "github.com/jbenet/go-ipfs/net/mux/internal/pb" - peer "github.com/jbenet/go-ipfs/peer" - testutil "github.com/jbenet/go-ipfs/util/testutil" - - router "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-router" -) - -type TestProtocol struct { - mux *Muxer - pid pb.ProtocolID - msg []*msg.Packet -} - -func (t *TestProtocol) ProtocolID() pb.ProtocolID { - return t.pid -} - -func (t *TestProtocol) Address() router.Address { - return t.pid -} - -func (t *TestProtocol) HandlePacket(p router.Packet, from router.Node) error { - pkt, ok := p.(*msg.Packet) - if !ok { - return msg.ErrInvalidPayload - } - - log.Debugf("TestProtocol %d got: %v", t, p) - if from == t.mux { - t.msg = append(t.msg, pkt) - return nil - } - return t.mux.HandlePacket(p, t) -} - -func newPeer(t *testing.T, id string) peer.Peer { - mh, err := mh.FromHexString(id) - if err != nil { - t.Error(err) - return nil - } - - return testutil.NewPeerWithID(peer.ID(mh)) -} - -func testMsg(t *testing.T, m *msg.Packet, data []byte) { - if !bytes.Equal(data, m.Data) { - t.Errorf("Data does not match: %v != %v", data, m.Data) - } -} - -func testWrappedMsg(t *testing.T, m *msg.Packet, pid pb.ProtocolID, data []byte) { - data2, pid2, err := unwrapData(m.Data) - if err != nil { - t.Error(err) - } - - if pid != pid2 { - t.Errorf("ProtocolIDs do not match: %v != %v", pid, pid2) - } - - if !bytes.Equal(data, data2) { - t.Errorf("Data does not match: %v != %v", data, data2) - } -} - -func TestSimpleMuxer(t *testing.T) { - // setup - peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa") - peer2 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275bbbbbb") - - uplink := router.NewQueueNode("queue", make(chan router.Packet, 10)) - mux1 := NewMuxer(string(peer1.ID()), uplink) - - pid1 := pb.ProtocolID_Test - pid2 := pb.ProtocolID_Routing - p1 := &TestProtocol{mux1, pid1, nil} - p2 := &TestProtocol{mux1, pid2, nil} - mux1.AddProtocol(p1, pid1) - mux1.AddProtocol(p2, pid2) - - // test outgoing p1 - for _, s := range []string{"foo", "bar", "baz"} { - - pkt := msg.Packet{Src: peer1, Dst: peer2, Data: []byte(s)} - if err := p1.HandlePacket(&pkt, nil); err != nil { - t.Fatal(err) - } - testWrappedMsg(t, (<-uplink.Queue()).(*msg.Packet), pid1, []byte(s)) - } - - // test incoming p1 - for i, s := range []string{"foo", "bar", "baz"} { - d, err := wrapData([]byte(s), pid1) - if err != nil { - t.Error(err) - } - - pkt := msg.Packet{Src: peer1, Dst: peer2, Data: d} - if err := mux1.HandlePacket(&pkt, uplink); err != nil { - t.Fatal(err) - } - testMsg(t, p1.msg[i], []byte(s)) - } - - // test outgoing p2 - for _, s := range []string{"foo", "bar", "baz"} { - - pkt := msg.Packet{Src: peer1, Dst: peer2, Data: []byte(s)} - if err := p2.HandlePacket(&pkt, nil); err != nil { - t.Fatal(err) - } - testWrappedMsg(t, (<-uplink.Queue()).(*msg.Packet), pid2, []byte(s)) - } - - // test incoming p2 - for i, s := range []string{"foo", "bar", "baz"} { - d, err := wrapData([]byte(s), pid2) - if err != nil { - t.Fatal(err) - } - - pkt := msg.Packet{Src: peer1, Dst: peer2, Data: d} - if err := mux1.HandlePacket(&pkt, uplink); err != nil { - t.Fatal(err) - } - testMsg(t, p2.msg[i], []byte(s)) - } -} diff --git a/net/service/internal/pb/Makefile b/net/service/internal/pb/Makefile deleted file mode 100644 index 08ac883d0..000000000 --- a/net/service/internal/pb/Makefile +++ /dev/null @@ -1,11 +0,0 @@ -PB = $(wildcard *.proto) -GO = $(PB:.proto=.pb.go) - -all: $(GO) - -%.pb.go: %.proto - protoc --gogo_out=. --proto_path=../../../../../../:/usr/local/opt/protobuf/include:. $< - -clean: - rm -f *.pb.go - rm -f *.go diff --git a/net/service/internal/pb/service.pb.go b/net/service/internal/pb/service.pb.go deleted file mode 100644 index 6d3a3c986..000000000 --- a/net/service/internal/pb/service.pb.go +++ /dev/null @@ -1,48 +0,0 @@ -// Code generated by protoc-gen-gogo. -// source: service.proto -// DO NOT EDIT! - -/* -Package service_pb is a generated protocol buffer package. - -It is generated from these files: - service.proto - -It has these top-level messages: - PBRequest -*/ -package service_pb - -import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto" -import math "math" - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = math.Inf - -type PBRequest struct { - Data []byte `protobuf:"bytes,1,req" json:"Data,omitempty"` - Tag []byte `protobuf:"bytes,3,opt" json:"Tag,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *PBRequest) Reset() { *m = PBRequest{} } -func (m *PBRequest) String() string { return proto.CompactTextString(m) } -func (*PBRequest) ProtoMessage() {} - -func (m *PBRequest) GetData() []byte { - if m != nil { - return m.Data - } - return nil -} - -func (m *PBRequest) GetTag() []byte { - if m != nil { - return m.Tag - } - return nil -} - -func init() { -} diff --git a/net/service/internal/pb/service.proto b/net/service/internal/pb/service.proto deleted file mode 100644 index 48e059f8d..000000000 --- a/net/service/internal/pb/service.proto +++ /dev/null @@ -1,6 +0,0 @@ -package service.pb; - -message PBRequest { - required bytes Data = 1; - optional bytes Tag = 3; -} diff --git a/net/service/request.go b/net/service/request.go deleted file mode 100644 index babf6b4d5..000000000 --- a/net/service/request.go +++ /dev/null @@ -1,128 +0,0 @@ -package service - -import ( - crand "crypto/rand" - - msg "github.com/jbenet/go-ipfs/net/message" - pb "github.com/jbenet/go-ipfs/net/service/internal/pb" - peer "github.com/jbenet/go-ipfs/peer" - - proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" -) - -const ( - // IDSize is the size of the ID in bytes. - IDSize int = 4 -) - -// RequestID is a field that identifies request-response flows. -type RequestID []byte - -// Request turns a RequestID into a Request (unsetting first bit) -func (r RequestID) Request() RequestID { - if r == nil { - return nil - } - r2 := make([]byte, len(r)) - copy(r2, r) - r2[0] = r[0] & 0x7F // unset first bit for request - return RequestID(r2) -} - -// Response turns a RequestID into a Response (setting first bit) -func (r RequestID) Response() RequestID { - if r == nil { - return nil - } - r2 := make([]byte, len(r)) - copy(r2, r) - r2[0] = r[0] | 0x80 // set first bit for response - return RequestID(r2) -} - -// IsRequest returns whether a RequestID identifies a request -func (r RequestID) IsRequest() bool { - if r == nil { - return false - } - return !r.IsResponse() -} - -// IsResponse returns whether a RequestID identifies a response -func (r RequestID) IsResponse() bool { - if r == nil { - return false - } - return bool(r[0]&0x80 == 0x80) -} - -// RandomRequestID creates and returns a new random request ID -func RandomRequestID() (RequestID, error) { - buf := make([]byte, IDSize) - _, err := crand.Read(buf) - return RequestID(buf).Request(), err -} - -// RequestMap is a map of Requests. the key = (peer.ID concat RequestID). -type RequestMap map[string]*Request - -// Request objects are used to multiplex request-response flows. -type Request struct { - - // ID is the RequestID identifying this Request-Response Flow. - ID RequestID - - // PeerID identifies the peer from whom to expect the response. - PeerID peer.ID - - // Response is the channel of incoming responses. - Response chan msg.NetMessage -} - -// NewRequest creates a request for given peer.ID -func NewRequest(pid peer.ID) (*Request, error) { - id, err := RandomRequestID() - if err != nil { - return nil, err - } - - return &Request{ - ID: id, - PeerID: pid, - Response: make(chan msg.NetMessage, 1), - }, nil -} - -// Key returns the RequestKey for this request. Use with maps. -func (r *Request) Key() string { - return RequestKey(r.PeerID, r.ID) -} - -// RequestKey is the peer.ID concatenated with the RequestID. Use with maps. -func RequestKey(pid peer.ID, rid RequestID) string { - return string(pid) + string(rid.Request()[:]) -} - -func wrapData(data []byte, rid RequestID) ([]byte, error) { - // Marshal - pbm := new(pb.PBRequest) - pbm.Data = data - pbm.Tag = rid - b, err := proto.Marshal(pbm) - if err != nil { - return nil, err - } - - return b, nil -} - -func unwrapData(data []byte) ([]byte, RequestID, error) { - // Unmarshal - pbm := new(pb.PBRequest) - err := proto.Unmarshal(data, pbm) - if err != nil { - return nil, nil, err - } - - return pbm.GetData(), pbm.GetTag(), nil -} diff --git a/net/service/request_test.go b/net/service/request_test.go deleted file mode 100644 index 1931f8f63..000000000 --- a/net/service/request_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package service - -import ( - "bytes" - "testing" -) - -func TestMarshaling(t *testing.T) { - - test := func(d1 []byte, rid1 RequestID) { - d2, err := wrapData(d1, rid1) - if err != nil { - t.Error(err) - } - - d3, rid2, err := unwrapData(d2) - if err != nil { - t.Error(err) - } - - d4, err := wrapData(d3, rid1) - if err != nil { - t.Error(err) - } - - if !bytes.Equal(rid2, rid1) { - t.Error("RequestID fail") - } - - if !bytes.Equal(d1, d3) { - t.Error("unmarshalled data should be the same") - } - - if !bytes.Equal(d2, d4) { - t.Error("marshalled data should be the same") - } - } - - test([]byte("foo"), []byte{1, 2, 3, 4}) - test([]byte("bar"), nil) -} diff --git a/net/service/service.go b/net/service/service.go deleted file mode 100644 index f28cd5380..000000000 --- a/net/service/service.go +++ /dev/null @@ -1,304 +0,0 @@ -package service - -import ( - "errors" - "fmt" - "sync" - - msg "github.com/jbenet/go-ipfs/net/message" - u "github.com/jbenet/go-ipfs/util" - - ctxgroup "github.com/jbenet/go-ctxgroup" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - router "github.com/jbenet/go-router" -) - -var log = u.Logger("service") - -// ErrNoResponse is returned by Service when a Request did not get a response, -// and no other error happened -var ErrNoResponse = errors.New("no response to request") - -// Handler is an interface that objects must implement in order to handle -// a service's requests. -type Handler interface { - - // HandleMessage receives an incoming message, and potentially returns - // a response message to send back. - HandleMessage(context.Context, msg.NetMessage) msg.NetMessage -} - -// Sender interface for network services. -type Sender interface { - // SendMessage sends out a given message, without expecting a response. - SendMessage(ctx context.Context, m msg.NetMessage) error - - // SendRequest sends out a given message, and awaits a response. - // Set Deadlines or cancellations in the context.Context you pass in. - SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) -} - -// Service is an interface for a net resource with both outgoing (sender) and -// incomig (SetHandler) requests. -type Service interface { - Sender // can use it to send out msgs - router.Node // it is a Node in the net topology. - - // SetUplink assigns the Node to send packets out - SetUplink(router.Node) - Uplink() router.Node - - // SetHandler assigns the request Handler for this service. - SetHandler(Handler) - GetHandler() Handler -} - -// Service is a networking component that protocols can use to multiplex -// messages over the same channel, and to issue + handle requests. -type service struct { - // Handler is the object registered to handle incoming requests. - Handler Handler - HandlerLock sync.RWMutex - - // Requests are all the pending requests on this service. - Requests RequestMap - RequestsLock sync.RWMutex - - // the connection to the outside world - uplink router.Node - uplinkLock sync.RWMutex - addr router.Address -} - -// NewService creates a service object with given type ID and Handler -func NewService(addr router.Address, uplink router.Node, h Handler) Service { - s := &service{ - Handler: h, - Requests: RequestMap{}, - uplink: uplink, - addr: addr, - } - return s -} - -// sendMessage sends a message out (actual leg work. SendMessage is to export w/o rid) -func (s *service) sendMessage(ctx context.Context, m msg.NetMessage, rid RequestID) error { - - // serialize ServiceMessage wrapper - data, err := wrapData(m.Data(), rid) - if err != nil { - return err - } - - // log.Debug("Service send message [to = %s]", m.Peer()) - - // send message - m2 := msg.New(m.Peer(), data) - - pkt := msg.Packet{ - Src: - } - - select { - case s.Outgoing <- m2: - case <-ctx.Done(): - return ctx.Err() - } - - pkt := msg.Packet{ - Src: m. - } - - return nil -} - -// SendMessage sends a message out -func (s *service) SendMessage(ctx context.Context, m msg.NetMessage) error { - return s.sendMessage(ctx, m, nil) -} - -// SendRequest sends a request message out and awaits a response. -func (s *service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) { - - // check if we should bail given our contexts - select { - default: - case <-s.Closing(): - return nil, fmt.Errorf("service closed: %s", s.Context().Err()) - case <-ctx.Done(): - return nil, ctx.Err() - } - - // create a request - r, err := NewRequest(m.Peer().ID()) - if err != nil { - return nil, err - } - - // register Request - s.RequestsLock.Lock() - s.Requests[r.Key()] = r - s.RequestsLock.Unlock() - - // defer deleting this request - defer func() { - s.RequestsLock.Lock() - delete(s.Requests, r.Key()) - s.RequestsLock.Unlock() - }() - - // check if we should bail after waiting for mutex - select { - default: - case <-s.Closing(): - return nil, fmt.Errorf("service closed: %s", s.Context().Err()) - case <-ctx.Done(): - return nil, ctx.Err() - } - - // Send message - s.sendMessage(ctx, m, r.ID) - - // wait for response - m = nil - err = nil - select { - case m = <-r.Response: - case <-s.Closed(): - err = fmt.Errorf("service closed: %s", s.Context().Err()) - case <-ctx.Done(): - err = ctx.Err() - } - - if m == nil { - return nil, ErrNoResponse - } - - return m, err -} - -// handleIncoming consumes the messages on the s.Incoming channel and -// routes them appropriately (to requests, or handler). -func (s *service) handleIncomingMessages() { - defer s.Children().Done() - - for { - select { - case m, more := <-s.Incoming: - if !more { - return - } - s.Children().Add(1) - go s.handleIncomingMessage(m) - - case <-s.Closing(): - return - } - } -} - -func (s *service) handleIncomingMessage(pkt *msg.Packet) error { - - // check the packet has a valid Context - ctx := pkt.Context - if ctx == nil { - return fmt.Errorf("service got pkt without valid Context") - } - - // check the source is a peer - srcPeer, ok := pkt.Src.(peer.Peer) - if !ok { - return fmt.Errorf("service got pkt from non-Peer src: %v", pkt.Src) - } - - // unwrap the incoming message - data, rid, err := unwrapData(pkt.Data) - if err != nil { - return fmt.Errorf("service de-serializing error: %v", err) - } - - // convert to msg.NetMessage, which the rest of the system expects. - m2 := msg.New(srcPeer, data) - - // if it's a request (or has no RequestID), handle it - if rid == nil || rid.IsRequest() { - handler := s.GetHandler() - if handler == nil { - log.Errorf("service dropped msg: %v", m) - log.Event() - return nil - // no handler, drop it. - } - - // this go routine is developer friendliness to keep their stacks - // separate (and more readable) from the network goroutine. If - // problems arise and you'd like to see _the full_ stack of where - // this message is coming from, just remove the goroutine part. - response := make(chan msg.NetMessage) - go func() msg.NetMessage { - return handler.HandleMessage(ctx, m2) - }() - r1 := <-response - // Note: HandleMessage *must* respect context. We could co-opt it - // and do a select {} here on the context, BUT that would just drop - // a packet and free up the goroutine to return to the network. the - // problem is still there: the Service handler hasn't returned yet. - - // if handler gave us a response, send it out! - if r1 != nil { - if err := s.sendMessage(ctx, r1, rid.Response()); err != nil { - return fmt.Errorf("error sending response message: %v", err) - } - } - return - } - - // Otherwise, it is a response. handle it. - if !rid.IsResponse() { - log.Errorf("RequestID should identify a response here.") - } - - key := RequestKey(m.Peer().ID(), RequestID(rid)) - s.RequestsLock.RLock() - r, found := s.Requests[key] - s.RequestsLock.RUnlock() - - if !found { - log.Errorf("no request key %v (timeout?)", []byte(key)) - return - } - - select { - case r.Response <- m2: - case <-s.Closing(): - } -} - - -// Address is the router.Node address -func (s *service) Address() router.Address { - return s.addr -} - -// HandlePacket implements router.Node -// service only receives packets in HandlePacket -func (s *service) HandlePacket(p router.Packet, from router.Node) error { - pkt, ok := p.(*msg.Packet) - if !ok { - return msg.ErrInvalidPayload - } -} - -// SetHandler assigns the request Handler for this service. -func (s *service) SetHandler(h Handler) { - s.HandlerLock.Lock() - defer s.HandlerLock.Unlock() - s.Handler = h -} - -// GetHandler returns the request Handler for this service. -func (s *service) GetHandler() Handler { - s.HandlerLock.RLock() - defer s.HandlerLock.RUnlock() - return s.Handler -} diff --git a/net/service/service_test.go b/net/service/service_test.go deleted file mode 100644 index 798400ee0..000000000 --- a/net/service/service_test.go +++ /dev/null @@ -1,164 +0,0 @@ -package service - -import ( - "bytes" - "testing" - "time" - - msg "github.com/jbenet/go-ipfs/net/message" - peer "github.com/jbenet/go-ipfs/peer" - testutil "github.com/jbenet/go-ipfs/util/testutil" - - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" -) - -// ReverseHandler reverses all Data it receives and sends it back. -type ReverseHandler struct{} - -func (t *ReverseHandler) HandleMessage(ctx context.Context, m msg.NetMessage) msg.NetMessage { - - d := m.Data() - for i, j := 0, len(d)-1; i < j; i, j = i+1, j-1 { - d[i], d[j] = d[j], d[i] - } - - return msg.New(m.Peer(), d) -} - -func newPeer(t *testing.T, id string) peer.Peer { - mh, err := mh.FromHexString(id) - if err != nil { - t.Error(err) - return nil - } - - return testutil.NewPeerWithID(peer.ID(mh)) -} - -func TestServiceHandler(t *testing.T) { - ctx := context.Background() - h := &ReverseHandler{} - s := NewService(ctx, h) - peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa") - - d, err := wrapData([]byte("beep"), nil) - if err != nil { - t.Error(err) - } - - m1 := msg.New(peer1, d) - s.GetPipe().Incoming <- m1 - m2 := <-s.GetPipe().Outgoing - - d, rid, err := unwrapData(m2.Data()) - if err != nil { - t.Error(err) - } - - if rid != nil { - t.Error("RequestID should be nil") - } - - if !bytes.Equal(d, []byte("peeb")) { - t.Errorf("service handler data incorrect: %v != %v", d, "oof") - } -} - -func TestServiceRequest(t *testing.T) { - ctx := context.Background() - s1 := NewService(ctx, &ReverseHandler{}) - s2 := NewService(ctx, &ReverseHandler{}) - - peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa") - - // patch services together - go func() { - for { - select { - case m := <-s1.GetPipe().Outgoing: - s2.GetPipe().Incoming <- m - case m := <-s2.GetPipe().Outgoing: - s1.GetPipe().Incoming <- m - case <-ctx.Done(): - return - } - } - }() - - m1 := msg.New(peer1, []byte("beep")) - m2, err := s1.SendRequest(ctx, m1) - if err != nil { - t.Error(err) - } - - if !bytes.Equal(m2.Data(), []byte("peeb")) { - t.Errorf("service handler data incorrect: %v != %v", m2.Data(), "oof") - } -} - -func TestServiceRequestTimeout(t *testing.T) { - ctx, _ := context.WithTimeout(context.Background(), time.Millisecond) - s1 := NewService(ctx, &ReverseHandler{}) - s2 := NewService(ctx, &ReverseHandler{}) - peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa") - - // patch services together - go func() { - for { - <-time.After(time.Millisecond) - select { - case m := <-s1.GetPipe().Outgoing: - s2.GetPipe().Incoming <- m - case m := <-s2.GetPipe().Outgoing: - s1.GetPipe().Incoming <- m - case <-ctx.Done(): - return - } - } - }() - - m1 := msg.New(peer1, []byte("beep")) - m2, err := s1.SendRequest(ctx, m1) - if err == nil || m2 != nil { - t.Error("should've timed out") - } -} - -func TestServiceClose(t *testing.T) { - ctx := context.Background() - s1 := NewService(ctx, &ReverseHandler{}) - s2 := NewService(ctx, &ReverseHandler{}) - - peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa") - - // patch services together - go func() { - for { - select { - case m := <-s1.GetPipe().Outgoing: - s2.GetPipe().Incoming <- m - case m := <-s2.GetPipe().Outgoing: - s1.GetPipe().Incoming <- m - case <-ctx.Done(): - return - } - } - }() - - m1 := msg.New(peer1, []byte("beep")) - m2, err := s1.SendRequest(ctx, m1) - if err != nil { - t.Error(err) - } - - if !bytes.Equal(m2.Data(), []byte("peeb")) { - t.Errorf("service handler data incorrect: %v != %v", m2.Data(), "oof") - } - - s1.Close() - s2.Close() - - <-s1.Closed() - <-s2.Closed() -} diff --git a/net/swarm/addrs.go b/net/swarm/addrs.go deleted file mode 100644 index 27e7cb55b..000000000 --- a/net/swarm/addrs.go +++ /dev/null @@ -1,120 +0,0 @@ -package swarm - -import ( - "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" - "github.com/jbenet/go-ipfs/util/eventlog" - - ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" -) - -// ListenAddresses returns a list of addresses at which this swarm listens. -func (s *Swarm) ListenAddresses() []ma.Multiaddr { - addrs := make([]ma.Multiaddr, len(s.listeners)) - for i, l := range s.listeners { - addrs[i] = l.Multiaddr() - } - return addrs -} - -// InterfaceListenAddresses returns a list of addresses at which this swarm -// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to -// use the known local interfaces. -func (s *Swarm) InterfaceListenAddresses() ([]ma.Multiaddr, error) { - return resolveUnspecifiedAddresses(s.ListenAddresses()) -} - -// resolveUnspecifiedAddresses expands unspecified ip addresses (/ip4/0.0.0.0, /ip6/::) to -// use the known local interfaces. -func resolveUnspecifiedAddresses(unspecifiedAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) { - var outputAddrs []ma.Multiaddr - - // todo optimize: only fetch these if we have a "any" addr. - ifaceAddrs, err := interfaceAddresses() - if err != nil { - return nil, err - } - - for _, a := range unspecifiedAddrs { - - // split address into its components - split := ma.Split(a) - - // if first component (ip) is not unspecified, use it as is. - if !manet.IsIPUnspecified(split[0]) { - outputAddrs = append(outputAddrs, a) - continue - } - - // unspecified? add one address per interface. - for _, ia := range ifaceAddrs { - split[0] = ia - joined := ma.Join(split...) - outputAddrs = append(outputAddrs, joined) - } - } - - log.Event(context.TODO(), "interfaceListenAddresses", func() eventlog.Loggable { - var addrs []string - for _, addr := range outputAddrs { - addrs = append(addrs, addr.String()) - } - return eventlog.Metadata{"addresses": addrs} - }()) - log.Debug("InterfaceListenAddresses:", outputAddrs) - return outputAddrs, nil -} - -// interfaceAddresses returns a list of addresses associated with local machine -func interfaceAddresses() ([]ma.Multiaddr, error) { - maddrs, err := manet.InterfaceMultiaddrs() - if err != nil { - return nil, err - } - - var nonLoopback []ma.Multiaddr - for _, a := range maddrs { - if !manet.IsIPLoopback(a) { - nonLoopback = append(nonLoopback, a) - } - } - - return nonLoopback, nil -} - -// addrInList returns whether or not an address is part of a list. -// this is useful to check if NAT is happening (or other bugs?) -func addrInList(addr ma.Multiaddr, list []ma.Multiaddr) bool { - for _, addr2 := range list { - if addr.Equal(addr2) { - return true - } - } - return false -} - -// checkNATWarning checks if our observed addresses differ. if so, -// informs the user that certain things might not work yet -func (s *Swarm) checkNATWarning(observed ma.Multiaddr, expected ma.Multiaddr) { - if observed.Equal(expected) { - return - } - - listen, err := s.InterfaceListenAddresses() - if err != nil { - log.Errorf("Error retrieving swarm.InterfaceListenAddresses: %s", err) - return - } - - if !addrInList(observed, listen) { // probably a nat - log.Warningf(natWarning, observed, listen) - } -} - -const natWarning = `Remote peer observed our address to be: %s -The local addresses are: %s -Thus, connection is going through NAT, and other connections may fail. - -IPFS NAT traversal is still under development. Please bug us on github or irc to fix this. -Baby steps: http://jbenet.static.s3.amazonaws.com/271dfcf/baby-steps.gif -` diff --git a/net/swarm/simul_test.go b/net/swarm/simul_test.go deleted file mode 100644 index 1bdb3edd3..000000000 --- a/net/swarm/simul_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package swarm - -import ( - "fmt" - "sync" - "testing" - - peer "github.com/jbenet/go-ipfs/peer" - "github.com/jbenet/go-ipfs/util/testutil" - - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" -) - -func TestSimultOpen(t *testing.T) { - // t.Skip("skipping for another test") - - addrs := []string{ - "/ip4/127.0.0.1/tcp/1244", - "/ip4/127.0.0.1/tcp/1245", - } - - ctx := context.Background() - swarms, _ := makeSwarms(ctx, t, addrs) - - // connect everyone - { - var wg sync.WaitGroup - connect := func(s *Swarm, dst peer.Peer) { - // copy for other peer - cp := testutil.NewPeerWithID(dst.ID()) - cp.AddAddress(dst.Addresses()[0]) - - if _, err := s.Dial(cp); err != nil { - t.Fatal("error swarm dialing to peer", err) - } - wg.Done() - } - - log.Info("Connecting swarms simultaneously.") - wg.Add(2) - go connect(swarms[0], swarms[1].local) - go connect(swarms[1], swarms[0].local) - wg.Wait() - } - - for _, s := range swarms { - s.Close() - } -} - -func TestSimultOpenMany(t *testing.T) { - t.Skip("very very slow") - - many := 500 - addrs := []string{} - for i := 2200; i < (2200 + many); i++ { - s := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", i) - addrs = append(addrs, s) - } - - SubtestSwarm(t, addrs, 10) -} - -func TestSimultOpenFewStress(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - // t.Skip("skipping for another test") - - num := 10 - // num := 100 - for i := 0; i < num; i++ { - addrs := []string{ - fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 1900+i), - fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 2900+i), - } - - SubtestSwarm(t, addrs, 10) - } -} diff --git a/net/swarm/swarm.go b/net/swarm/swarm.go deleted file mode 100644 index 0259783c1..000000000 --- a/net/swarm/swarm.go +++ /dev/null @@ -1,193 +0,0 @@ -// package swarm implements a connection muxer with a pair of channels -// to synchronize all network communication. -package swarm - -import ( - "errors" - "fmt" - - conn "github.com/jbenet/go-ipfs/net/conn" - peer "github.com/jbenet/go-ipfs/peer" - "github.com/jbenet/go-ipfs/util/eventlog" - - ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - router "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-router" -) - -var log = eventlog.Logger("swarm") - -// ErrAlreadyOpen signals that a connection to a peer is already open. -var ErrAlreadyOpen = errors.New("Error: Connection to this peer already open.") - -// ListenErr contains a set of errors mapping to each of the swarms addresses. -// Used to return multiple errors, as in listen. -type ListenErr struct { - Errors []error -} - -func (e *ListenErr) Error() string { - if e == nil { - return "" - } - var out string - for i, v := range e.Errors { - if v != nil { - out += fmt.Sprintf("%d: %s\n", i, v) - } - } - return out -} - -// Swarm is a connection muxer, allowing connections to other peers to -// be opened and closed, while still using the same Chan for all -// communication. The Chan sends/receives Messages, which note the -// destination or source Peer. -// -// Implements router.Node -type Swarm struct { - - // local is the peer this swarm represents - local peer.Peer - - // peers is a collection of peers for swarm to use - peers peer.Peerstore - - // rt handles the open connections the swarm is handling. - rt *swarmRoutingTable - - // listeners for each network address - listeners []conn.Listener - - // ContextGroup - cg ctxgroup.ContextGroup -} - -// NewSwarm constructs a Swarm, with a Chan. -func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, - local peer.Peer, ps peer.Peerstore, client router.Node) (*Swarm, error) { - - s := &Swarm{ - local: local, - peers: ps, - cg: ctxgroup.WithContext(ctx), - rt: newRoutingTable(local, client), - } - - s.cg.SetTeardown(s.close) - return s, s.listen(listenAddrs) -} - -// SetClient assign's the Swarm's client node. -func (s *Swarm) SetClient(n router.Node) { - s.rt.client = n -} - -// Close stops a swarm. waits till it exits -func (s *Swarm) Close() error { - return s.cg.Close() -} - -// close stops a swarm. It's the underlying function called by ContextGroup -func (s *Swarm) close() error { - // close listeners - for _, list := range s.listeners { - list.Close() - } - // close connections - conn.CloseConns(s.Connections()...) - return nil -} - -// Dial connects to a peer. -// -// The idea is that the client of Swarm does not need to know what network -// the connection will happen over. Swarm can use whichever it choses. -// This allows us to use various transport protocols, do NAT traversal/relay, -// etc. to achive connection. -// -// For now, Dial uses only TCP. This will be extended. -func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) { - if peer.ID().Equal(s.local.ID()) { - return nil, errors.New("Attempted connection to self!") - } - - // check if we already have an open connection first - c := s.GetConnection(peer.ID()) - if c != nil { - return c, nil - } - - // check if we don't have the peer in Peerstore - peer, err := s.peers.Add(peer) - if err != nil { - return nil, err - } - - // open connection to peer - d := &conn.Dialer{ - LocalPeer: s.local, - Peerstore: s.peers, - } - - if len(peer.Addresses()) == 0 { - return nil, errors.New("peer has no addresses") - } - // try to connect to one of the peer's known addresses. - // for simplicity, we do this sequentially. - // A future commit will do this asynchronously. - for _, addr := range peer.Addresses() { - c, err = d.DialAddr(s.cg.Context(), addr, peer) - if err == nil { - break - } - } - if err != nil { - return nil, err - } - - c2, err := s.connSetup(context.TODO(), c) - if err != nil { - c.Close() - return nil, err - } - - // TODO replace the TODO ctx with a context passed in from caller - log.Event(context.TODO(), "dial", peer) - return c2, nil -} - -// GetConnection returns the connection in the swarm to given peer.ID -func (s *Swarm) GetConnection(pid peer.ID) conn.Conn { - sp := s.rt.getByID(pid) - if sp == nil { - return nil - } - return sp.conn -} - -// Connections returns a slice of all connections. -func (s *Swarm) Connections() []conn.Conn { - return s.rt.connList() -} - -// CloseConnection removes a given peer from swarm + closes the connection -func (s *Swarm) CloseConnection(p peer.Peer) error { - return s.closeConn(p) -} - -// GetPeerList returns a copy of the set of peers swarm is connected to. -func (s *Swarm) GetPeerList() []peer.Peer { - return s.rt.peerList() -} - -// LocalPeer returns the local peer swarm is associated to. -func (s *Swarm) LocalPeer() peer.Peer { - return s.local -} - -// Address returns the address of *this* service. -func (s *Swarm) Address() router.Address { - return "/ipfs/service/swarm" // for now dont need anything more complicated. -} diff --git a/net/swarm/swarm_conn.go b/net/swarm/swarm_conn.go deleted file mode 100644 index 208ebdf2c..000000000 --- a/net/swarm/swarm_conn.go +++ /dev/null @@ -1,128 +0,0 @@ -package swarm - -import ( - "errors" - "fmt" - - conn "github.com/jbenet/go-ipfs/net/conn" - - ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" -) - -// Open listeners for each network the swarm should listen on -func (s *Swarm) listen(addrs []ma.Multiaddr) error { - hasErr := false - retErr := &ListenErr{ - Errors: make([]error, len(addrs)), - } - - // listen on every address - for i, addr := range addrs { - err := s.connListen(addr) - if err != nil { - hasErr = true - retErr.Errors[i] = err - log.Errorf("Failed to listen on: %s - %s", addr, err) - } - } - - if hasErr { - return retErr - } - return nil -} - -// Listen for new connections on the given multiaddr -func (s *Swarm) connListen(maddr ma.Multiaddr) error { - - resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr}) - if err != nil { - return err - } - - list, err := conn.Listen(s.cg.Context(), maddr, s.local, s.peers) - if err != nil { - return err - } - - // add resolved local addresses to peer - for _, addr := range resolved { - s.local.AddAddress(addr) - } - - // make sure port can be reused. TOOD this doesn't work... - // if err := setSocketReuse(list); err != nil { - // return err - // } - - // NOTE: this may require a lock around it later. currently, only run on setup - s.listeners = append(s.listeners, list) - - // Accept and handle new connections on this listener until it errors - // this listener is a child. - s.cg.AddChildFunc(func(parent ctxgroup.ContextGroup) { - for { - select { - case <-parent.Closing(): - return - - case conn := <-list.Accept(): - s.handleIncomingConn(parent.Context(), conn) - } - } - }) - - return nil -} - -// Handle getting ID from this peer, handshake, and adding it into the map -func (s *Swarm) handleIncomingConn(ctx context.Context, nconn conn.Conn) { - // Setup the new connection - _, err := s.connSetup(ctx, nconn) - if err != nil { - log.Errorf("swarm: failed to add incoming connection: %s", err) - log.Event(ctx, "handleIncomingConn failed", s.LocalPeer(), nconn.RemotePeer()) - nconn.Close() - } -} - -// connSetup takes a new connection, performs the IPFS handshake (handshake3) -// and then adds it to the appropriate MultiConn. -func (s *Swarm) connSetup(ctx context.Context, c conn.Conn) (conn.Conn, error) { - if c == nil { - return nil, errors.New("Tried to start nil connection.") - } - - log.Event(ctx, "connSetupBegin", c.LocalPeer(), c.RemotePeer()) - - // add address of connection to Peer. Maybe it should happen in connSecure. - // NOT adding this address here, because the incoming address in TCP - // is an EPHEMERAL address, and not the address we want to keep around. - // addresses should be figured out through the DHT. - // c.Remote.AddAddress(c.Conn.RemoteMultiaddr()) - - // handshake3 - ctxT, _ := context.WithTimeout(c.Context(), conn.HandshakeTimeout) - h3result, err := conn.Handshake3(ctxT, c) - if err != nil { - c.Close() - return nil, fmt.Errorf("Handshake3 failed: %s", err) - } - - // check for nats. you know, just in case. - if h3result.LocalObservedAddress != nil { - s.checkNATWarning(h3result.LocalObservedAddress, c.LocalMultiaddr()) - } else { - log.Warningf("Received nil observed address from %s", c.RemotePeer()) - } - - // add to conns - if err := s.addConn(c); err != nil { - c.Close() - return nil, err - } - log.Event(ctx, "connSetupSuccess", c.LocalPeer(), c.RemotePeer()) - return c, nil -} diff --git a/net/swarm/swarm_peer.go b/net/swarm/swarm_peer.go deleted file mode 100644 index 8cf1eec98..000000000 --- a/net/swarm/swarm_peer.go +++ /dev/null @@ -1,178 +0,0 @@ -package swarm - -import ( - "fmt" - - conn "github.com/jbenet/go-ipfs/net/conn" - netmsg "github.com/jbenet/go-ipfs/net/message" - peer "github.com/jbenet/go-ipfs/peer" - - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" - router "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-router" -) - -// MaxConcurrentRequestsPerPeer defines the pipelining that we can do per-peer. -// the networking layer makes sure to provide proper backpressure to the remote -// side by only handling a max number of concurrent requests to completion. -const MaxConcurrentRequestsPerPeer = 20 - -// swarmPeer represents a connection to the outside world. -// Implements router.Node -type swarmPeer struct { - swarm *Swarm - conn *conn.MultiConn - cg ctxgroup.ContextGroup -} - -// newSwarmPeer constructs a new swarmPeer, and starts is worker. -// this doesn't connect it, or add it to the swarm's routing table. -// Implements router.Node -func newSwarmPeer(s *Swarm, p peer.Peer) (*swarmPeer, error) { - c, err := conn.NewMultiConn(s.cg.Context(), s.LocalPeer(), p, nil) - if err != nil { - return nil, fmt.Errorf("Error creating MultiConn: %s", err) - } - - sp := &swarmPeer{ - swarm: s, - conn: c, - cg: ctxgroup.WithParent(s.cg), // swarmPeer closes when swarm closes. - } - - // kicks off the worker. - // ctggroup makes sure swarmPeer doesn't close until this func returns - sp.cg.AddChildFunc(sp.listen) - return sp, nil -} - -// LocalPeer returns the local peer -func (sp *swarmPeer) LocalPeer() peer.Peer { - return sp.conn.LocalPeer() -} - -// RemotePeer returns the peer we're connected to. -func (sp *swarmPeer) RemotePeer() peer.Peer { - return sp.conn.RemotePeer() -} - -// Address is the peer's ID -func (sp *swarmPeer) Address() router.Address { - return sp.RemotePeer() -} - -// Close closes the swarmPeer service -func (sp *swarmPeer) Close() error { - return sp.cg.Close() -} - -// list to the multiconn and route packets in. -func (sp *swarmPeer) listen(parent ctxgroup.ContextGroup) { - - // we listen and pipeline using: - // - 1x listener (this function, the for loop below) - // - 1x pipelining semaphore - // - up to Nx goroutine pipeline workers - // this approach is chosen over N persistent goroutines because - // spawning a goroutine every time is cheaper than keeping N - // additional goroutines all the time, for inactive connections - - pipelineSema := make(chan struct{}, MaxConcurrentRequestsPerPeer) - for i := 0; i < MaxConcurrentRequestsPerPeer; i++ { - pipelineSema <- struct{}{} - } - - // the sad part of using io is we still need to consume msgs - // using a context-less api, which means we need to use an - // extra goroutine, to make sure we close the connection and - // unlock our blocked listener. - // (the Context just does not mix well with io.ReadWriters) - // - conn.SetDeadline could be explored - go func() { - <-parent.Closing() - sp.conn.Close() - }() - - listener := func() { - for { - msg, err := sp.conn.ReadMsg() - // we want this to happen before checking the error, as we may - // be closing (which is not an error). any last message is dropped. - select { - case <-parent.Closing(): - return - case <-sp.conn.Closing(): - return - default: - } - - if err != nil { - log.Errorf("error receiving message from multiconn: %s", err) - continue - } - - select { - case <-parent.Closing(): - return - case <-pipelineSema: // acquire pipelining resource - go func(m []byte) { - sp.handleIncomingMessage(parent.Context(), m) - pipelineSema <- struct{}{} - }(msg) - } - } - } - - // function call so that we can isolate the functionality, - // and so we can call return in loops above and not confuse flow. - listener() -} - -func (sp *swarmPeer) handleIncomingMessage(ctx context.Context, msg []byte) { - // handle incoming message message. - // we derive a new context for this incoming request. - ctx, _ = context.WithCancel(ctx) - - p := netmsg.Packet{ - Src: sp.RemotePeer(), - Dst: sp.swarm.client().Address(), - Data: msg, - Context: ctx, - } - - // We also can't yet pass unread io.RW to the clients directly. - // muxado, SPDY, QUIC, and other stream multiplexors could - // make this a breeze. - - // this runs the entire request. it should not return until ALL action - // is done. this is so that we rate limit and respond to backpressure well. - // TODO: pipelining (handle up to N concurrent requests). - // doing pipelining with SPDY or muxado is probably TRTTD. - if err := sp.HandlePacket(&p, nil); err != nil { - log.Errorf("error handling incoming request: %v", err) - } - - // should be done with the underlying bytes. release (the kraken)! - // TODO: enable this. there is a bug relating to mpool or something. swarm_tests fail. - // sp.conn.ReleaseMsg(msg) -} - -func (sp *swarmPeer) HandlePacket(p router.Packet, n router.Node) error { - switch p.Destination() { - case sp.swarm.client().Address(): // incoming - return sp.swarm.HandlePacket(p, sp) - - case sp.RemotePeer(): // outgoing - buf, ok := p.Payload().([]byte) - if !ok { - return netmsg.ErrInvalidPayload - } - if err := sp.conn.WriteMsg(buf); err != nil { - return fmt.Errorf("swarmPeer error sending: %s", err) - } - return nil - - default: // problem - return fmt.Errorf("swarmPeer routing error: %v got %v", sp, p) - } -} diff --git a/net/swarm/swarm_rt.go b/net/swarm/swarm_rt.go deleted file mode 100644 index 5c91e9f88..000000000 --- a/net/swarm/swarm_rt.go +++ /dev/null @@ -1,159 +0,0 @@ -package swarm - -import ( - "sync" - - conn "github.com/jbenet/go-ipfs/net/conn" - netmsg "github.com/jbenet/go-ipfs/net/message" - peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" - - router "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-router" -) - -// swarmRoutingTable collects the peers -type swarmRoutingTable struct { - local peer.Peer - client router.Node - peers map[u.Key]*swarmPeer - sync.RWMutex -} - -func newRoutingTable(local peer.Peer, client router.Node) *swarmRoutingTable { - return &swarmRoutingTable{ - local: local, - client: client, - peers: map[u.Key]*swarmPeer{}, - } -} - -func (rt *swarmRoutingTable) getOrAdd(s *Swarm, p peer.Peer) (*swarmPeer, error) { - rt.Lock() - defer rt.Unlock() - - sp, ok := rt.peers[p.Key()] - if ok { - return sp, nil - } - - // newSwarmPeer is what kicks off the reader goroutines. - sp, err := newSwarmPeer(s, p) - if err != nil { - return nil, err - } - rt.peers[p.Key()] = sp - return sp, nil -} - -func (rt *swarmRoutingTable) remove(p peer.Peer) *swarmPeer { - rt.Lock() - defer rt.Unlock() - sp, ok := rt.peers[p.Key()] - if ok { - delete(rt.peers, p.Key()) - } - return sp -} - -func (rt *swarmRoutingTable) getByID(pid peer.ID) *swarmPeer { - rt.RLock() - defer rt.RUnlock() - return rt.peers[u.Key(pid)] -} - -func (rt *swarmRoutingTable) get(p peer.Peer) *swarmPeer { - rt.RLock() - defer rt.RUnlock() - return rt.peers[p.Key()] -} - -func (rt *swarmRoutingTable) connList() []conn.Conn { - rt.RLock() - defer rt.RUnlock() - - var out []conn.Conn - for _, sp := range rt.peers { - out = append(out, sp.conn) - } - return out -} - -func (rt *swarmRoutingTable) peerList() []peer.Peer { - rt.RLock() - defer rt.RUnlock() - - var out []peer.Peer - for _, sp := range rt.peers { - out = append(out, sp.RemotePeer()) - } - return out -} - -// Route implements routing.Route -func (rt *swarmRoutingTable) Route(p router.Packet) router.Node { - - // no need to lock :) - if p.Destination() == rt.client.Address() { - // log.Debugf("%s swarmRoutingTable route %s to client %s ? ", p.Destination(), rt.client.Address(), p.Payload()) - return rt.client - } - - rt.RLock() - defer rt.RUnlock() - - for _, sp := range rt.peers { - if sp.RemotePeer() == p.Destination() { - // log.Debugf("%s swarmRoutingTable route %s to peer %s ? ", p.Destination(), sp.RemotePeer(), p.Payload()) - return sp - } - } - - return nil // no route -} - -func (s *Swarm) client() router.Node { - return s.rt.client -} - -func (s *Swarm) addConn(c conn.Conn) error { - sp, err := s.rt.getOrAdd(s, c.RemotePeer()) - if err != nil { - return err - } - - sp.conn.Add(c) - return nil -} - -func (s *Swarm) closeConn(p peer.Peer) error { - sp := s.rt.remove(p) - if sp == nil { - return nil - } - - return sp.Close() -} - -// HandlePacket routes messages out through connections, or to the client -func (s *Swarm) HandlePacket(p router.Packet, from router.Node) error { - msg, ok := p.Payload().([]byte) - if !ok { - return netmsg.ErrInvalidPayload - } - - if len(msg) >= conn.MaxMessageSize { - log.Criticalf("Attempted to send message bigger than max size. (%d)", len(msg)) - } - - next := s.rt.Route(p) - if next == nil { - // log.Debugf("%s swarm HandlePacket %s -> %s -> %s -> %s: %s", - // s.local, from.Address(), s.Address(), "????", p.Destination(), p.Payload()) - return router.ErrNoRoute - } - - // log.Debugf("%s swarm HandlePacket %s -> %s -> %s -> %s: %s", - // s.local, from.Address(), s.Address(), next.Address(), p.Destination(), p.Payload()) - - return next.HandlePacket(p, s) -} diff --git a/net/swarm/swarm_test.go b/net/swarm/swarm_test.go deleted file mode 100644 index af333c072..000000000 --- a/net/swarm/swarm_test.go +++ /dev/null @@ -1,252 +0,0 @@ -package swarm - -import ( - "bytes" - "sync" - "testing" - "time" - - ci "github.com/jbenet/go-ipfs/crypto" - netmsg "github.com/jbenet/go-ipfs/net/message" - peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" - testutil "github.com/jbenet/go-ipfs/util/testutil" - - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - router "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-router" -) - -// needed to copy the data. otherwise gets reused... :/ -type queueClient struct { - addr router.Address - queue chan *netmsg.Packet -} - -func newQueueClient(addr router.Address) *queueClient { - return &queueClient{addr, make(chan *netmsg.Packet, 20)} -} - -func (qc *queueClient) Address() router.Address { - return qc.addr -} - -func (qc *queueClient) HandlePacket(p router.Packet, n router.Node) error { - - pkt1 := p.(*netmsg.Packet) - pkt2 := netmsg.Packet{} - pkt2 = *pkt1 - pkt2.Data = make([]byte, len(pkt1.Data)) - copy(pkt2.Data, pkt1.Data) - - qc.queue <- &pkt2 - return nil -} - -type pongClient struct { - peer peer.Peer - count int - queue chan pongPkt -} - -type pongPkt struct { - msg netmsg.Packet - dst router.Node -} - -func newPongClient(ctx context.Context, peer peer.Peer) *pongClient { - pc := &pongClient{peer: peer, queue: make(chan pongPkt, 10)} - go pc.echo(ctx) - return pc -} - -func (pc *pongClient) Address() router.Address { - return pc.peer.ID().Pretty() + "/pong" -} - -func (pc *pongClient) HandlePacket(p router.Packet, n router.Node) error { - pkt1 := p.(*netmsg.Packet) - if !bytes.Equal(pkt1.Data, []byte("ping")) { - log.Debugf("%s pong dropped pkt: %s (%s -> %s)", pc.Address(), pkt1.Data, pkt1.Src, pkt1.Dst) - panic("why") - return nil // drop - } - - pc.queue <- pongPkt{pkt1.Response([]byte("pong")), n} - return nil -} - -func (pc *pongClient) echo(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - - case pkt := <-pc.queue: - pc.count++ - log.Debugf("%s pong %s (%d)", pkt.msg.Src, pkt.msg.Dst, pc.count) - if err := pkt.dst.HandlePacket(&pkt.msg, pc); err != nil { - log.Errorf("pong error sending: %s", err) - } - } - } -} - -func setupPeer(t *testing.T, addr string) peer.Peer { - tcp, err := ma.NewMultiaddr(addr) - if err != nil { - t.Fatal(err) - } - - sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512) - if err != nil { - t.Fatal(err) - } - - p, err := testutil.NewPeerWithKeyPair(sk, pk) - if err != nil { - t.Fatal(err) - } - p.AddAddress(tcp) - return p -} - -func makeSwarms(ctx context.Context, t *testing.T, addrs []string) ([]*Swarm, []peer.Peer) { - swarms := []*Swarm{} - - for _, addr := range addrs { - local := setupPeer(t, addr) - peerstore := peer.NewPeerstore() - pong := newPongClient(ctx, local) - swarm, err := NewSwarm(ctx, local.Addresses(), local, peerstore, pong) - if err != nil { - t.Fatal(err) - } - swarms = append(swarms, swarm) - } - - peers := make([]peer.Peer, len(swarms)) - for i, s := range swarms { - peers[i] = s.local - } - - return swarms, peers -} - -func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) { - // t.Skip("skipping for another test") - - ctx := context.Background() - swarms, peers := makeSwarms(ctx, t, addrs) - - // connect everyone - { - var wg sync.WaitGroup - connect := func(s *Swarm, dst peer.Peer) { - // copy for other peer - - cp, err := s.peers.FindOrCreate(dst.ID()) - if err != nil { - t.Fatal(err) - } - cp.AddAddress(dst.Addresses()[0]) - - log.Infof("SWARM TEST: %s dialing %s", s.local, dst) - if _, err := s.Dial(cp); err != nil { - t.Fatal("error swarm dialing to peer", err) - } - log.Infof("SWARM TEST: %s connected to %s", s.local, dst) - wg.Done() - } - - log.Info("Connecting swarms simultaneously.") - for _, s := range swarms { - for _, p := range peers { - if p != s.local { // don't connect to self. - wg.Add(1) - connect(s, p) - } - } - } - wg.Wait() - - for _, s := range swarms { - log.Infof("%s swarm routing table: %s", s.local, s.GetPeerList()) - } - } - - // ping/pong - for _, s1 := range swarms { - log.Debugf("-------------------------------------------------------") - log.Debugf("%s ping pong round", s1.local) - log.Debugf("-------------------------------------------------------") - - // for this test, we'll listen on s1. - queue := newQueueClient(s1.client().Address()) - pong := s1.client() // set it back at the end. - s1.SetClient(queue) - - ctx, cancel := context.WithCancel(ctx) - peers, err := s1.peers.All() - if err != nil { - t.Fatal(err) - } - - for k := 0; k < MsgNum; k++ { - for _, p := range *peers { - log.Debugf("%s ping %s (%d)", s1.local, p, k) - pkt := netmsg.Packet{Src: s1.local, Dst: p, Data: []byte("ping"), Context: ctx} - s1.HandlePacket(&pkt, queue) - } - } - - got := map[u.Key]int{} - for k := 0; k < (MsgNum * len(*peers)); k++ { - log.Debugf("%s waiting for pong (%d)", s1.local, k) - - msg := <-queue.queue - if string(msg.Data) != "pong" { - t.Error("unexpected conn output", string(msg.Data), msg.Data) - } - - p := msg.Src.(peer.Peer) - n, _ := got[p.Key()] - got[p.Key()] = n + 1 - } - - log.Debugf("%s got pongs", s1.local) - if len(*peers) != len(got) { - t.Error("got less messages than sent") - } - - for p, n := range got { - if n != MsgNum { - t.Error("peer did not get all msgs", p, n, "/", MsgNum) - } - } - - cancel() - <-time.After(10 * time.Millisecond) - s1.SetClient(pong) - } - - for _, s := range swarms { - s.Close() - } -} - -func TestSwarm(t *testing.T) { - // t.Skip("skipping for another test") - - addrs := []string{ - "/ip4/127.0.0.1/tcp/10234", - "/ip4/127.0.0.1/tcp/10235", - "/ip4/127.0.0.1/tcp/10236", - "/ip4/127.0.0.1/tcp/10237", - "/ip4/127.0.0.1/tcp/10238", - } - - // msgs := 1000 - msgs := 100 - SubtestSwarm(t, addrs, msgs) -}