From fd3cd399e51e2cd5bf41c82834eddfef2d4cca29 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Tue, 16 Dec 2014 11:27:40 -0800 Subject: [PATCH] mock network --- net/mock/mock.go | 291 ++++++++++++++++++++++++++++++++++++++++++ net/mock/mock_test.go | 103 +++++++++++++++ net/net.go | 6 +- 3 files changed, 397 insertions(+), 3 deletions(-) create mode 100644 net/mock/mock.go create mode 100644 net/mock/mock_test.go diff --git a/net/mock/mock.go b/net/mock/mock.go new file mode 100644 index 000000000..955a19c47 --- /dev/null +++ b/net/mock/mock.go @@ -0,0 +1,291 @@ +// Package mocknet provides a mock net.Network to test with. +package mocknet + +import ( + "io" + "sync" + + inet "github.com/jbenet/go-ipfs/net" + peer "github.com/jbenet/go-ipfs/peer" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" +) + +type Stream struct { + io.Reader + io.Writer + conn *Conn +} + +func (s *Stream) Close() error { + s.conn.removeStream(s) + if r, ok := (s.Reader).(io.Closer); ok { + r.Close() + } + if w, ok := (s.Writer).(io.Closer); ok { + return w.Close() + } + return nil +} + +func (s *Stream) Conn() inet.Conn { + return s.conn +} + +// wire pipe between two network conns. yay io. +func newStreamPair(n1 *Network, p2 peer.Peer) (*Stream, *Stream) { + p1 := n1.local + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + s1 := &Stream{Reader: r1, Writer: w2} + s2 := &Stream{Reader: r2, Writer: w1} + + n1.Lock() + n1.conns[p2].addStream(s1) + n2 := n1.conns[p2].remote + n1.Unlock() + + n2.Lock() + n2.conns[p1].addStream(s2) + n2.Unlock() + n2.handle(s2) + + return s1, s2 +} + +type Conn struct { + connected bool + local *Network + remote *Network + streams []*Stream + sync.RWMutex +} + +func (c *Conn) Close() error { + c.Lock() + defer c.Unlock() + + c.connected = false + for _, s := range c.streams { + go s.Close() + } + c.streams = nil + return nil +} + +func (c *Conn) addStream(s *Stream) { + c.Lock() + defer c.Unlock() + + s.conn = c + c.streams = append(c.streams, s) +} + +func (c *Conn) removeStream(s *Stream) { + c.Lock() + defer c.Unlock() + + strs := make([]*Stream, 0, len(c.streams)) + for _, s2 := range c.streams { + if s2 != s { + strs = append(strs, s2) + } + } +} + +func (c *Conn) NewStreamWithProtocol(pr inet.ProtocolID, p peer.Peer) (inet.Stream, error) { + + ss, _ := newStreamPair(c.local, p) + + if err := inet.WriteProtocolHeader(pr, ss); err != nil { + ss.Close() + return nil, err + } + + return ss, nil +} + +// LocalMultiaddr is the Multiaddr on this side +func (c *Conn) LocalMultiaddr() ma.Multiaddr { + return nil +} + +// LocalPeer is the Peer on our side of the connection +func (c *Conn) LocalPeer() peer.Peer { + return c.local.local +} + +// RemoteMultiaddr is the Multiaddr on the remote side +func (c *Conn) RemoteMultiaddr() ma.Multiaddr { + return nil +} + +// RemotePeer is the Peer on the remote side +func (c *Conn) RemotePeer() peer.Peer { + return c.remote.local +} + +// network implements the Network interface, +type Network struct { + local peer.Peer // local peer + mux inet.Mux // protocol multiplexing + + conns map[peer.Peer]*Conn + sync.RWMutex + + cg ctxgroup.ContextGroup // for Context closing +} + +func MakeNetworks(ctx context.Context, peers []peer.Peer) (nets []*Network, err error) { + nets = make([]*Network, len(peers)) + for i, p := range peers { + ps := peer.NewPeerstore() + nets[i], err = newNetwork(ctx, p, ps) + if err != nil { + return nil, err + } + } + + for _, n1 := range nets { + for _, n2 := range nets { + if n1 == n2 { + continue + } + + n1.conns[n2.local] = &Conn{local: n1, remote: n2} + } + } + + return nets, nil +} + +// NewNetwork constructs a new Mock network +func newNetwork(ctx context.Context, local peer.Peer, peers peer.Peerstore) (*Network, error) { + + n := &Network{ + local: local, + mux: inet.Mux{Handlers: inet.StreamHandlerMap{}}, + cg: ctxgroup.WithContext(ctx), + conns: map[peer.Peer]*Conn{}, + } + + n.cg.SetTeardown(n.close) + return n, nil +} + +func (n *Network) handle(s inet.Stream) { + go n.mux.Handle(s) +} + +// DialPeer attempts to establish a connection to a given peer. +// Respects the context. +func (n *Network) DialPeer(ctx context.Context, p peer.Peer) error { + n.Lock() + defer n.Unlock() + + n.conns[p].connected = true + return nil +} + +// CtxGroup returns the network's ContextGroup +func (n *Network) CtxGroup() ctxgroup.ContextGroup { + return n.cg +} + +// LocalPeer the network's LocalPeer +func (n *Network) LocalPeer() peer.Peer { + return n.local +} + +// Peers returns the connected peers +func (n *Network) Peers() []peer.Peer { + n.RLock() + defer n.RUnlock() + + peers := make([]peer.Peer, 0, len(n.conns)) + for _, c := range n.conns { + if c.connected { + peers = append(peers, c.RemotePeer()) + } + } + return peers +} + +// Conns returns the connected peers +func (n *Network) Conns() []inet.Conn { + n.RLock() + defer n.RUnlock() + + out := make([]inet.Conn, 0, len(n.conns)) + for _, c := range n.conns { + if c.connected { + out = append(out, c) + } + } + return out +} + +// ClosePeer connection to peer +func (n *Network) ClosePeer(p peer.Peer) error { + return n.conns[p].Close() +} + +// close is the real teardown function +func (n *Network) close() error { + for _, c := range n.conns { + c.Close() + } + return nil +} + +// Close calls the ContextCloser func +func (n *Network) Close() error { + return n.cg.Close() +} + +// BandwidthTotals returns the total amount of bandwidth transferred +func (n *Network) BandwidthTotals() (in uint64, out uint64) { + // need to implement this. probably best to do it in swarm this time. + // need a "metrics" object + return 0, 0 +} + +// ListenAddresses returns a list of addresses at which this network listens. +func (n *Network) ListenAddresses() []ma.Multiaddr { + return []ma.Multiaddr{} +} + +// InterfaceListenAddresses returns a list of addresses at which this network +// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to +// use the known local interfaces. +func (n *Network) InterfaceListenAddresses() ([]ma.Multiaddr, error) { + return []ma.Multiaddr{}, nil +} + +// Connectedness returns a state signaling connection capabilities +// For now only returns Connecter || NotConnected. Expand into more later. +func (n *Network) Connectedness(p peer.Peer) inet.Connectedness { + n.Lock() + defer n.Unlock() + + if _, found := n.conns[p]; found && n.conns[p].connected { + return inet.Connected + } + return inet.NotConnected +} + +// NewStream returns a new stream to given peer p. +// If there is no connection to p, attempts to create one. +// If ProtocolID is "", writes no header. +func (c *Network) NewStream(pr inet.ProtocolID, p peer.Peer) (inet.Stream, error) { + return c.conns[p].NewStreamWithProtocol(pr, p) +} + +// SetHandler sets the protocol handler on the Network's Muxer. +// This operation is threadsafe. +func (n *Network) SetHandler(p inet.ProtocolID, h inet.StreamHandler) { + n.mux.SetHandler(p, h) +} diff --git a/net/mock/mock_test.go b/net/mock/mock_test.go new file mode 100644 index 000000000..77ee67204 --- /dev/null +++ b/net/mock/mock_test.go @@ -0,0 +1,103 @@ +package mocknet + +import ( + "bytes" + "io" + "testing" + + inet "github.com/jbenet/go-ipfs/net" + peer "github.com/jbenet/go-ipfs/peer" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + testutil "github.com/jbenet/go-ipfs/util/testutil" +) + +func TestNetworkSetup(t *testing.T) { + + p1 := testutil.RandPeer() + p2 := testutil.RandPeer() + p3 := testutil.RandPeer() + peers := []peer.Peer{p1, p2, p3} + + nets, err := MakeNetworks(context.Background(), peers) + if err != nil { + t.Fatal(err) + } + + // check things + + if len(nets) != 3 { + t.Error("nets must be 3") + } + + for i, n := range nets { + if n.local != peers[i] { + t.Error("peer mismatch") + } + + if len(n.conns) != (len(nets) - 1) { + t.Error("conn mismatch") + } + + for _, c := range n.conns { + if c.remote.local == n.local { + t.Error("conn to self") + } + + if c.remote.conns[n.local] == nil { + t.Error("conn other side fail") + } + if c.remote.conns[n.local].remote.local != n.local { + t.Error("conn other side fail") + } + } + + } + +} + +func TestStreams(t *testing.T) { + + p1 := testutil.RandPeer() + p2 := testutil.RandPeer() + p3 := testutil.RandPeer() + peers := []peer.Peer{p1, p2, p3} + + nets, err := MakeNetworks(context.Background(), peers) + if err != nil { + t.Fatal(err) + } + + nets[1].SetHandler(inet.ProtocolDHT, func(s inet.Stream) { + go func() { + b := make([]byte, 4) + if _, err := io.ReadFull(s, b); err != nil { + panic(err) + } + if !bytes.Equal(b, []byte("beep")) { + panic("bytes mismatch") + } + if _, err := s.Write([]byte("boop")); err != nil { + panic(err) + } + s.Close() + }() + }) + + s, err := nets[0].NewStream(inet.ProtocolDHT, nets[1].local) + if err != nil { + t.Fatal(err) + } + + if _, err := s.Write([]byte("beep")); err != nil { + panic(err) + } + b := make([]byte, 4) + if _, err := io.ReadFull(s, b); err != nil { + panic(err) + } + if !bytes.Equal(b, []byte("boop")) { + panic("bytes mismatch 2") + } + +} diff --git a/net/net.go b/net/net.go index f55bc3b56..e27edc94e 100644 --- a/net/net.go +++ b/net/net.go @@ -52,7 +52,7 @@ func (c *conn_) NewStreamWithProtocol(pr ProtocolID, p peer.Peer) (Stream, error ss := (*stream)(s) - if err := writeProtocolHeader(pr, ss); err != nil { + if err := WriteProtocolHeader(pr, ss); err != nil { ss.Close() return nil, err } @@ -209,7 +209,7 @@ func (c *network) NewStream(pr ProtocolID, p peer.Peer) (Stream, error) { ss := (*stream)(s) - if err := writeProtocolHeader(pr, ss); err != nil { + if err := WriteProtocolHeader(pr, ss); err != nil { ss.Close() return nil, err } @@ -223,7 +223,7 @@ func (n *network) SetHandler(p ProtocolID, h StreamHandler) { n.mux.SetHandler(p, h) } -func writeProtocolHeader(pr ProtocolID, s Stream) error { +func WriteProtocolHeader(pr ProtocolID, s Stream) error { if pr != "" { // only write proper protocol headers if err := WriteLengthPrefix(s, string(pr)); err != nil { return err