coreapi swarm: rewire connect/disconnect

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera 2018-09-17 16:45:59 +02:00
parent 30d42f4550
commit df9f101892
3 changed files with 47 additions and 92 deletions

View File

@ -368,23 +368,13 @@ ipfs swarm connect /ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3
cmdkit.StringArg("address", true, true, "Address of peer to connect to.").EnableStdin(),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}
addrs := req.Arguments
if n.PeerHost == nil {
return err
}
// FIXME(steb): Nasty
swrm, ok := n.PeerHost.Network().(*swarm.Swarm)
if !ok {
return fmt.Errorf("peerhost network was not swarm")
}
pis, err := peersWithAddresses(addrs)
if err != nil {
return err
@ -392,18 +382,16 @@ ipfs swarm connect /ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3
output := make([]string, len(pis))
for i, pi := range pis {
swrm.Backoff().Clear(pi.ID)
output[i] = "connect " + pi.ID.Pretty()
err := n.PeerHost.Connect(req.Context, pi)
err := api.Swarm().Connect(req.Context, pi)
if err != nil {
return fmt.Errorf("%s failure: %s", output[i], err)
}
output[i] += " success"
}
return cmds.EmitOnce(res, &stringList{addrs})
return cmds.EmitOnce(res, &stringList{output})
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeEncoder(stringListEncoder),
@ -428,57 +416,24 @@ it will reconnect.
cmdkit.StringArg("address", true, true, "Address of peer to disconnect from.").EnableStdin(),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}
addrs := req.Arguments
if n.PeerHost == nil {
return ErrNotOnline
}
iaddrs, err := parseAddresses(addrs)
iaddrs, err := parseAddresses(req.Arguments)
if err != nil {
return err
}
output := make([]string, len(iaddrs))
for i, addr := range iaddrs {
taddr := addr.Transport()
id := addr.ID()
output[i] = "disconnect " + id.Pretty()
output[i] = "disconnect " + addr.ID().Pretty()
net := n.PeerHost.Network()
if taddr == nil {
if net.Connectedness(id) != inet.Connected {
output[i] += " failure: not connected"
} else if err := net.ClosePeer(id); err != nil {
output[i] += " failure: " + err.Error()
} else {
output[i] += " success"
}
if err := api.Swarm().Disconnect(req.Context, addr.Multiaddr()); err != nil {
output[i] += " failure: " + err.Error()
} else {
found := false
for _, conn := range net.ConnsToPeer(id) {
if !conn.RemoteMultiaddr().Equal(taddr) {
continue
}
if err := conn.Close(); err != nil {
output[i] += " failure: " + err.Error()
} else {
output[i] += " success"
}
found = true
break
}
if !found {
output[i] += " failure: conn not found"
}
output[i] += " success"
}
}
return cmds.EmitOnce(res, &stringList{output})

View File

@ -2,14 +2,22 @@ package iface
import (
"context"
"errors"
"time"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer"
"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
"gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer"
pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore"
)
// PeerInfo contains information about a peer
type PeerInfo interface {
var (
ErrNotConnected = errors.New("not connected")
ErrConnNotFound = errors.New("conn not found")
)
// ConnectionInfo contains information about a peer
type ConnectionInfo interface {
// ID returns PeerID
ID() peer.ID
@ -20,18 +28,17 @@ type PeerInfo interface {
Latency(context.Context) (time.Duration, error)
// Streams returns list of streams established with the peer
// TODO: should this return multicodecs?
Streams(context.Context) ([]string, error)
Streams(context.Context) ([]protocol.ID, error)
}
// SwarmAPI specifies the interface to libp2p swarm
type SwarmAPI interface {
// Connect to a given address
Connect(context.Context, ma.Multiaddr) error
// Connect to a given peer
Connect(context.Context, pstore.PeerInfo) error
// Disconnect from a given address
Disconnect(context.Context, ma.Multiaddr) error
// Peers returns the list of peers we are connected to
Peers(context.Context) ([]PeerInfo, error)
Peers(context.Context) ([]ConnectionInfo, error)
}

View File

@ -2,7 +2,6 @@ package coreapi
import (
"context"
"errors"
"fmt"
"time"
@ -11,8 +10,10 @@ import (
swarm "gx/ipfs/QmPQoCVRHaGD25VffyB7DFV5qP65hFSQJdSDy75P1vYBKe/go-libp2p-swarm"
iaddr "gx/ipfs/QmSzdvo9aPzLj4HXWTcgGAp8N84tZc8LbLmFZFwUb1dpWk/go-ipfs-addr"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer"
pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore"
inet "gx/ipfs/QmfDPh144WGBqRxZb1TGDHerbMnZATrHZggAPw7putNnBq/go-libp2p-net"
net "gx/ipfs/QmfDPh144WGBqRxZb1TGDHerbMnZATrHZggAPw7putNnBq/go-libp2p-net"
)
@ -29,7 +30,7 @@ type connInfo struct {
muxer string
}
func (api *SwarmAPI) Connect(ctx context.Context, addr ma.Multiaddr) error {
func (api *SwarmAPI) Connect(ctx context.Context, pi pstore.PeerInfo) error {
if api.node.PeerHost == nil {
return coreiface.ErrOffline
}
@ -39,16 +40,6 @@ func (api *SwarmAPI) Connect(ctx context.Context, addr ma.Multiaddr) error {
return fmt.Errorf("peerhost network was not swarm")
}
ia, err := iaddr.ParseMultiaddr(ma.Multiaddr(addr))
if err != nil {
return err
}
pi := pstore.PeerInfo{
ID: ia.ID(),
Addrs: []ma.Multiaddr{ia.Transport()},
}
swrm.Backoff().Clear(pi.ID)
return api.node.PeerHost.Connect(ctx, pi)
@ -65,36 +56,38 @@ func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error {
}
taddr := ia.Transport()
id := ia.ID()
net := api.node.PeerHost.Network()
found := false
conns := api.node.PeerHost.Network().ConnsToPeer(ia.ID())
for _, conn := range conns {
if !conn.RemoteMultiaddr().Equal(taddr) {
continue
}
if err := conn.Close(); err != nil {
if taddr == nil {
if net.Connectedness(id) != inet.Connected {
return coreiface.ErrNotConnected
} else if err := net.ClosePeer(id); err != nil {
return err
}
found = true
break
}
} else {
for _, conn := range net.ConnsToPeer(id) {
if !conn.RemoteMultiaddr().Equal(taddr) {
continue
}
if !found {
return errors.New("conn not found")
return conn.Close()
}
return coreiface.ErrConnNotFound
}
return nil
}
func (api *SwarmAPI) Peers(context.Context) ([]coreiface.PeerInfo, error) {
func (api *SwarmAPI) Peers(context.Context) ([]coreiface.ConnectionInfo, error) {
if api.node.PeerHost == nil {
return nil, coreiface.ErrOffline
}
conns := api.node.PeerHost.Network().Conns()
var out []coreiface.PeerInfo
var out []coreiface.ConnectionInfo
for _, c := range conns {
pid := c.RemotePeer()
addr := c.RemoteMultiaddr()
@ -133,12 +126,12 @@ func (ci *connInfo) Latency(context.Context) (time.Duration, error) {
return ci.api.node.Peerstore.LatencyEWMA(peer.ID(ci.ID())), nil
}
func (ci *connInfo) Streams(context.Context) ([]string, error) {
func (ci *connInfo) Streams(context.Context) ([]protocol.ID, error) {
streams := ci.conn.GetStreams()
out := make([]string, len(streams))
out := make([]protocol.ID, len(streams))
for i, s := range streams {
out[i] = string(s.Protocol())
out[i] = s.Protocol()
}
return out, nil