diff --git a/core/commands/p2p.go b/core/commands/p2p.go index 6046ce453..459923ac2 100644 --- a/core/commands/p2p.go +++ b/core/commands/p2p.go @@ -88,7 +88,7 @@ Examples: cmdkit.StringArg("target-address", true, false, "Target endpoint."), }, Run: func(req cmds.Request, res cmds.Response) { - n, err := getNode(req) + n, err := p2pGetNode(req) if err != nil { res.SetError(err, cmdkit.ErrNormal) return @@ -164,7 +164,7 @@ var p2pLsCmd = &cmds.Command{ cmdkit.BoolOption("headers", "v", "Print table headers (Protocol, Listen, Target)."), }, Run: func(req cmds.Request, res cmds.Response) { - n, err := getNode(req) + n, err := p2pGetNode(req) if err != nil { res.SetError(err, cmdkit.ErrNormal) return @@ -221,7 +221,7 @@ var p2pCloseCmd = &cmds.Command{ Run: func(req cmds.Request, res cmds.Response) { res.SetOutput(nil) - n, err := getNode(req) + n, err := p2pGetNode(req) if err != nil { res.SetError(err, cmdkit.ErrNormal) return @@ -244,6 +244,10 @@ var p2pCloseCmd = &cmds.Command{ match := func(listener p2p.Listener) bool { out := true + if p || !strings.HasPrefix(proto, "/p2p/") { + proto = "/p2p/" + proto + } + if p { out = out && (proto == listener.Protocol()) } @@ -258,12 +262,30 @@ var p2pCloseCmd = &cmds.Command{ return out } + var closed int for _, listener := range n.P2P.Listeners.Listeners { if !match(listener) { continue } listener.Close() + closed++ } + res.SetOutput(closed) + }, + Type: int(0), + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + v, err := unwrapOutput(res.Output()) + if err != nil { + return nil, err + } + + closed := v.(int) + buf := new(bytes.Buffer) + fmt.Fprintf(buf, "Closed %d stream(s)\n", closed) + + return buf, nil + }, }, } @@ -292,7 +314,7 @@ var p2pStreamLsCmd = &cmds.Command{ cmdkit.BoolOption("headers", "v", "Print table headers (HagndlerID, Protocol, Local, Remote)."), }, Run: func(req cmds.Request, res cmds.Response) { - n, err := getNode(req) + n, err := p2pGetNode(req) if err != nil { res.SetError(err, cmdkit.ErrNormal) return @@ -352,7 +374,7 @@ var p2pStreamCloseCmd = &cmds.Command{ Run: func(req cmds.Request, res cmds.Response) { res.SetOutput(nil) - n, err := getNode(req) + n, err := p2pGetNode(req) if err != nil { res.SetError(err, cmdkit.ErrNormal) return @@ -386,7 +408,7 @@ var p2pStreamCloseCmd = &cmds.Command{ }, } -func getNode(req cmds.Request) (*core.IpfsNode, error) { +func p2pGetNode(req cmds.Request) (*core.IpfsNode, error) { n, err := req.InvocContext().GetNode() if err != nil { return nil, err diff --git a/p2p/listener.go b/p2p/listener.go index 972e97880..2fca81f3d 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -2,6 +2,8 @@ package p2p import ( "sync" + + "github.com/pkg/errors" ) type Listener interface { @@ -25,9 +27,22 @@ type ListenerRegistry struct { lk *sync.Mutex } +func (r *ListenerRegistry) Lock(l Listener) error { + r.lk.Lock() + + if _, ok := r.Listeners[getListenerKey(l)]; ok { + r.lk.Unlock() + return errors.New("listener already registered") + } + return nil +} + +func (r *ListenerRegistry) Unlock() { + r.lk.Unlock() +} + // Register registers listenerInfo in this registry func (r *ListenerRegistry) Register(l Listener) { - r.lk.Lock() defer r.lk.Unlock() r.Listeners[getListenerKey(l)] = l diff --git a/p2p/local.go b/p2p/local.go index acb23ba08..c34cc7031 100644 --- a/p2p/local.go +++ b/p2p/local.go @@ -20,6 +20,7 @@ type localListener struct { id peer.ID proto string + laddr ma.Multiaddr peer peer.ID listener manet.Listener @@ -27,11 +28,6 @@ type localListener struct { // ForwardLocal creates new P2P stream to a remote listener func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto string, bindAddr ma.Multiaddr) (Listener, error) { - maListener, err := manet.Listen(bindAddr) - if err != nil { - return nil, err - } - listener := &localListener{ ctx: ctx, @@ -39,11 +35,22 @@ func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto string, bi id: p2p.identity, proto: proto, + laddr: bindAddr, peer: peer, - - listener: maListener, } + if err := p2p.Listeners.Lock(listener); err != nil { + return nil, err + } + + maListener, err := manet.Listen(bindAddr) + if err != nil { + p2p.Listeners.Unlock() + return nil, err + } + + listener.listener = maListener + p2p.Listeners.Register(listener) go listener.acceptConns() @@ -109,7 +116,7 @@ func (l *localListener) Protocol() string { } func (l *localListener) ListenAddress() string { - return l.listener.Multiaddr().String() + return l.laddr.String() } func (l *localListener) TargetAddress() string { diff --git a/p2p/remote.go b/p2p/remote.go index 3d32f700e..57f4efe9c 100644 --- a/p2p/remote.go +++ b/p2p/remote.go @@ -22,14 +22,16 @@ type remoteListener struct { // ForwardRemote creates new p2p listener func (p2p *P2P) ForwardRemote(ctx context.Context, proto string, addr ma.Multiaddr) (Listener, error) { - listenerInfo := &remoteListener{ + listener := &remoteListener{ p2p: p2p, proto: proto, addr: addr, } - p2p.Listeners.Register(listenerInfo) + if err := p2p.Listeners.Lock(listener); err != nil { + return nil, err + } p2p.peerHost.SetStreamHandler(protocol.ID(proto), func(remote net.Stream) { local, err := manet.Dial(addr) @@ -38,10 +40,17 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto string, addr ma.Multiad return } + //TODO: review: is there a better way to do this? + peerMa, err := ma.NewMultiaddr("/ipfs/" + remote.Conn().RemotePeer().Pretty()) + if err != nil { + remote.Reset() + return + } + stream := &Stream{ Protocol: proto, - OriginAddr: remote.Conn().RemoteMultiaddr(), + OriginAddr: peerMa, TargetAddr: addr, Local: local, @@ -54,7 +63,9 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto string, addr ma.Multiad stream.startStreaming() }) - return listenerInfo, nil + p2p.Listeners.Register(listener) + + return listener, nil } func (l *remoteListener) Protocol() string { diff --git a/test/sharness/t0180-p2p.sh b/test/sharness/t0180-p2p.sh index a11ab35f1..71b8b5003 100755 --- a/test/sharness/t0180-p2p.sh +++ b/test/sharness/t0180-p2p.sh @@ -20,11 +20,13 @@ test_expect_success 'peer ids' ' PEERID_0=$(iptb get id 0) && PEERID_1=$(iptb get id 1) ' - -test_expect_success "test ports are closed" ' - (! (netstat -ln | grep "LISTEN" | grep ":10101 ")) && - (! (netstat -ln | grep "LISTEN" | grep ":10102 ")) -' +check_test_ports() { + test_expect_success "test ports are closed" ' + (! (netstat -lnp | grep "LISTEN" | grep ":10101 ")) && + (! (netstat -lnp | grep "LISTEN" | grep ":10102 ")) + ' +} +check_test_ports test_expect_success 'fail without config option being enabled' ' test_must_fail ipfsi 0 p2p stream ls @@ -36,51 +38,105 @@ test_expect_success "enable filestore config setting" ' ' test_expect_success 'start p2p listener' ' - ipfsi 0 p2p listener open p2p-test /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log + ipfsi 0 p2p forward p2p-test /ipfs /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log ' -test_expect_success 'Test server to client communications' ' - ma-pipe-unidir --listen --pidFile=listener.pid send /ip4/127.0.0.1/tcp/10101 < test0.bin & +# Server to client communications - test_wait_for_file 30 100ms listener.pid && - kill -0 $(cat listener.pid) && +spawn_sending_server() { + test_expect_success 'S->C Spawn sending server' ' + ma-pipe-unidir --listen --pidFile=listener.pid send /ip4/127.0.0.1/tcp/10101 < test0.bin & - ipfsi 1 p2p stream dial $PEERID_0 p2p-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && - ma-pipe-unidir recv /ip4/127.0.0.1/tcp/10102 > client.out && - test ! -f listener.pid + test_wait_for_file 30 100ms listener.pid && + kill -0 $(cat listener.pid) + ' +} + +test_server_to_client() { + test_expect_success 'S->C Connect and receive data' ' + ma-pipe-unidir recv /ip4/127.0.0.1/tcp/10102 > client.out + ' + + test_expect_success 'S->C Ensure server finished' ' + test ! -f listener.pid + ' + + test_expect_success 'S->C Output looks good' ' + test_cmp client.out test0.bin + ' +} + +spawn_sending_server + +test_expect_success 'S->C Setup client side' ' + ipfsi 1 p2p forward p2p-test /ip4/127.0.0.1/tcp/10102 /ipfs/${PEERID_0} 2>&1 > dialer-stdouterr.log ' -test_expect_success 'Test client to server communications' ' +test_server_to_client + +test_expect_success 'S->C Connect with dead server' ' + ma-pipe-unidir recv /ip4/127.0.0.1/tcp/10102 > client.out +' + +test_expect_success 'S->C Output is empty' ' + test_must_be_empty client.out +' + +spawn_sending_server + +test_server_to_client + +test_expect_success 'S->C Close local listener' ' + ipfsi 1 p2p close -p p2p-test +' + +check_test_ports + +# Client to server communications + +test_expect_success 'C->S Spawn receiving server' ' ma-pipe-unidir --listen --pidFile=listener.pid recv /ip4/127.0.0.1/tcp/10101 > server.out & test_wait_for_file 30 100ms listener.pid && - kill -0 $(cat listener.pid) && + kill -0 $(cat listener.pid) +' - ipfsi 1 p2p stream dial $PEERID_0 p2p-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && - ma-pipe-unidir send /ip4/127.0.0.1/tcp/10102 < test1.bin && +test_expect_success 'C->S Setup client side' ' + ipfsi 1 p2p forward p2p-test /ip4/127.0.0.1/tcp/10102 /ipfs/${PEERID_0} 2>&1 > dialer-stdouterr.log +' + +test_expect_success 'C->S Connect and receive data' ' + ma-pipe-unidir send /ip4/127.0.0.1/tcp/10102 < test1.bin +' + +test_expect_success 'C->S Ensure server finished' ' go-sleep 250ms && test ! -f listener.pid ' -test_expect_success 'server to client output looks good' ' - test_cmp client.out test0.bin -' - -test_expect_success 'client to server output looks good' ' +test_expect_success 'C->S Output looks good' ' test_cmp server.out test1.bin ' -test_expect_success "'ipfs listener p2p ls' succeeds" ' - echo "/ip4/127.0.0.1/tcp/10101 /p2p/p2p-test" > expected && - ipfsi 0 p2p listener ls > actual +test_expect_success 'C->S Close local listener' ' + ipfsi 1 p2p close -p p2p-test ' -test_expect_success "'ipfs p2p listener ls' output looks good" ' +check_test_ports + +# Listing streams + +test_expect_success "'ipfs p2p ls' succeeds" ' + echo "/p2p/p2p-test /ipfs /ip4/127.0.0.1/tcp/10101" > expected && + ipfsi 0 p2p ls > actual +' + +test_expect_success "'ipfs p2p ls' output looks good" ' test_cmp expected actual ' test_expect_success "Cannot re-register app handler" ' - (! ipfsi 0 p2p listener open p2p-test /ip4/127.0.0.1/tcp/10101) + test_must_fail ipfsi 0 p2p forward p2p-test /ipfs /ip4/127.0.0.1/tcp/10101 ' test_expect_success "'ipfs p2p stream ls' output is empty" ' @@ -91,7 +147,7 @@ test_expect_success "'ipfs p2p stream ls' output is empty" ' test_expect_success "Setup: Idle stream" ' ma-pipe-unidir --listen --pidFile=listener.pid recv /ip4/127.0.0.1/tcp/10101 & - ipfsi 1 p2p stream dial $PEERID_0 p2p-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && + ipfsi 1 p2p forward p2p-test /ip4/127.0.0.1/tcp/10102 /ipfs/$PEERID_0 2>&1 > dialer-stdouterr.log && ma-pipe-unidir --pidFile=client.pid recv /ip4/127.0.0.1/tcp/10102 & test_wait_for_file 30 100ms listener.pid && @@ -100,7 +156,7 @@ test_expect_success "Setup: Idle stream" ' ' test_expect_success "'ipfs p2p stream ls' succeeds" ' - echo "2 /p2p/p2p-test /ip4/127.0.0.1/tcp/10101 $PEERID_1" > expected + echo "3 /p2p/p2p-test /ipfs/$PEERID_1 /ip4/127.0.0.1/tcp/10101" > expected ipfsi 0 p2p stream ls > actual ' @@ -109,23 +165,31 @@ test_expect_success "'ipfs p2p stream ls' output looks good" ' ' test_expect_success "'ipfs p2p stream close' closes stream" ' - ipfsi 0 p2p stream close 2 && + ipfsi 0 p2p stream close 3 && ipfsi 0 p2p stream ls > actual && [ ! -f listener.pid ] && [ ! -f client.pid ] && test_must_be_empty actual ' -test_expect_success "'ipfs p2p listener close' closes app handler" ' - ipfsi 0 p2p listener close p2p-test && - ipfsi 0 p2p listener ls > actual && +test_expect_success "'ipfs p2p close' closes remote handler" ' + ipfsi 0 p2p close -p p2p-test && + ipfsi 0 p2p ls > actual && test_must_be_empty actual ' +test_expect_success "'ipfs p2p close' closes local handler" ' + ipfsi 1 p2p close -p p2p-test && + ipfsi 1 p2p ls > actual && + test_must_be_empty actual +' + +check_test_ports + test_expect_success "Setup: Idle stream(2)" ' ma-pipe-unidir --listen --pidFile=listener.pid recv /ip4/127.0.0.1/tcp/10101 & - ipfsi 0 p2p listener open p2p-test2 /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log && - ipfsi 1 p2p stream dial $PEERID_0 p2p-test2 /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && + ipfsi 0 p2p forward p2p-test2 /ipfs /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log && + ipfsi 1 p2p forward p2p-test2 /ip4/127.0.0.1/tcp/10102 /ipfs/$PEERID_0 2>&1 > dialer-stdouterr.log && ma-pipe-unidir --pidFile=client.pid recv /ip4/127.0.0.1/tcp/10102 & test_wait_for_file 30 100ms listener.pid && @@ -134,14 +198,20 @@ test_expect_success "Setup: Idle stream(2)" ' ' test_expect_success "'ipfs p2p stream ls' succeeds(2)" ' - echo "3 /p2p/p2p-test2 /ip4/127.0.0.1/tcp/10101 $PEERID_1" > expected + echo "4 /p2p/p2p-test2 /ipfs/$PEERID_1 /ip4/127.0.0.1/tcp/10101" > expected ipfsi 0 p2p stream ls > actual test_cmp expected actual ' -test_expect_success "'ipfs p2p listener close -a' closes app handlers" ' - ipfsi 0 p2p listener close -a && - ipfsi 0 p2p listener ls > actual && +test_expect_success "'ipfs p2p close -a' closes remote app handlers" ' + ipfsi 0 p2p close -a && + ipfsi 0 p2p ls > actual && + test_must_be_empty actual +' + +test_expect_success "'ipfs p2p close -a' closes local app handlers" ' + ipfsi 1 p2p close -a && + ipfsi 1 p2p ls > actual && test_must_be_empty actual ' @@ -152,13 +222,17 @@ test_expect_success "'ipfs p2p stream close -a' closes streams" ' test_must_be_empty actual ' -test_expect_success "'ipfs p2p listener close' closes app numeric handlers" ' - ipfsi 0 p2p listener open 1234 /ip4/127.0.0.1/tcp/10101 && - ipfsi 0 p2p listener close 1234 && - ipfsi 0 p2p listener ls > actual && +check_test_ports + +test_expect_success "'ipfs p2p close' closes app numeric handlers" ' + ipfsi 0 p2p forward 1234 /ipfs /ip4/127.0.0.1/tcp/10101 && + ipfsi 0 p2p close -p 1234 && + ipfsi 0 p2p ls > actual && test_must_be_empty actual ' +check_test_ports + test_expect_success 'stop iptb' ' iptb stop '