removed old network

This commit is contained in:
Juan Batiz-Benet 2014-12-16 04:46:40 -08:00
parent 6334e19374
commit c63ffdd0ae
20 changed files with 0 additions and 2371 deletions

View File

@ -1,90 +0,0 @@
package message
import (
"errors"
peer "github.com/jbenet/go-ipfs/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
router "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-router"
)
// ErrInvalidPayload is an error used in the router.HandlePacket implementations
var ErrInvalidPayload = errors.New("invalid packet: non-[]byte payload")
// Packet is used inside the network package to represent a message
// flowing across the subsystems (Conn, Swarm, Mux, Service).
// implements router.Packet
type Packet struct {
Src router.Address // peer.ID or service string
Dst router.Address // peer.ID or service string
Data []byte // raw data
Context context.Context // context of the Packet.
}
func (p *Packet) Destination() router.Address {
return p.Dst
}
func (p *Packet) Payload() interface{} {
return p.Data
}
// Response builds the reply Packet for p: source and destination are
// swapped, the given data is attached, and the context is carried over.
func (p *Packet) Response(data []byte) Packet {
	pkt := Packet{Data: data, Context: p.Context}
	pkt.Src, pkt.Dst = p.Dst, p.Src
	return pkt
}
// NetMessage is the interface for the message
type NetMessage interface {
Peer() peer.Peer
Data() []byte
Loggable() map[string]interface{}
}
// New is the interface for constructing a new message.
func New(p peer.Peer, data []byte) NetMessage {
return &message{peer: p, data: data}
}
// message represents a packet of information sent to or received from a
// particular Peer.
type message struct {
// To or from, depending on direction.
peer peer.Peer
// Opaque data
data []byte
}
func (m *message) Peer() peer.Peer {
return m.peer
}
func (m *message) Data() []byte {
return m.data
}
// Loggable returns a metadata map describing this message for event logging.
func (m *message) Loggable() map[string]interface{} {
	inner := map[string]interface{}{
		"recipient": m.Peer().Loggable(),
		// TODO sizeBytes? bytes? lenBytes?
		"size": len(m.Data()),
	}
	return map[string]interface{}{"netMessage": inner}
}
// FromObject creates a message from a protobuf-marshallable message.
func FromObject(p peer.Peer, data proto.Message) (NetMessage, error) {
	// note: the local is named b, not "bytes", to avoid shadowing the
	// standard-library package name.
	b, err := proto.Marshal(data)
	if err != nil {
		return nil, err
	}
	return New(p, b), nil
}

View File

@ -1,10 +0,0 @@
# Generate Go sources from the protobuf definitions in this directory.
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)

# all and clean are not files; mark them phony so stale files can't mask them.
.PHONY: all clean

all: $(GO)

%.pb.go: %.proto
	protoc --gogo_out=. --proto_path=../../../../../../:/usr/local/opt/protobuf/include:. $<

clean:
	# -f: do not fail when there are no generated files to remove
	rm -f *.pb.go

View File

@ -1,91 +0,0 @@
// Code generated by protoc-gen-gogo.
// source: mux.proto
// DO NOT EDIT!
/*
Package mux_pb is a generated protocol buffer package.
It is generated from these files:
mux.proto
It has these top-level messages:
PBProtocolMessage
*/
package mux_pb
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = math.Inf
type ProtocolID int32
const (
ProtocolID_Test ProtocolID = 0
ProtocolID_Identify ProtocolID = 1
ProtocolID_Routing ProtocolID = 2
ProtocolID_Exchange ProtocolID = 3
ProtocolID_Diagnostic ProtocolID = 4
)
var ProtocolID_name = map[int32]string{
0: "Test",
1: "Identify",
2: "Routing",
3: "Exchange",
4: "Diagnostic",
}
var ProtocolID_value = map[string]int32{
"Test": 0,
"Identify": 1,
"Routing": 2,
"Exchange": 3,
"Diagnostic": 4,
}
func (x ProtocolID) Enum() *ProtocolID {
p := new(ProtocolID)
*p = x
return p
}
func (x ProtocolID) String() string {
return proto.EnumName(ProtocolID_name, int32(x))
}
// UnmarshalJSON implements json.Unmarshaler for ProtocolID, accepting the
// enum's string name (e.g. "Routing") as emitted by the generated marshaler.
// (Generated code — do not hand-edit beyond comments.)
func (x *ProtocolID) UnmarshalJSON(data []byte) error {
	value, err := proto.UnmarshalJSONEnum(ProtocolID_value, data, "ProtocolID")
	if err != nil {
		return err
	}
	*x = ProtocolID(value)
	return nil
}
type PBProtocolMessage struct {
ProtocolID *ProtocolID `protobuf:"varint,1,req,enum=mux.pb.ProtocolID" json:"ProtocolID,omitempty"`
Data []byte `protobuf:"bytes,2,req" json:"Data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *PBProtocolMessage) Reset() { *m = PBProtocolMessage{} }
func (m *PBProtocolMessage) String() string { return proto.CompactTextString(m) }
func (*PBProtocolMessage) ProtoMessage() {}
func (m *PBProtocolMessage) GetProtocolID() ProtocolID {
if m != nil && m.ProtocolID != nil {
return *m.ProtocolID
}
return ProtocolID_Test
}
func (m *PBProtocolMessage) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
func init() {
proto.RegisterEnum("mux.pb.ProtocolID", ProtocolID_name, ProtocolID_value)
}

View File

@ -1,14 +0,0 @@
package mux.pb;
enum ProtocolID {
Test = 0;
Identify = 1; // setup
Routing = 2; // dht
Exchange = 3; // bitswap
Diagnostic = 4;
}
message PBProtocolMessage {
required ProtocolID ProtocolID = 1;
required bytes Data = 2;
}

View File

@ -1,217 +0,0 @@
// package mux implements a protocol muxer.
package mux
import (
"errors"
"fmt"
"sync"
msg "github.com/jbenet/go-ipfs/net/message"
pb "github.com/jbenet/go-ipfs/net/mux/internal/pb"
u "github.com/jbenet/go-ipfs/util"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
router "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-router"
)
var log = u.Logger("muxer")
// ProtocolIDs used to identify each protocol.
// These should probably be defined elsewhere.
var (
ProtocolID_Routing = pb.ProtocolID_Routing
ProtocolID_Exchange = pb.ProtocolID_Exchange
ProtocolID_Diagnostic = pb.ProtocolID_Diagnostic
)
// Protocol objects produce + consume raw data. They are added to the Muxer
// with a ProtocolID, which is added to outgoing payloads. Muxer properly
// encapsulates and decapsulates when interfacing with its Protocols. The
// Protocols do not encounter their ProtocolID.
type Protocol interface {
ProtocolID() pb.ProtocolID
// Node is a router.Node, for message connectivity.
router.Node
}
// ProtocolMap maps ProtocolIDs to Protocols.
type ProtocolMap map[pb.ProtocolID]Protocol
// Muxer is a simple multiplexor that reads + writes to Incoming and Outgoing
// channels. It multiplexes various protocols, wrapping and unwrapping data
// with a ProtocolID.
//
// implements router.Node and router.Route
type Muxer struct {
local router.Address
uplink router.Node
// Protocols are the multiplexed services.
Protocols ProtocolMap
mapLock sync.Mutex
bwiLock sync.Mutex
bwIn uint64
msgIn uint64
bwoLock sync.Mutex
bwOut uint64
msgOut uint64
}
// NewMuxer constructs a muxer given a protocol map.
// uplink is a Node to send all outgoing traffic to.
func NewMuxer(local router.Address, uplink router.Node) *Muxer {
return &Muxer{
local: local,
uplink: uplink,
Protocols: ProtocolMap{},
}
}
// GetMessageCounts return the in/out message count measured over this muxer.
func (m *Muxer) GetMessageCounts() (in uint64, out uint64) {
m.bwiLock.Lock()
in = m.msgIn
m.bwiLock.Unlock()
m.bwoLock.Lock()
out = m.msgOut
m.bwoLock.Unlock()
return
}
// GetBandwidthTotals return the in/out bandwidth measured over this muxer.
func (m *Muxer) GetBandwidthTotals() (in uint64, out uint64) {
m.bwiLock.Lock()
in = m.bwIn
m.bwiLock.Unlock()
m.bwoLock.Lock()
out = m.bwOut
m.bwoLock.Unlock()
return
}
// AddProtocol adds a Protocol with given ProtocolID to the Muxer.
// It fails if another protocol is already registered under pid.
func (m *Muxer) AddProtocol(p Protocol, pid pb.ProtocolID) error {
	m.mapLock.Lock()
	defer m.mapLock.Unlock()

	if _, found := m.Protocols[pid]; found {
		// error strings are lowercase per Go convention
		return errors.New("another protocol already using this ProtocolID")
	}

	m.Protocols[pid] = p
	return nil
}
func (m *Muxer) Address() router.Address {
return m.local
}
// HandlePacket implements router.Node. Packets arriving from the uplink are
// incoming (network -> protocol); packets from anything else are outgoing.
func (m *Muxer) HandlePacket(p router.Packet, from router.Node) error {
	pkt, ok := p.(*msg.Packet)
	if !ok {
		return msg.ErrInvalidPayload
	}

	if from == m.uplink {
		return m.handleIncomingPacket(pkt, from)
	}
	// no else after a terminating if
	return m.handleOutgoingPacket(pkt, from)
}
// handleIncomingPacket routes a packet arriving from the uplink to the
// appropriate protocol, stripping the ProtocolID envelope first.
func (m *Muxer) handleIncomingPacket(p *msg.Packet, _ router.Node) error {
	m.bwiLock.Lock()
	// TODO: compensate for overhead
	m.bwIn += uint64(len(p.Data))
	m.msgIn++
	m.bwiLock.Unlock()

	data, pid, err := unwrapData(p.Data)
	if err != nil {
		return fmt.Errorf("muxer de-serializing error: %v", err)
	}

	// TODO: fix this when mpool is fixed.
	// conn.ReleaseBuffer(m1.Data())
	p.Data = data

	m.mapLock.Lock()
	// named prot (not "proto") so the proto package is not shadowed
	prot, found := m.Protocols[pid]
	m.mapLock.Unlock()
	if !found {
		return fmt.Errorf("muxer: unknown protocol %v", pid)
	}

	// this is the incoming direction: uplink -> protocol (the original log
	// line said "outgoing" with the arrow reversed — swapped with the
	// outgoing handler's message)
	log.Debugf("muxer: incoming packet %s -> %d", m.uplink.Address(), prot.ProtocolID())
	return prot.HandlePacket(p, m)
}
// handleOutgoingPacket wraps a protocol's packet with its ProtocolID and
// sends it up to the uplink (out to the network).
func (m *Muxer) handleOutgoingPacket(p *msg.Packet, from router.Node) error {
	// find which registered protocol is the sender.
	var pid pb.ProtocolID
	var prot Protocol // named prot so the proto package is not shadowed
	m.mapLock.Lock()
	for pid2, proto2 := range m.Protocols {
		if proto2 == from {
			pid = pid2
			prot = proto2
			break
		}
	}
	m.mapLock.Unlock()

	if prot == nil {
		return errors.New("muxer: packet sent from unknown protocol")
	}

	var err error
	p.Data, err = wrapData(p.Data, pid)
	if err != nil {
		return fmt.Errorf("muxer serializing error: %v", err)
	}

	m.bwoLock.Lock()
	// TODO: compensate for overhead
	// TODO(jbenet): switch this to a goroutine to prevent sync waiting.
	m.bwOut += uint64(len(p.Data))
	m.msgOut++
	m.bwoLock.Unlock()

	// TODO: add multiple uplinks
	// this is the outgoing direction: protocol -> uplink (the original log
	// line said "incoming" — swapped with the incoming handler's message)
	log.Debugf("muxer: outgoing packet %d -> %s", prot.ProtocolID(), m.uplink.Address())
	return m.uplink.HandlePacket(p, m)
}
// wrapData envelopes data with the given ProtocolID for transport.
func wrapData(data []byte, pid pb.ProtocolID) ([]byte, error) {
	pbm := &pb.PBProtocolMessage{
		ProtocolID: &pid,
		Data:       data,
	}

	b, err := proto.Marshal(pbm)
	if err != nil {
		return nil, err
	}
	return b, nil
}
// unwrapData strips the ProtocolID envelope, returning payload and id.
func unwrapData(data []byte) ([]byte, pb.ProtocolID, error) {
	var pbm pb.PBProtocolMessage
	if err := proto.Unmarshal(data, &pbm); err != nil {
		return nil, 0, err
	}
	return pbm.GetData(), pbm.GetProtocolID(), nil
}

View File

@ -1,137 +0,0 @@
package mux
import (
"bytes"
"testing"
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
msg "github.com/jbenet/go-ipfs/net/message"
pb "github.com/jbenet/go-ipfs/net/mux/internal/pb"
peer "github.com/jbenet/go-ipfs/peer"
testutil "github.com/jbenet/go-ipfs/util/testutil"
router "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-router"
)
type TestProtocol struct {
mux *Muxer
pid pb.ProtocolID
msg []*msg.Packet
}
func (t *TestProtocol) ProtocolID() pb.ProtocolID {
return t.pid
}
func (t *TestProtocol) Address() router.Address {
return t.pid
}
// HandlePacket records packets arriving from the mux and forwards
// everything else through the mux.
func (t *TestProtocol) HandlePacket(p router.Packet, from router.Node) error {
	pkt, ok := p.(*msg.Packet)
	if !ok {
		return msg.ErrInvalidPayload
	}
	// log the protocol id, not the *TestProtocol pointer: %d applied to t
	// is a printf-verb mismatch flagged by go vet.
	log.Debugf("TestProtocol %d got: %v", t.pid, p)
	if from == t.mux {
		t.msg = append(t.msg, pkt)
		return nil
	}
	return t.mux.HandlePacket(p, t)
}
// newPeer builds a test peer whose ID is parsed from the given hex multihash.
func newPeer(t *testing.T, id string) peer.Peer {
	// the local is named h so the mh package is not shadowed
	h, err := mh.FromHexString(id)
	if err != nil {
		t.Error(err)
		return nil
	}
	return testutil.NewPeerWithID(peer.ID(h))
}
func testMsg(t *testing.T, m *msg.Packet, data []byte) {
if !bytes.Equal(data, m.Data) {
t.Errorf("Data does not match: %v != %v", data, m.Data)
}
}
func testWrappedMsg(t *testing.T, m *msg.Packet, pid pb.ProtocolID, data []byte) {
data2, pid2, err := unwrapData(m.Data)
if err != nil {
t.Error(err)
}
if pid != pid2 {
t.Errorf("ProtocolIDs do not match: %v != %v", pid, pid2)
}
if !bytes.Equal(data, data2) {
t.Errorf("Data does not match: %v != %v", data, data2)
}
}
func TestSimpleMuxer(t *testing.T) {
// setup
peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa")
peer2 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275bbbbbb")
uplink := router.NewQueueNode("queue", make(chan router.Packet, 10))
mux1 := NewMuxer(string(peer1.ID()), uplink)
pid1 := pb.ProtocolID_Test
pid2 := pb.ProtocolID_Routing
p1 := &TestProtocol{mux1, pid1, nil}
p2 := &TestProtocol{mux1, pid2, nil}
mux1.AddProtocol(p1, pid1)
mux1.AddProtocol(p2, pid2)
// test outgoing p1
for _, s := range []string{"foo", "bar", "baz"} {
pkt := msg.Packet{Src: peer1, Dst: peer2, Data: []byte(s)}
if err := p1.HandlePacket(&pkt, nil); err != nil {
t.Fatal(err)
}
testWrappedMsg(t, (<-uplink.Queue()).(*msg.Packet), pid1, []byte(s))
}
// test incoming p1
for i, s := range []string{"foo", "bar", "baz"} {
d, err := wrapData([]byte(s), pid1)
if err != nil {
t.Error(err)
}
pkt := msg.Packet{Src: peer1, Dst: peer2, Data: d}
if err := mux1.HandlePacket(&pkt, uplink); err != nil {
t.Fatal(err)
}
testMsg(t, p1.msg[i], []byte(s))
}
// test outgoing p2
for _, s := range []string{"foo", "bar", "baz"} {
pkt := msg.Packet{Src: peer1, Dst: peer2, Data: []byte(s)}
if err := p2.HandlePacket(&pkt, nil); err != nil {
t.Fatal(err)
}
testWrappedMsg(t, (<-uplink.Queue()).(*msg.Packet), pid2, []byte(s))
}
// test incoming p2
for i, s := range []string{"foo", "bar", "baz"} {
d, err := wrapData([]byte(s), pid2)
if err != nil {
t.Fatal(err)
}
pkt := msg.Packet{Src: peer1, Dst: peer2, Data: d}
if err := mux1.HandlePacket(&pkt, uplink); err != nil {
t.Fatal(err)
}
testMsg(t, p2.msg[i], []byte(s))
}
}

View File

@ -1,11 +0,0 @@
# Generate Go sources from the protobuf definitions in this directory.
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)

.PHONY: all clean

all: $(GO)

%.pb.go: %.proto
	protoc --gogo_out=. --proto_path=../../../../../../:/usr/local/opt/protobuf/include:. $<

clean:
	# only remove generated files; the previous "rm -f *.go" would also
	# delete any hand-written Go sources in this directory
	rm -f *.pb.go

View File

@ -1,48 +0,0 @@
// Code generated by protoc-gen-gogo.
// source: service.proto
// DO NOT EDIT!
/*
Package service_pb is a generated protocol buffer package.
It is generated from these files:
service.proto
It has these top-level messages:
PBRequest
*/
package service_pb
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = math.Inf
type PBRequest struct {
Data []byte `protobuf:"bytes,1,req" json:"Data,omitempty"`
Tag []byte `protobuf:"bytes,3,opt" json:"Tag,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *PBRequest) Reset() { *m = PBRequest{} }
func (m *PBRequest) String() string { return proto.CompactTextString(m) }
func (*PBRequest) ProtoMessage() {}
func (m *PBRequest) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
func (m *PBRequest) GetTag() []byte {
if m != nil {
return m.Tag
}
return nil
}
func init() {
}

View File

@ -1,6 +0,0 @@
package service.pb;
message PBRequest {
required bytes Data = 1;
optional bytes Tag = 3;
}

View File

@ -1,128 +0,0 @@
package service
import (
crand "crypto/rand"
msg "github.com/jbenet/go-ipfs/net/message"
pb "github.com/jbenet/go-ipfs/net/service/internal/pb"
peer "github.com/jbenet/go-ipfs/peer"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
)
const (
	// IDSize is the size of the ID in bytes.
	IDSize int = 4
)

// RequestID is a field that identifies request-response flows.
// The top bit of the first byte distinguishes requests (0) from responses (1).
type RequestID []byte

// Request turns a RequestID into a Request (unsetting first bit).
// It returns a copy; the receiver is not mutated.
func (r RequestID) Request() RequestID {
	if r == nil {
		return nil
	}
	r2 := make([]byte, len(r))
	copy(r2, r)
	r2[0] = r[0] & 0x7F // unset first bit for request
	return RequestID(r2)
}

// Response turns a RequestID into a Response (setting first bit).
// It returns a copy; the receiver is not mutated.
func (r RequestID) Response() RequestID {
	if r == nil {
		return nil
	}
	r2 := make([]byte, len(r))
	copy(r2, r)
	r2[0] = r[0] | 0x80 // set first bit for response
	return RequestID(r2)
}

// IsRequest returns whether a RequestID identifies a request.
// A nil RequestID is neither a request nor a response.
func (r RequestID) IsRequest() bool {
	if r == nil {
		return false
	}
	return !r.IsResponse()
}

// IsResponse returns whether a RequestID identifies a response.
func (r RequestID) IsResponse() bool {
	if r == nil {
		return false
	}
	// the comparison already yields a bool; the original's bool(...)
	// conversion was redundant
	return r[0]&0x80 == 0x80
}

// RandomRequestID creates and returns a new random request ID.
func RandomRequestID() (RequestID, error) {
	buf := make([]byte, IDSize)
	_, err := crand.Read(buf)
	return RequestID(buf).Request(), err
}
// RequestMap is a map of Requests. the key = (peer.ID concat RequestID).
type RequestMap map[string]*Request
// Request objects are used to multiplex request-response flows.
type Request struct {
// ID is the RequestID identifying this Request-Response Flow.
ID RequestID
// PeerID identifies the peer from whom to expect the response.
PeerID peer.ID
// Response is the channel of incoming responses.
Response chan msg.NetMessage
}
// NewRequest creates a request for given peer.ID
func NewRequest(pid peer.ID) (*Request, error) {
id, err := RandomRequestID()
if err != nil {
return nil, err
}
return &Request{
ID: id,
PeerID: pid,
Response: make(chan msg.NetMessage, 1),
}, nil
}
// Key returns the RequestKey for this request. Use with maps.
func (r *Request) Key() string {
return RequestKey(r.PeerID, r.ID)
}
// RequestKey is the peer.ID concatenated with the RequestID. Use with maps.
// The id is normalized via Request() so a request and its matching response
// map to the same key.
func RequestKey(pid peer.ID, rid RequestID) string {
	// rid.Request() is already a slice; the original's trailing [:] was redundant
	return string(pid) + string(rid.Request())
}
func wrapData(data []byte, rid RequestID) ([]byte, error) {
// Marshal
pbm := new(pb.PBRequest)
pbm.Data = data
pbm.Tag = rid
b, err := proto.Marshal(pbm)
if err != nil {
return nil, err
}
return b, nil
}
func unwrapData(data []byte) ([]byte, RequestID, error) {
// Unmarshal
pbm := new(pb.PBRequest)
err := proto.Unmarshal(data, pbm)
if err != nil {
return nil, nil, err
}
return pbm.GetData(), pbm.GetTag(), nil
}

View File

@ -1,41 +0,0 @@
package service
import (
"bytes"
"testing"
)
func TestMarshaling(t *testing.T) {
test := func(d1 []byte, rid1 RequestID) {
d2, err := wrapData(d1, rid1)
if err != nil {
t.Error(err)
}
d3, rid2, err := unwrapData(d2)
if err != nil {
t.Error(err)
}
d4, err := wrapData(d3, rid1)
if err != nil {
t.Error(err)
}
if !bytes.Equal(rid2, rid1) {
t.Error("RequestID fail")
}
if !bytes.Equal(d1, d3) {
t.Error("unmarshalled data should be the same")
}
if !bytes.Equal(d2, d4) {
t.Error("marshalled data should be the same")
}
}
test([]byte("foo"), []byte{1, 2, 3, 4})
test([]byte("bar"), nil)
}

View File

@ -1,304 +0,0 @@
package service
import (
"errors"
"fmt"
"sync"
msg "github.com/jbenet/go-ipfs/net/message"
u "github.com/jbenet/go-ipfs/util"
ctxgroup "github.com/jbenet/go-ctxgroup"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
router "github.com/jbenet/go-router"
)
var log = u.Logger("service")
// ErrNoResponse is returned by Service when a Request did not get a response,
// and no other error happened
var ErrNoResponse = errors.New("no response to request")
// Handler is an interface that objects must implement in order to handle
// a service's requests.
type Handler interface {
// HandleMessage receives an incoming message, and potentially returns
// a response message to send back.
HandleMessage(context.Context, msg.NetMessage) msg.NetMessage
}
// Sender interface for network services.
type Sender interface {
// SendMessage sends out a given message, without expecting a response.
SendMessage(ctx context.Context, m msg.NetMessage) error
// SendRequest sends out a given message, and awaits a response.
// Set Deadlines or cancellations in the context.Context you pass in.
SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error)
}
// Service is an interface for a net resource with both outgoing (Sender) and
// incoming (SetHandler) requests.
type Service interface {
	Sender // can use it to send out msgs

	router.Node // it is a Node in the net topology.

	// SetUplink assigns the Node to send packets out
	SetUplink(router.Node)
	Uplink() router.Node

	// SetHandler assigns the request Handler for this service.
	SetHandler(Handler)
	GetHandler() Handler
}
// Service is a networking component that protocols can use to multiplex
// messages over the same channel, and to issue + handle requests.
type service struct {
// Handler is the object registered to handle incoming requests.
Handler Handler
HandlerLock sync.RWMutex
// Requests are all the pending requests on this service.
Requests RequestMap
RequestsLock sync.RWMutex
// the connection to the outside world
uplink router.Node
uplinkLock sync.RWMutex
addr router.Address
}
// NewService creates a service object with given type ID and Handler
func NewService(addr router.Address, uplink router.Node, h Handler) Service {
s := &service{
Handler: h,
Requests: RequestMap{},
uplink: uplink,
addr: addr,
}
return s
}
// sendMessage sends a message out (actual leg work. SendMessage is to export w/o rid)
func (s *service) sendMessage(ctx context.Context, m msg.NetMessage, rid RequestID) error {
// serialize ServiceMessage wrapper
data, err := wrapData(m.Data(), rid)
if err != nil {
return err
}
// log.Debug("Service send message [to = %s]", m.Peer())
// send message
m2 := msg.New(m.Peer(), data)
pkt := msg.Packet{
Src:
}
select {
case s.Outgoing <- m2:
case <-ctx.Done():
return ctx.Err()
}
pkt := msg.Packet{
Src: m.
}
return nil
}
// SendMessage sends a message out
func (s *service) SendMessage(ctx context.Context, m msg.NetMessage) error {
return s.sendMessage(ctx, m, nil)
}
// SendRequest sends a request message out and awaits a response.
// Set deadlines or cancellations in the context.Context you pass in.
func (s *service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) {
	// check if we should bail given our contexts
	select {
	default:
	case <-s.Closing():
		return nil, fmt.Errorf("service closed: %s", s.Context().Err())
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	// create a request
	r, err := NewRequest(m.Peer().ID())
	if err != nil {
		return nil, err
	}

	// register Request
	s.RequestsLock.Lock()
	s.Requests[r.Key()] = r
	s.RequestsLock.Unlock()

	// defer deleting this request
	defer func() {
		s.RequestsLock.Lock()
		delete(s.Requests, r.Key())
		s.RequestsLock.Unlock()
	}()

	// check if we should bail after waiting for mutex
	select {
	default:
	case <-s.Closing():
		return nil, fmt.Errorf("service closed: %s", s.Context().Err())
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	// Send message. the original discarded this error, leaving the caller
	// to wait for a response that was never actually sent.
	if err := s.sendMessage(ctx, m, r.ID); err != nil {
		return nil, err
	}

	// wait for response
	m = nil
	err = nil
	select {
	case m = <-r.Response:
	case <-s.Closed():
		err = fmt.Errorf("service closed: %s", s.Context().Err())
	case <-ctx.Done():
		err = ctx.Err()
	}

	if m == nil {
		return nil, ErrNoResponse
	}
	return m, err
}
// handleIncoming consumes the messages on the s.Incoming channel and
// routes them appropriately (to requests, or handler).
func (s *service) handleIncomingMessages() {
defer s.Children().Done()
for {
select {
case m, more := <-s.Incoming:
if !more {
return
}
s.Children().Add(1)
go s.handleIncomingMessage(m)
case <-s.Closing():
return
}
}
}
// handleIncomingMessage unwraps one packet and dispatches it: requests go to
// the registered Handler; responses are matched to their pending Request.
// Fixes vs the original: undefined identifier m (now m2/srcPeer), bare
// returns in an error-returning function, an argument-less log.Event() call,
// and a handler goroutine that returned its value instead of sending it on
// the response channel (which deadlocked the receive below).
func (s *service) handleIncomingMessage(pkt *msg.Packet) error {
	// check the packet has a valid Context
	ctx := pkt.Context
	if ctx == nil {
		return fmt.Errorf("service got pkt without valid Context")
	}

	// check the source is a peer
	srcPeer, ok := pkt.Src.(peer.Peer)
	if !ok {
		return fmt.Errorf("service got pkt from non-Peer src: %v", pkt.Src)
	}

	// unwrap the incoming message
	data, rid, err := unwrapData(pkt.Data)
	if err != nil {
		return fmt.Errorf("service de-serializing error: %v", err)
	}

	// convert to msg.NetMessage, which the rest of the system expects.
	m2 := msg.New(srcPeer, data)

	// if it's a request (or has no RequestID), handle it
	if rid == nil || rid.IsRequest() {
		handler := s.GetHandler()
		if handler == nil {
			// no handler, drop it.
			log.Errorf("service dropped msg: %v", m2)
			return nil
		}

		// this go routine is developer friendliness to keep their stacks
		// separate (and more readable) from the network goroutine. If
		// problems arise and you'd like to see _the full_ stack of where
		// this message is coming from, just remove the goroutine part.
		response := make(chan msg.NetMessage)
		go func() {
			response <- handler.HandleMessage(ctx, m2)
		}()
		r1 := <-response

		// Note: HandleMessage *must* respect context. We could co-opt it
		// and do a select {} here on the context, BUT that would just drop
		// a packet and free up the goroutine to return to the network. the
		// problem is still there: the Service handler hasn't returned yet.

		// if handler gave us a response, send it out!
		if r1 != nil {
			if err := s.sendMessage(ctx, r1, rid.Response()); err != nil {
				return fmt.Errorf("error sending response message: %v", err)
			}
		}
		return nil
	}

	// Otherwise, it is a response. handle it.
	if !rid.IsResponse() {
		log.Errorf("RequestID should identify a response here.")
	}

	key := RequestKey(srcPeer.ID(), RequestID(rid))
	s.RequestsLock.RLock()
	r, found := s.Requests[key]
	s.RequestsLock.RUnlock()
	if !found {
		// log-and-drop: the request likely timed out and was deregistered.
		log.Errorf("no request key %v (timeout?)", []byte(key))
		return nil
	}

	select {
	case r.Response <- m2:
	case <-s.Closing():
	}
	return nil
}
// Address is the router.Node address
func (s *service) Address() router.Address {
return s.addr
}
// HandlePacket implements router.Node
// service only receives packets in HandlePacket
func (s *service) HandlePacket(p router.Packet, from router.Node) error {
pkt, ok := p.(*msg.Packet)
if !ok {
return msg.ErrInvalidPayload
}
}
// SetHandler assigns the request Handler for this service.
func (s *service) SetHandler(h Handler) {
	s.HandlerLock.Lock()
	s.Handler = h
	s.HandlerLock.Unlock()
}
// GetHandler returns the request Handler for this service.
func (s *service) GetHandler() Handler {
	s.HandlerLock.RLock()
	h := s.Handler
	s.HandlerLock.RUnlock()
	return h
}

View File

@ -1,164 +0,0 @@
package service
import (
"bytes"
"testing"
"time"
msg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
testutil "github.com/jbenet/go-ipfs/util/testutil"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
)
// ReverseHandler reverses all Data it receives and sends it back.
type ReverseHandler struct{}

// HandleMessage returns a new message for the same peer whose data is the
// byte-reverse of the input's.
func (t *ReverseHandler) HandleMessage(ctx context.Context, m msg.NetMessage) msg.NetMessage {
	// reverse into a fresh buffer: the original reversed m's buffer in
	// place, mutating the incoming message's data as a side effect.
	src := m.Data()
	d := make([]byte, len(src))
	for i, b := range src {
		d[len(src)-1-i] = b
	}
	return msg.New(m.Peer(), d)
}
func newPeer(t *testing.T, id string) peer.Peer {
mh, err := mh.FromHexString(id)
if err != nil {
t.Error(err)
return nil
}
return testutil.NewPeerWithID(peer.ID(mh))
}
func TestServiceHandler(t *testing.T) {
ctx := context.Background()
h := &ReverseHandler{}
s := NewService(ctx, h)
peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa")
d, err := wrapData([]byte("beep"), nil)
if err != nil {
t.Error(err)
}
m1 := msg.New(peer1, d)
s.GetPipe().Incoming <- m1
m2 := <-s.GetPipe().Outgoing
d, rid, err := unwrapData(m2.Data())
if err != nil {
t.Error(err)
}
if rid != nil {
t.Error("RequestID should be nil")
}
if !bytes.Equal(d, []byte("peeb")) {
t.Errorf("service handler data incorrect: %v != %v", d, "oof")
}
}
func TestServiceRequest(t *testing.T) {
ctx := context.Background()
s1 := NewService(ctx, &ReverseHandler{})
s2 := NewService(ctx, &ReverseHandler{})
peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa")
// patch services together
go func() {
for {
select {
case m := <-s1.GetPipe().Outgoing:
s2.GetPipe().Incoming <- m
case m := <-s2.GetPipe().Outgoing:
s1.GetPipe().Incoming <- m
case <-ctx.Done():
return
}
}
}()
m1 := msg.New(peer1, []byte("beep"))
m2, err := s1.SendRequest(ctx, m1)
if err != nil {
t.Error(err)
}
if !bytes.Equal(m2.Data(), []byte("peeb")) {
t.Errorf("service handler data incorrect: %v != %v", m2.Data(), "oof")
}
}
func TestServiceRequestTimeout(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond)
s1 := NewService(ctx, &ReverseHandler{})
s2 := NewService(ctx, &ReverseHandler{})
peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa")
// patch services together
go func() {
for {
<-time.After(time.Millisecond)
select {
case m := <-s1.GetPipe().Outgoing:
s2.GetPipe().Incoming <- m
case m := <-s2.GetPipe().Outgoing:
s1.GetPipe().Incoming <- m
case <-ctx.Done():
return
}
}
}()
m1 := msg.New(peer1, []byte("beep"))
m2, err := s1.SendRequest(ctx, m1)
if err == nil || m2 != nil {
t.Error("should've timed out")
}
}
func TestServiceClose(t *testing.T) {
ctx := context.Background()
s1 := NewService(ctx, &ReverseHandler{})
s2 := NewService(ctx, &ReverseHandler{})
peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa")
// patch services together
go func() {
for {
select {
case m := <-s1.GetPipe().Outgoing:
s2.GetPipe().Incoming <- m
case m := <-s2.GetPipe().Outgoing:
s1.GetPipe().Incoming <- m
case <-ctx.Done():
return
}
}
}()
m1 := msg.New(peer1, []byte("beep"))
m2, err := s1.SendRequest(ctx, m1)
if err != nil {
t.Error(err)
}
if !bytes.Equal(m2.Data(), []byte("peeb")) {
t.Errorf("service handler data incorrect: %v != %v", m2.Data(), "oof")
}
s1.Close()
s2.Close()
<-s1.Closed()
<-s2.Closed()
}

View File

@ -1,120 +0,0 @@
package swarm
import (
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
"github.com/jbenet/go-ipfs/util/eventlog"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
// ListenAddresses returns a list of addresses at which this swarm listens.
func (s *Swarm) ListenAddresses() []ma.Multiaddr {
	addrs := make([]ma.Multiaddr, 0, len(s.listeners))
	for _, l := range s.listeners {
		addrs = append(addrs, l.Multiaddr())
	}
	return addrs
}
// InterfaceListenAddresses returns a list of addresses at which this swarm
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func (s *Swarm) InterfaceListenAddresses() ([]ma.Multiaddr, error) {
return resolveUnspecifiedAddresses(s.ListenAddresses())
}
// resolveUnspecifiedAddresses expands unspecified ip addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func resolveUnspecifiedAddresses(unspecifiedAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) {
	var outputAddrs []ma.Multiaddr

	// todo optimize: only fetch these if we have a "any" addr.
	ifaceAddrs, err := interfaceAddresses()
	if err != nil {
		return nil, err
	}

	for _, a := range unspecifiedAddrs {
		// split address into its components
		split := ma.Split(a)

		// if first component (ip) is not unspecified, use it as is.
		if !manet.IsIPUnspecified(split[0]) {
			outputAddrs = append(outputAddrs, a)
			continue
		}

		// unspecified? add one address per interface.
		for _, ia := range ifaceAddrs {
			// substitute the interface's ip component for the wildcard one,
			// keeping the rest of the address (e.g. /tcp/port) intact.
			split[0] = ia
			joined := ma.Join(split...)
			outputAddrs = append(outputAddrs, joined)
		}
	}

	// emit the fully-resolved set to the structured event log (the closure
	// is invoked immediately; it only stringifies the addresses).
	log.Event(context.TODO(), "interfaceListenAddresses", func() eventlog.Loggable {
		var addrs []string
		for _, addr := range outputAddrs {
			addrs = append(addrs, addr.String())
		}
		return eventlog.Metadata{"addresses": addrs}
	}())

	log.Debug("InterfaceListenAddresses:", outputAddrs)
	return outputAddrs, nil
}
// interfaceAddresses returns the multiaddrs of the local machine's
// non-loopback interfaces.
func interfaceAddresses() ([]ma.Multiaddr, error) {
	maddrs, err := manet.InterfaceMultiaddrs()
	if err != nil {
		return nil, err
	}

	var out []ma.Multiaddr
	for _, addr := range maddrs {
		if manet.IsIPLoopback(addr) {
			continue
		}
		out = append(out, addr)
	}
	return out, nil
}
// addrInList returns whether or not an address is part of a list.
// this is useful to check if NAT is happening (or other bugs?)
func addrInList(addr ma.Multiaddr, list []ma.Multiaddr) bool {
	for _, candidate := range list {
		if addr.Equal(candidate) {
			return true
		}
	}
	return false
}
// checkNATWarning checks if our observed addresses differ. if so,
// informs the user that certain things might not work yet
func (s *Swarm) checkNATWarning(observed ma.Multiaddr, expected ma.Multiaddr) {
if observed.Equal(expected) {
return
}
listen, err := s.InterfaceListenAddresses()
if err != nil {
log.Errorf("Error retrieving swarm.InterfaceListenAddresses: %s", err)
return
}
if !addrInList(observed, listen) { // probably a nat
log.Warningf(natWarning, observed, listen)
}
}
const natWarning = `Remote peer observed our address to be: %s
The local addresses are: %s
Thus, connection is going through NAT, and other connections may fail.
IPFS NAT traversal is still under development. Please bug us on github or irc to fix this.
Baby steps: http://jbenet.static.s3.amazonaws.com/271dfcf/baby-steps.gif
`

View File

@ -1,80 +0,0 @@
package swarm
import (
"fmt"
"sync"
"testing"
peer "github.com/jbenet/go-ipfs/peer"
"github.com/jbenet/go-ipfs/util/testutil"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
// TestSimultOpen checks that two swarms dialing each other at the same
// time both end up connected.
func TestSimultOpen(t *testing.T) {
	// t.Skip("skipping for another test")

	addrs := []string{
		"/ip4/127.0.0.1/tcp/1244",
		"/ip4/127.0.0.1/tcp/1245",
	}

	ctx := context.Background()
	swarms, _ := makeSwarms(ctx, t, addrs)

	// connect everyone
	{
		var wg sync.WaitGroup
		connect := func(s *Swarm, dst peer.Peer) {
			defer wg.Done()

			// copy for other peer
			cp := testutil.NewPeerWithID(dst.ID())
			cp.AddAddress(dst.Addresses()[0])

			// NOTE: t.Fatal (FailNow) must not be called from a goroutine
			// other than the one running the test function; use t.Error
			// and return instead.
			if _, err := s.Dial(cp); err != nil {
				t.Error("error swarm dialing to peer", err)
				return
			}
		}

		log.Info("Connecting swarms simultaneously.")
		wg.Add(2)
		go connect(swarms[0], swarms[1].local)
		go connect(swarms[1], swarms[0].local)
		wg.Wait()
	}

	for _, s := range swarms {
		s.Close()
	}
}
// TestSimultOpenMany stress-tests many simultaneous opens.
// Skipped by default because it is very slow.
func TestSimultOpenMany(t *testing.T) {
	t.Skip("very very slow")

	many := 500
	var addrs []string
	for port := 2200; port < 2200+many; port++ {
		addrs = append(addrs, fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port))
	}
	SubtestSwarm(t, addrs, 10)
}
// TestSimultOpenFewStress repeatedly runs a two-swarm exchange on fresh
// port pairs. Skipped under -short.
func TestSimultOpenFewStress(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	// t.Skip("skipping for another test")

	rounds := 10
	// rounds := 100
	for i := 0; i < rounds; i++ {
		pair := []string{
			fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 1900+i),
			fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 2900+i),
		}
		SubtestSwarm(t, pair, 10)
	}
}

View File

@ -1,193 +0,0 @@
// package swarm implements a connection muxer with a pair of channels
// to synchronize all network communication.
package swarm
import (
"errors"
"fmt"
conn "github.com/jbenet/go-ipfs/net/conn"
peer "github.com/jbenet/go-ipfs/peer"
"github.com/jbenet/go-ipfs/util/eventlog"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
router "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-router"
)
var log = eventlog.Logger("swarm")
// ErrAlreadyOpen signals that a connection to a peer is already open.
// (Go convention: error strings are lowercase and unpunctuated, since they
// are usually chained into larger messages.)
var ErrAlreadyOpen = errors.New("connection to this peer already open")
// ListenErr contains a set of errors mapping to each of the swarms addresses.
// Used to return multiple errors, as in listen.
type ListenErr struct {
	Errors []error
}

// Error concatenates the non-nil entries, one "index: message" line each.
// A nil receiver renders as "<nil error>".
func (e *ListenErr) Error() string {
	if e == nil {
		return "<nil error>"
	}
	var out string
	for i, err := range e.Errors {
		if err == nil {
			continue
		}
		out += fmt.Sprintf("%d: %s\n", i, err)
	}
	return out
}
// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
//
// Implements router.Node
type Swarm struct {

	// local is the peer this swarm represents
	local peer.Peer

	// peers is a collection of peers for swarm to use
	peers peer.Peerstore

	// rt handles the open connections the swarm is handling.
	rt *swarmRoutingTable

	// listeners for each network address
	listeners []conn.Listener

	// ContextGroup governing the lifetime of the swarm and its children
	// (listeners, per-peer workers). Closing it tears everything down.
	cg ctxgroup.ContextGroup
}
// NewSwarm constructs a Swarm, with a Chan.
// It registers s.close as the ContextGroup teardown (so closing ctx or the
// group shuts listeners and connections down), and immediately starts
// listening on listenAddrs. The returned error, if any, comes from listen.
func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
	local peer.Peer, ps peer.Peerstore, client router.Node) (*Swarm, error) {

	s := &Swarm{
		local: local,
		peers: ps,
		cg:    ctxgroup.WithContext(ctx),
		rt:    newRoutingTable(local, client),
	}
	s.cg.SetTeardown(s.close)
	return s, s.listen(listenAddrs)
}
// SetClient assigns the Swarm's client node.
func (s *Swarm) SetClient(n router.Node) {
	s.rt.client = n
}

// Close stops a swarm and waits until it has exited.
func (s *Swarm) Close() error {
	return s.cg.Close()
}
// close is the teardown invoked by the swarm's ContextGroup: it shuts
// down every listener and every open connection.
func (s *Swarm) close() error {
	for _, l := range s.listeners {
		l.Close()
	}
	conn.CloseConns(s.Connections()...)
	return nil
}
// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achieve connection.
//
// For now, Dial uses only TCP. This will be extended.
func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) {
	if peer.ID().Equal(s.local.ID()) {
		return nil, errors.New("Attempted connection to self!")
	}

	// check if we already have an open connection first
	c := s.GetConnection(peer.ID())
	if c != nil {
		return c, nil
	}

	// check if we don't have the peer in Peerstore
	// (note: the returned peer object replaces the argument from here on —
	// presumably the canonical instance for this ID; Add's exact semantics
	// live in the peer package.)
	peer, err := s.peers.Add(peer)
	if err != nil {
		return nil, err
	}

	// open connection to peer
	d := &conn.Dialer{
		LocalPeer: s.local,
		Peerstore: s.peers,
	}

	if len(peer.Addresses()) == 0 {
		return nil, errors.New("peer has no addresses")
	}

	// try to connect to one of the peer's known addresses.
	// for simplicity, we do this sequentially.
	// A future commit will do this asynchronously.
	// (err from the last failed attempt is what gets returned if all fail.)
	for _, addr := range peer.Addresses() {
		c, err = d.DialAddr(s.cg.Context(), addr, peer)
		if err == nil {
			break
		}
	}
	if err != nil {
		return nil, err
	}

	// upgrade the raw connection: handshake3, NAT check, routing-table
	// registration. on failure the raw conn is closed here.
	c2, err := s.connSetup(context.TODO(), c)
	if err != nil {
		c.Close()
		return nil, err
	}

	// TODO replace the TODO ctx with a context passed in from caller
	log.Event(context.TODO(), "dial", peer)
	return c2, nil
}
// GetConnection returns the open connection to the given peer.ID, or nil
// if the swarm has none.
func (s *Swarm) GetConnection(pid peer.ID) conn.Conn {
	if sp := s.rt.getByID(pid); sp != nil {
		return sp.conn
	}
	return nil
}

// Connections returns a slice of all connections.
func (s *Swarm) Connections() []conn.Conn {
	return s.rt.connList()
}

// CloseConnection removes a given peer from swarm + closes the connection
func (s *Swarm) CloseConnection(p peer.Peer) error {
	return s.closeConn(p)
}

// GetPeerList returns a copy of the set of peers swarm is connected to.
func (s *Swarm) GetPeerList() []peer.Peer {
	return s.rt.peerList()
}

// LocalPeer returns the local peer swarm is associated to.
func (s *Swarm) LocalPeer() peer.Peer {
	return s.local
}

// Address returns the address of *this* service.
func (s *Swarm) Address() router.Address {
	return "/ipfs/service/swarm" // for now don't need anything more complicated.
}

View File

@ -1,128 +0,0 @@
package swarm
import (
"errors"
"fmt"
conn "github.com/jbenet/go-ipfs/net/conn"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
// listen opens a listener for every address the swarm should listen on.
// failures are collected per-address into a single ListenErr; success on
// every address returns nil.
func (s *Swarm) listen(addrs []ma.Multiaddr) error {
	retErr := &ListenErr{
		Errors: make([]error, len(addrs)),
	}

	// listen on every address
	failed := false
	for i, addr := range addrs {
		if err := s.connListen(addr); err != nil {
			failed = true
			retErr.Errors[i] = err
			log.Errorf("Failed to listen on: %s - %s", addr, err)
		}
	}

	if failed {
		return retErr
	}
	return nil
}
// connListen sets up a listener on the given multiaddr, records the
// resolved addresses on the local peer, and starts the accept loop as a
// child of the swarm's ContextGroup.
func (s *Swarm) connListen(maddr ma.Multiaddr) error {

	// resolve unspecified addrs (e.g. 0.0.0.0) into concrete ones first
	resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr})
	if err != nil {
		return err
	}

	list, err := conn.Listen(s.cg.Context(), maddr, s.local, s.peers)
	if err != nil {
		return err
	}

	// add resolved local addresses to peer
	for _, addr := range resolved {
		s.local.AddAddress(addr)
	}

	// make sure port can be reused. TODO this doesn't work...
	// if err := setSocketReuse(list); err != nil {
	// 	return err
	// }

	// NOTE: this may require a lock around it later. currently, only run on setup
	s.listeners = append(s.listeners, list)

	// Accept and handle new connections on this listener until it errors
	// this listener is a child (exits when the parent group is closing).
	s.cg.AddChildFunc(func(parent ctxgroup.ContextGroup) {
		for {
			select {
			case <-parent.Closing():
				return
			case conn := <-list.Accept():
				// handled synchronously: one incoming conn set up at a time
				// per listener.
				s.handleIncomingConn(parent.Context(), conn)
			}
		}
	})

	return nil
}
// handleIncomingConn runs connection setup (handshake + registration) for
// a freshly accepted connection, closing it if setup fails.
func (s *Swarm) handleIncomingConn(ctx context.Context, nconn conn.Conn) {
	if _, err := s.connSetup(ctx, nconn); err != nil {
		log.Errorf("swarm: failed to add incoming connection: %s", err)
		log.Event(ctx, "handleIncomingConn failed", s.LocalPeer(), nconn.RemotePeer())
		nconn.Close()
	}
}
// connSetup takes a new connection, performs the IPFS handshake (handshake3)
// and then adds it to the appropriate MultiConn.
// On any failure the connection is closed and an error is returned.
func (s *Swarm) connSetup(ctx context.Context, c conn.Conn) (conn.Conn, error) {
	if c == nil {
		return nil, errors.New("Tried to start nil connection.")
	}
	log.Event(ctx, "connSetupBegin", c.LocalPeer(), c.RemotePeer())

	// add address of connection to Peer. Maybe it should happen in connSecure.
	// NOT adding this address here, because the incoming address in TCP
	// is an EPHEMERAL address, and not the address we want to keep around.
	// addresses should be figured out through the DHT.
	// c.Remote.AddAddress(c.Conn.RemoteMultiaddr())

	// handshake3
	// WithTimeout returns a CancelFunc; discarding it leaks the timeout's
	// resources until the parent context ends (go vet lostcancel). ctxT is
	// used only for the synchronous Handshake3 call, so deferring cancel
	// here is safe.
	ctxT, cancel := context.WithTimeout(c.Context(), conn.HandshakeTimeout)
	defer cancel()
	h3result, err := conn.Handshake3(ctxT, c)
	if err != nil {
		c.Close()
		return nil, fmt.Errorf("Handshake3 failed: %s", err)
	}

	// check for nats. you know, just in case.
	if h3result.LocalObservedAddress != nil {
		s.checkNATWarning(h3result.LocalObservedAddress, c.LocalMultiaddr())
	} else {
		log.Warningf("Received nil observed address from %s", c.RemotePeer())
	}

	// add to conns
	if err := s.addConn(c); err != nil {
		c.Close()
		return nil, err
	}

	log.Event(ctx, "connSetupSuccess", c.LocalPeer(), c.RemotePeer())
	return c, nil
}

View File

@ -1,178 +0,0 @@
package swarm
import (
"fmt"
conn "github.com/jbenet/go-ipfs/net/conn"
netmsg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
router "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-router"
)
// MaxConcurrentRequestsPerPeer defines the pipelining that we can do per-peer.
// the networking layer makes sure to provide proper backpressure to the remote
// side by only handling a max number of concurrent requests to completion.
const MaxConcurrentRequestsPerPeer = 20

// swarmPeer represents a connection to the outside world.
// Implements router.Node
type swarmPeer struct {
	swarm *Swarm                // owning swarm (for routing packets up to the client)
	conn  *conn.MultiConn       // muxed set of open connections to the remote peer
	cg    ctxgroup.ContextGroup // governs the lifetime of this peer's workers
}
// newSwarmPeer constructs a new swarmPeer, and starts its worker.
// this doesn't connect it, or add it to the swarm's routing table.
// Implements router.Node
func newSwarmPeer(s *Swarm, p peer.Peer) (*swarmPeer, error) {
	c, err := conn.NewMultiConn(s.cg.Context(), s.LocalPeer(), p, nil)
	if err != nil {
		return nil, fmt.Errorf("Error creating MultiConn: %s", err)
	}

	sp := &swarmPeer{
		swarm: s,
		conn:  c,
		cg:    ctxgroup.WithParent(s.cg), // swarmPeer closes when swarm closes.
	}

	// kicks off the worker.
	// ctxgroup makes sure swarmPeer doesn't close until this func returns
	sp.cg.AddChildFunc(sp.listen)
	return sp, nil
}
// LocalPeer returns the local peer.
func (sp *swarmPeer) LocalPeer() peer.Peer {
	return sp.conn.LocalPeer()
}

// RemotePeer returns the peer on the other side of this connection.
func (sp *swarmPeer) RemotePeer() peer.Peer {
	return sp.conn.RemotePeer()
}

// Address is the remote peer's ID (implements router.Node).
func (sp *swarmPeer) Address() router.Address {
	return sp.RemotePeer()
}

// Close shuts down the swarmPeer and its workers.
func (sp *swarmPeer) Close() error {
	return sp.cg.Close()
}
// listen to the multiconn and route packets in.
// runs as a child of the swarmPeer's ContextGroup; returns only when the
// parent is closing.
func (sp *swarmPeer) listen(parent ctxgroup.ContextGroup) {

	// we listen and pipeline using:
	// - 1x listener (this function, the for loop below)
	// - 1x pipelining semaphore
	// - up to Nx goroutine pipeline workers
	// this approach is chosen over N persistent goroutines because
	// spawning a goroutine every time is cheaper than keeping N
	// additional goroutines all the time, for inactive connections
	pipelineSema := make(chan struct{}, MaxConcurrentRequestsPerPeer)
	for i := 0; i < MaxConcurrentRequestsPerPeer; i++ {
		pipelineSema <- struct{}{}
	}

	// the sad part of using io is we still need to consume msgs
	// using a context-less api, which means we need to use an
	// extra goroutine, to make sure we close the connection and
	// unlock our blocked listener.
	// (the Context just does not mix well with io.ReadWriters)
	// - conn.SetDeadline could be explored
	go func() {
		<-parent.Closing()
		sp.conn.Close()
	}()

	listener := func() {
		for {
			msg, err := sp.conn.ReadMsg()

			// we want this to happen before checking the error, as we may
			// be closing (which is not an error). any last message is dropped.
			select {
			case <-parent.Closing():
				return
			case <-sp.conn.Closing():
				return
			default:
			}

			if err != nil {
				log.Errorf("error receiving message from multiconn: %s", err)
				continue
			}

			select {
			case <-parent.Closing():
				return
			case <-pipelineSema: // acquire pipelining resource
				// worker handles the request to completion, then returns
				// its slot to the semaphore.
				go func(m []byte) {
					sp.handleIncomingMessage(parent.Context(), m)
					pipelineSema <- struct{}{}
				}(msg)
			}
		}
	}

	// function call so that we can isolate the functionality,
	// and so we can call return in loops above and not confuse flow.
	listener()
}
// handleIncomingMessage wraps one inbound raw message into a netmsg.Packet
// and runs it through HandlePacket (and thus up to the swarm's client).
// It blocks until handling finishes; that blocking is what provides the
// per-peer backpressure described at MaxConcurrentRequestsPerPeer.
func (sp *swarmPeer) handleIncomingMessage(ctx context.Context, msg []byte) {
	// handle incoming message.

	// we derive a new context for this incoming request.
	// NOTE(review): the CancelFunc is discarded (go vet lostcancel). the
	// derived ctx travels inside the Packet and may outlive this call via
	// response packets, so a defer cancel() here might not be safe — confirm
	// before fixing.
	ctx, _ = context.WithCancel(ctx)
	p := netmsg.Packet{
		Src:     sp.RemotePeer(),
		Dst:     sp.swarm.client().Address(),
		Data:    msg,
		Context: ctx,
	}

	// We also can't yet pass unread io.RW to the clients directly.
	// muxado, SPDY, QUIC, and other stream multiplexors could
	// make this a breeze.

	// this runs the entire request. it should not return until ALL action
	// is done. this is so that we rate limit and respond to backpressure well.
	// TODO: pipelining (handle up to N concurrent requests).
	// doing pipelining with SPDY or muxado is probably TRTTD.
	if err := sp.HandlePacket(&p, nil); err != nil {
		log.Errorf("error handling incoming request: %v", err)
	}

	// should be done with the underlying bytes. release (the kraken)!
	// TODO: enable this. there is a bug relating to mpool or something. swarm_tests fail.
	// sp.conn.ReleaseMsg(msg)
}
// HandlePacket routes a packet either up to the swarm's client (packets
// addressed to the client = incoming) or out over the wire (packets
// addressed to the remote peer = outgoing). Implements router.Node.
func (sp *swarmPeer) HandlePacket(p router.Packet, n router.Node) error {
	switch p.Destination() {
	case sp.swarm.client().Address(): // incoming
		return sp.swarm.HandlePacket(p, sp)

	case sp.RemotePeer(): // outgoing
		// only raw []byte payloads can be written to the wire.
		buf, ok := p.Payload().([]byte)
		if !ok {
			return netmsg.ErrInvalidPayload
		}
		if err := sp.conn.WriteMsg(buf); err != nil {
			return fmt.Errorf("swarmPeer error sending: %s", err)
		}
		return nil

	default: // problem: destination is neither the client nor our peer.
		return fmt.Errorf("swarmPeer routing error: %v got %v", sp, p)
	}
}

View File

@ -1,159 +0,0 @@
package swarm
import (
"sync"
conn "github.com/jbenet/go-ipfs/net/conn"
netmsg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
router "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-router"
)
// swarmRoutingTable collects the peers the swarm is connected to, plus the
// single client node that receives all inbound traffic.
type swarmRoutingTable struct {
	// local is the peer this table belongs to
	local peer.Peer

	// client is the node all packets addressed to us are routed to
	client router.Node

	// peers maps peer keys to their live swarmPeer entries
	peers map[u.Key]*swarmPeer

	// embedded RWMutex guards peers (rt.Lock()/rt.RLock())
	sync.RWMutex
}

// newRoutingTable constructs an empty routing table for local, with client
// as the inbound destination.
func newRoutingTable(local peer.Peer, client router.Node) *swarmRoutingTable {
	return &swarmRoutingTable{
		local:  local,
		client: client,
		peers:  map[u.Key]*swarmPeer{},
	}
}
// getOrAdd returns the swarmPeer for p, creating and registering one when
// none exists yet. creating a swarmPeer also starts its worker (see
// newSwarmPeer).
func (rt *swarmRoutingTable) getOrAdd(s *Swarm, p peer.Peer) (*swarmPeer, error) {
	rt.Lock()
	defer rt.Unlock()

	if sp, found := rt.peers[p.Key()]; found {
		return sp, nil
	}

	// newSwarmPeer is what kicks off the reader goroutines.
	sp, err := newSwarmPeer(s, p)
	if err != nil {
		return nil, err
	}
	rt.peers[p.Key()] = sp
	return sp, nil
}
// remove deletes p's entry from the table and returns it (nil if absent).
func (rt *swarmRoutingTable) remove(p peer.Peer) *swarmPeer {
	rt.Lock()
	defer rt.Unlock()

	sp, found := rt.peers[p.Key()]
	if found {
		delete(rt.peers, p.Key())
	}
	return sp
}

// getByID looks up the swarmPeer for a raw peer.ID.
func (rt *swarmRoutingTable) getByID(pid peer.ID) *swarmPeer {
	rt.RLock()
	defer rt.RUnlock()
	return rt.peers[u.Key(pid)]
}

// get looks up the swarmPeer for a peer object.
func (rt *swarmRoutingTable) get(p peer.Peer) *swarmPeer {
	rt.RLock()
	defer rt.RUnlock()
	return rt.peers[p.Key()]
}
// connList snapshots the MultiConns of every known peer under the read lock.
func (rt *swarmRoutingTable) connList() []conn.Conn {
	rt.RLock()
	defer rt.RUnlock()

	var conns []conn.Conn
	for _, sp := range rt.peers {
		conns = append(conns, sp.conn)
	}
	return conns
}

// peerList snapshots the remote peers of every entry under the read lock.
func (rt *swarmRoutingTable) peerList() []peer.Peer {
	rt.RLock()
	defer rt.RUnlock()

	var remotes []peer.Peer
	for _, sp := range rt.peers {
		remotes = append(remotes, sp.RemotePeer())
	}
	return remotes
}
// Route implements routing.Route
// It returns the node a packet should be delivered to: the client when the
// packet is addressed to it, the matching swarmPeer when addressed to a
// connected peer, or nil when no route exists.
func (rt *swarmRoutingTable) Route(p router.Packet) router.Node {
	// no need to lock :)
	if p.Destination() == rt.client.Address() {
		// log.Debugf("%s swarmRoutingTable route %s to client %s ? ", p.Destination(), rt.client.Address(), p.Payload())
		return rt.client
	}

	rt.RLock()
	defer rt.RUnlock()
	// linear scan over connected peers, matching on destination address.
	for _, sp := range rt.peers {
		if sp.RemotePeer() == p.Destination() {
			// log.Debugf("%s swarmRoutingTable route %s to peer %s ? ", p.Destination(), sp.RemotePeer(), p.Payload())
			return sp
		}
	}
	return nil // no route
}
// client returns the node that receives all inbound packets.
func (s *Swarm) client() router.Node {
	return s.rt.client
}

// addConn registers c with its remote peer's MultiConn, creating the
// swarmPeer entry on first contact.
func (s *Swarm) addConn(c conn.Conn) error {
	sp, err := s.rt.getOrAdd(s, c.RemotePeer())
	if err != nil {
		return err
	}
	sp.conn.Add(c)
	return nil
}

// closeConn removes p from the routing table and closes its swarmPeer,
// if one exists.
func (s *Swarm) closeConn(p peer.Peer) error {
	if sp := s.rt.remove(p); sp != nil {
		return sp.Close()
	}
	return nil
}
// HandlePacket routes messages out through connections, or to the client
func (s *Swarm) HandlePacket(p router.Packet, from router.Node) error {
	// only raw []byte payloads can be carried by the swarm.
	msg, ok := p.Payload().([]byte)
	if !ok {
		return netmsg.ErrInvalidPayload
	}

	// NOTE(review): oversize messages are only logged, not rejected here —
	// confirm whether the conn layer enforces MaxMessageSize.
	if len(msg) >= conn.MaxMessageSize {
		log.Criticalf("Attempted to send message bigger than max size. (%d)", len(msg))
	}

	next := s.rt.Route(p)
	if next == nil {
		// log.Debugf("%s swarm HandlePacket %s -> %s -> %s -> %s: %s",
		// 	s.local, from.Address(), s.Address(), "????", p.Destination(), p.Payload())
		return router.ErrNoRoute
	}

	// log.Debugf("%s swarm HandlePacket %s -> %s -> %s -> %s: %s",
	// 	s.local, from.Address(), s.Address(), next.Address(), p.Destination(), p.Payload())
	return next.HandlePacket(p, s)
}

View File

@ -1,252 +0,0 @@
package swarm
import (
"bytes"
"sync"
"testing"
"time"
ci "github.com/jbenet/go-ipfs/crypto"
netmsg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
testutil "github.com/jbenet/go-ipfs/util/testutil"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
router "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-router"
)
// queueClient collects packets into a buffered channel for the test to
// inspect. needed to copy the data. otherwise gets reused... :/
type queueClient struct {
	addr  router.Address
	queue chan *netmsg.Packet
}

func newQueueClient(addr router.Address) *queueClient {
	return &queueClient{addr: addr, queue: make(chan *netmsg.Packet, 20)}
}

func (qc *queueClient) Address() router.Address {
	return qc.addr
}

// HandlePacket deep-copies the packet (the network layer reuses the
// underlying buffer) and enqueues the copy.
func (qc *queueClient) HandlePacket(p router.Packet, n router.Node) error {
	src := p.(*netmsg.Packet)
	cp := *src
	cp.Data = append([]byte(nil), src.Data...)
	qc.queue <- &cp
	return nil
}
// pongClient answers every "ping" packet with a "pong", asynchronously via
// its echo loop.
type pongClient struct {
	peer  peer.Peer
	count int          // number of pongs sent, for debug logging
	queue chan pongPkt // pending responses for the echo loop
}

// pongPkt pairs a response message with the node to deliver it through.
type pongPkt struct {
	msg netmsg.Packet
	dst router.Node
}

// newPongClient constructs a pongClient and starts its echo loop, which
// runs until ctx is cancelled.
func newPongClient(ctx context.Context, peer peer.Peer) *pongClient {
	pc := &pongClient{peer: peer, queue: make(chan pongPkt, 10)}
	go pc.echo(ctx)
	return pc
}

func (pc *pongClient) Address() router.Address {
	return pc.peer.ID().Pretty() + "/pong"
}

// HandlePacket enqueues a "pong" response for every "ping" received.
func (pc *pongClient) HandlePacket(p router.Packet, n router.Node) error {
	pkt1 := p.(*netmsg.Packet)
	if !bytes.Equal(pkt1.Data, []byte("ping")) {
		log.Debugf("%s pong dropped pkt: %s (%s -> %s)", pc.Address(), pkt1.Data, pkt1.Src, pkt1.Dst)
		// debugging aid: the tests only ever send "ping", so crash loudly.
		// NOTE: the `return nil // drop` that used to follow this panic was
		// unreachable (go vet: unreachable code) and has been removed.
		panic("why")
	}

	pc.queue <- pongPkt{pkt1.Response([]byte("pong")), n}
	return nil
}

// echo drains the queue and delivers each queued pong to its destination,
// exiting when ctx is done.
func (pc *pongClient) echo(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case pkt := <-pc.queue:
			pc.count++
			log.Debugf("%s pong %s (%d)", pkt.msg.Src, pkt.msg.Dst, pc.count)
			if err := pkt.dst.HandlePacket(&pkt.msg, pc); err != nil {
				log.Errorf("pong error sending: %s", err)
			}
		}
	}
}
// setupPeer creates a fresh test peer with a small (512-bit, test-only)
// RSA key pair and the given multiaddr string as its address.
func setupPeer(t *testing.T, addr string) peer.Peer {
	maddr, err := ma.NewMultiaddr(addr)
	if err != nil {
		t.Fatal(err)
	}

	sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
	if err != nil {
		t.Fatal(err)
	}

	p, err := testutil.NewPeerWithKeyPair(sk, pk)
	if err != nil {
		t.Fatal(err)
	}
	p.AddAddress(maddr)
	return p
}
// makeSwarms builds one Swarm per address, each with its own peerstore and
// a pongClient as its client node. Returns the swarms and their local peers.
func makeSwarms(ctx context.Context, t *testing.T, addrs []string) ([]*Swarm, []peer.Peer) {
	var swarms []*Swarm
	for _, addr := range addrs {
		local := setupPeer(t, addr)
		peerstore := peer.NewPeerstore()
		pong := newPongClient(ctx, local)
		s, err := NewSwarm(ctx, local.Addresses(), local, peerstore, pong)
		if err != nil {
			t.Fatal(err)
		}
		swarms = append(swarms, s)
	}

	peers := make([]peer.Peer, len(swarms))
	for i, s := range swarms {
		peers[i] = s.local
	}
	return swarms, peers
}
// SubtestSwarm spins up one swarm per address, fully connects them, then
// has each swarm send MsgNum pings to every other peer and verifies that
// exactly MsgNum pongs come back from each.
func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) {
	// t.Skip("skipping for another test")

	ctx := context.Background()
	swarms, peers := makeSwarms(ctx, t, addrs)

	// connect everyone
	{
		var wg sync.WaitGroup
		connect := func(s *Swarm, dst peer.Peer) {
			// copy for other peer
			cp, err := s.peers.FindOrCreate(dst.ID())
			if err != nil {
				t.Fatal(err)
			}
			cp.AddAddress(dst.Addresses()[0])

			log.Infof("SWARM TEST: %s dialing %s", s.local, dst)
			if _, err := s.Dial(cp); err != nil {
				t.Fatal("error swarm dialing to peer", err)
			}
			log.Infof("SWARM TEST: %s connected to %s", s.local, dst)
			wg.Done()
		}

		log.Info("Connecting swarms simultaneously.")
		for _, s := range swarms {
			for _, p := range peers {
				if p != s.local { // don't connect to self.
					wg.Add(1)
					// NOTE: connect is called synchronously here (no `go`),
					// so t.Fatal inside it is safe.
					connect(s, p)
				}
			}
		}
		wg.Wait()

		for _, s := range swarms {
			log.Infof("%s swarm routing table: %s", s.local, s.GetPeerList())
		}
	}

	// ping/pong
	for _, s1 := range swarms {
		log.Debugf("-------------------------------------------------------")
		log.Debugf("%s ping pong round", s1.local)
		log.Debugf("-------------------------------------------------------")

		// for this test, we'll listen on s1.
		// temporarily swap s1's client for a collecting queue.
		queue := newQueueClient(s1.client().Address())
		pong := s1.client() // set it back at the end.
		s1.SetClient(queue)
		ctx, cancel := context.WithCancel(ctx)

		peers, err := s1.peers.All()
		if err != nil {
			t.Fatal(err)
		}

		// send MsgNum pings to every other peer.
		for k := 0; k < MsgNum; k++ {
			for _, p := range *peers {
				log.Debugf("%s ping %s (%d)", s1.local, p, k)
				pkt := netmsg.Packet{Src: s1.local, Dst: p, Data: []byte("ping"), Context: ctx}
				s1.HandlePacket(&pkt, queue)
			}
		}

		// collect the expected number of pongs, counting per source peer.
		got := map[u.Key]int{}
		for k := 0; k < (MsgNum * len(*peers)); k++ {
			log.Debugf("%s waiting for pong (%d)", s1.local, k)
			msg := <-queue.queue
			if string(msg.Data) != "pong" {
				t.Error("unexpected conn output", string(msg.Data), msg.Data)
			}
			p := msg.Src.(peer.Peer)
			n, _ := got[p.Key()]
			got[p.Key()] = n + 1
		}

		log.Debugf("%s got pongs", s1.local)
		if len(*peers) != len(got) {
			t.Error("got less messages than sent")
		}
		for p, n := range got {
			if n != MsgNum {
				t.Error("peer did not get all msgs", p, n, "/", MsgNum)
			}
		}

		// restore s1's original client before the next round.
		cancel()
		<-time.After(10 * time.Millisecond)
		s1.SetClient(pong)
	}

	for _, s := range swarms {
		s.Close()
	}
}
// TestSwarm runs the full ping/pong exchange across five local swarms.
func TestSwarm(t *testing.T) {
	// t.Skip("skipping for another test")

	addrs := []string{
		"/ip4/127.0.0.1/tcp/10234",
		"/ip4/127.0.0.1/tcp/10235",
		"/ip4/127.0.0.1/tcp/10236",
		"/ip4/127.0.0.1/tcp/10237",
		"/ip4/127.0.0.1/tcp/10238",
	}

	// msgs := 1000
	msgs := 100
	SubtestSwarm(t, addrs, msgs)
}