transport refactor update

License: MIT
Signed-off-by: Steven Allen <steven@stebalien.com>
This commit is contained in:
Steven Allen 2018-03-07 22:06:17 -08:00
parent e094a84fe0
commit b84a71de8c
11 changed files with 125 additions and 152 deletions

View File

@ -25,7 +25,6 @@ import (
cmds "gx/ipfs/QmSKYWC84fqkKB54Te5JMcov2MBVzucXaRGxFqByzzCbHe/go-ipfs-cmds"
ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
"gx/ipfs/QmX3QZ5jHEPidwUrymXV1iSCSUhdGxj15sm2gP4jKMef7B/client_golang/prometheus"
iconn "gx/ipfs/QmYDNqBAMWVMHKndYR35Sd8PfEVWBiDmpHYkuRJTunJDeJ/go-libp2p-interface-conn"
mprome "gx/ipfs/Qma63DWYgaK1snYcNEv1dBfrZGc961V6frGQiVBGc4TU6h/go-metrics-prometheus"
"gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit"
)
@ -215,7 +214,6 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
if unencrypted {
log.Warningf(`Running with --%s: All connections are UNENCRYPTED.
You will not be able to connect to regular encrypted networks.`, unencryptTransportKwd)
iconn.EncryptConnections = false
}
// first, whether user has provided the initialization flag. we may be
@ -292,6 +290,7 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
Repo: repo,
Permanent: true, // It is temporary way to signify that node is permanent
Online: !offline,
DisableEncryptedConnections: unencrypted,
ExtraOpts: map[string]bool{
"pubsub": pubsub,
"ipnsps": ipnsps,
@ -474,7 +473,7 @@ func serveHTTPApi(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, error
errc := make(chan error)
go func() {
errc <- corehttp.Serve(node, apiLis.NetListener(), opts...)
errc <- corehttp.Serve(node, manet.NetListener(apiLis), opts...)
close(errc)
}()
return errc, nil
@ -561,7 +560,7 @@ func serveHTTPGateway(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, e
errc := make(chan error)
go func() {
errc <- corehttp.Serve(node, gwLis.NetListener(), opts...)
errc <- corehttp.Serve(node, manet.NetListener(gwLis), opts...)
close(errc)
}()
return errc, nil

View File

@ -152,17 +152,20 @@ func connect(args args) error {
}
// log everything that goes through conn
rwc := &logRW{n: "conn", rw: conn}
rwc := &logConn{n: "conn", Conn: conn}
// OK, let's setup the channel.
sk := ps.PrivKey(p)
sg := secio.SessionGenerator{LocalID: p, PrivateKey: sk}
sess, err := sg.NewSession(context.TODO(), rwc)
sg, err := secio.New(sk)
if err != nil {
return err
}
out("remote peer id: %s", sess.RemotePeer())
netcat(sess.ReadWriter().(io.ReadWriteCloser))
sconn, err := sg.SecureInbound(context.TODO(), rwc)
if err != nil {
return err
}
out("remote peer id: %s", sconn.RemotePeer())
netcat(sconn)
return nil
}

View File

@ -2,7 +2,7 @@ package main
import (
"fmt"
"io"
"net"
"os"
logging "gx/ipfs/QmTG23dvpBCBjqQwyDxV8CQT6jmS4PSftNr1VqHhE3MLy7/go-log"
@ -24,28 +24,24 @@ func out(format string, vals ...interface{}) {
}
}
type logRW struct {
n string
rw io.ReadWriter
type logConn struct {
net.Conn
n string
}
func (r *logRW) Read(buf []byte) (int, error) {
n, err := r.rw.Read(buf)
func (r *logConn) Read(buf []byte) (int, error) {
n, err := r.Conn.Read(buf)
if n > 0 {
log.Debugf("%s read: %v", r.n, buf)
}
return n, err
}
func (r *logRW) Write(buf []byte) (int, error) {
func (r *logConn) Write(buf []byte) (int, error) {
log.Debugf("%s write: %v", r.n, buf)
return r.rw.Write(buf)
return r.Conn.Write(buf)
}
func (r *logRW) Close() error {
c, ok := r.rw.(io.Closer)
if ok {
return c.Close()
}
return nil
func (r *logConn) Close() error {
return r.Conn.Close()
}

View File

@ -21,7 +21,9 @@ import (
metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
goprocessctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
libp2p "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p"
offline "gx/ipfs/QmYk9mQ4iByLLFzZPGWMnjJof3DQ3QneFFR6ZtNAXd8UvS/go-ipfs-exchange-offline"
p2phost "gx/ipfs/QmaSfSMvc1VPZ8JbMponFs4WHvF9FgEruF56opm5E1RgQA/go-libp2p-host"
bstore "gx/ipfs/QmayRSLCiM2gWR7Kay8vqu3Yy5mf7yPqocF9ZRgDUPYMcc/go-ipfs-blockstore"
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore"
@ -42,6 +44,10 @@ type BuildCfg struct {
// that will improve performance in long run
Permanent bool
// DisableEncryptedConnections disables connection encryption *entirely*.
// DO NOT SET THIS UNLESS YOU'RE TESTING.
DisableEncryptedConnections bool
// If NilRepo is set, a repo backed by a nil datastore will be constructed
NilRepo bool
@ -126,6 +132,7 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
if err != nil {
return nil, err
}
ctx = metrics.CtxScope(ctx, "ipfs")
n := &IpfsNode{
@ -214,9 +221,19 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
bs.HashOnRead(true)
}
hostOption := cfg.Host
if cfg.DisableEncryptedConnections {
innerHostOption := hostOption
hostOption = func(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) {
return innerHostOption(ctx, id, ps, append(options, libp2p.NoSecurity)...)
}
log.Warningf(`Your IPFS node has been configured to run WITHOUT ENCRYPTED CONNECTIONS.
You will not be able to connect to any nodes configured to use encrypted connections`)
}
if cfg.Online {
do := setupDiscoveryOption(rcfg.Discovery)
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("ipnsps"), cfg.getOpt("mplex")); err != nil {
if err := n.startOnlineServices(ctx, cfg.Routing, hostOption, do, cfg.getOpt("pubsub"), cfg.getOpt("ipnsps"), cfg.getOpt("mplex")); err != nil {
return err
}
} else {

View File

@ -90,10 +90,13 @@ var swarmPeersCmd = &cmds.Command{
Peer: pid.Pretty(),
}
swcon, ok := c.(*swarm.Conn)
if ok {
ci.Muxer = fmt.Sprintf("%T", swcon.StreamConn().Conn())
}
/*
// FIXME(steb):
swcon, ok := c.(*swarm.Conn)
if ok {
ci.Muxer = fmt.Sprintf("%T", swcon.StreamConn().Conn())
}
*/
if verbose || latency {
lat := n.Peerstore.LatencyEWMA(pid)
@ -104,11 +107,7 @@ var swarmPeersCmd = &cmds.Command{
}
}
if verbose || streams {
strs, err := c.GetStreams()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
strs := c.GetStreams()
for _, s := range strs {
ci.Streams = append(ci.Streams, streamInfo{Protocol: string(s.Protocol())})
@ -384,14 +383,13 @@ ipfs swarm connect /ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3
return
}
snet, ok := n.PeerHost.Network().(*swarm.Network)
// FIXME(steb): Nasty
swrm, ok := n.PeerHost.Network().(*swarm.Swarm)
if !ok {
res.SetError(fmt.Errorf("peerhost network was not swarm"), cmdkit.ErrNormal)
return
}
swrm := snet.Swarm()
pis, err := peersWithAddresses(addrs)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
@ -574,14 +572,15 @@ Filters default to those specified under the "Swarm.AddrFilters" config key.
return
}
snet, ok := n.PeerHost.Network().(*swarm.Network)
// FIXME(steb)
swrm, ok := n.PeerHost.Network().(*swarm.Swarm)
if !ok {
res.SetError(errors.New("failed to cast network to swarm network"), cmdkit.ErrNormal)
return
}
var output []string
for _, f := range snet.Filters.Filters() {
for _, f := range swrm.Filters.Filters() {
s, err := mafilter.ConvertIPNet(f)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
@ -621,7 +620,8 @@ add your filters to the ipfs config file.
return
}
snet, ok := n.PeerHost.Network().(*swarm.Network)
// FIXME(steb)
swrm, ok := n.PeerHost.Network().(*swarm.Swarm)
if !ok {
res.SetError(errors.New("failed to cast network to swarm network"), cmdkit.ErrNormal)
return
@ -651,7 +651,7 @@ add your filters to the ipfs config file.
return
}
snet.Filters.AddDialFilter(mask)
swrm.Filters.AddDialFilter(mask)
}
added, err := filtersAdd(r, cfg, req.Arguments())
@ -693,7 +693,7 @@ remove your filters from the ipfs config file.
return
}
snet, ok := n.PeerHost.Network().(*swarm.Network)
swrm, ok := n.PeerHost.Network().(*swarm.Swarm)
if !ok {
res.SetError(errors.New("failed to cast network to swarm network"), cmdkit.ErrNormal)
return
@ -712,9 +712,9 @@ remove your filters from the ipfs config file.
}
if req.Arguments()[0] == "all" || req.Arguments()[0] == "*" {
fs := snet.Filters.Filters()
fs := swrm.Filters.Filters()
for _, f := range fs {
snet.Filters.Remove(f)
swrm.Filters.Remove(f)
}
removed, err := filtersRemoveAll(r, cfg)
@ -735,7 +735,7 @@ remove your filters from the ipfs config file.
return
}
snet.Filters.Remove(mask)
swrm.Filters.Remove(mask)
}
removed, err := filtersRemove(r, cfg, req.Arguments())

View File

@ -16,7 +16,6 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"os"
"strings"
"time"
@ -41,18 +40,16 @@ import (
u "gx/ipfs/QmNiJuT8Ja3hMVpBHXv3Q6dwmperaQ6JjLtpMQgMCD7xvx/go-ipfs-util"
circuit "gx/ipfs/QmR5sXZi68rm9m2E3KiXj6hE5m3GeLaDjbLPUeV6W3MLR8/go-libp2p-circuit"
floodsub "gx/ipfs/QmRMgHdiLHJvySrXbtLBehr1W1yTQyuNmZG8HghG54ZPDz/go-libp2p-floodsub"
swarm "gx/ipfs/QmRpKdg1xs4Yyrn9yrVYRBp7AQqyRxMLpD6Jgp1eZAGqEr/go-libp2p-swarm"
goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
pnet "gx/ipfs/QmSGoP33Ufev1UDsUuHco8rfhVTzxfq6smXhwhN16c5CWd/go-libp2p-pnet"
mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter"
logging "gx/ipfs/QmTG23dvpBCBjqQwyDxV8CQT6jmS4PSftNr1VqHhE3MLy7/go-log"
addrutil "gx/ipfs/QmTGSre9j1otFgsr1opCUQDXTPSM6BTZnMWwPeA5nYJM7w/go-addr-util"
record "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record"
routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing"
metrics "gx/ipfs/QmVvu4bS5QLfS19ePkp5Wgzn2ZUma5oXTT9BgDFyQLxUZF/go-libp2p-metrics"
psrouter "gx/ipfs/QmWKLW1C2jmGAEzX8jNpCTii6n2ScGxytnoRMRdNTK5Knt/go-libp2p-pubsub-router"
ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
mssmux "gx/ipfs/QmWzjXAyBTygw6CeCTUnhJzhFucfxY5FJivSoiGuiSbPjS/go-smux-multistream"
libp2p "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p"
discovery "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p/p2p/discovery"
p2pbhost "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p/p2p/host/basic"
rhost "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p/p2p/host/routed"
@ -68,7 +65,6 @@ import (
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
yamux "gx/ipfs/QmcsgrV3nCAKjiHKZhKVXWc4oY3WBECJCqahXEMpHeMrev/go-smux-yamux"
ipnet "gx/ipfs/Qmd3oYWVLCVWryDV6Pobv6whZcvDXAHqS3chemZ658y4a8/go-libp2p-interface-pnet"
exchange "gx/ipfs/QmdcAXgEHUueP4A7b5hjabKn2EooeHgMreMvFC249dGCgc/go-ipfs-exchange-interface"
pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore"
ic "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto"
@ -173,30 +169,29 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
if err != nil {
return err
}
var addrfilter []*net.IPNet
var libp2pOpts []libp2p.Option
for _, s := range cfg.Swarm.AddrFilters {
f, err := mamask.NewMask(s)
if err != nil {
return fmt.Errorf("incorrectly formatted address filter in config: %s", s)
}
addrfilter = append(addrfilter, f)
libp2pOpts = append(libp2pOpts, libp2p.FilterAddresses(f))
}
if !cfg.Swarm.DisableBandwidthMetrics {
// Set reporter
n.Reporter = metrics.NewBandwidthCounter()
libp2pOpts = append(libp2pOpts, libp2p.BandwidthReporter(n.Reporter))
}
tpt := makeSmuxTransport(mplex)
swarmkey, err := n.Repo.SwarmKey()
if err != nil {
return err
}
var protec ipnet.Protector
if swarmkey != nil {
protec, err = pnet.NewProtector(bytes.NewReader(swarmkey))
protec, err := pnet.NewProtector(bytes.NewReader(swarmkey))
if err != nil {
return fmt.Errorf("failed to configure private network: %s", err)
}
@ -219,27 +214,39 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
}
}
}()
libp2pOpts = append(libp2pOpts, libp2p.PrivateNetwork(protec))
}
addrsFactory, err := makeAddrsFactory(cfg.Addresses)
if err != nil {
return err
}
if !cfg.Swarm.DisableRelay {
addrsFactory = composeAddrsFactory(addrsFactory, filterRelayAddrs)
}
libp2pOpts = append(libp2pOpts, libp2p.AddrsFactory(addrsFactory))
connmgr, err := constructConnMgr(cfg.Swarm.ConnMgr)
connm, err := constructConnMgr(cfg.Swarm.ConnMgr)
if err != nil {
return err
}
libp2pOpts = append(libp2pOpts, libp2p.ConnectionManager(connm))
hostopts := &ConstructPeerHostOpts{
AddrsFactory: addrsFactory,
DisableNatPortMap: cfg.Swarm.DisableNatPortMap,
DisableRelay: cfg.Swarm.DisableRelay,
EnableRelayHop: cfg.Swarm.EnableRelayHop,
ConnectionManager: connmgr,
libp2pOpts = append(libp2pOpts, makeSmuxTransportOption(mplex))
if !cfg.Swarm.DisableNatPortMap {
libp2pOpts = append(libp2pOpts, libp2p.NATPortMap())
}
peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, n.Reporter,
addrfilter, tpt, protec, hostopts)
if !cfg.Swarm.DisableRelay {
var opts []circuit.RelayOpt
if cfg.Swarm.EnableRelayHop {
opts = append(opts, circuit.OptHop)
}
libp2pOpts = append(libp2pOpts, libp2p.EnableRelay(opts...))
}
peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, libp2pOpts...)
if err != nil {
return err
@ -372,8 +379,9 @@ func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
}, nil
}
func makeSmuxTransport(mplexExp bool) smux.Transport {
mstpt := mssmux.NewBlankTransport()
func makeSmuxTransportOption(mplexExp bool) libp2p.Option {
const yamuxID = "/yamux/1.0.0"
const mplexID = "/mplex/6.7.0"
ymxtpt := &yamux.Transport{
AcceptBacklog: 512,
@ -388,18 +396,29 @@ func makeSmuxTransport(mplexExp bool) smux.Transport {
ymxtpt.LogOutput = os.Stderr
}
mstpt.AddTransport("/yamux/1.0.0", ymxtpt)
muxers := map[string]smux.Transport{yamuxID: ymxtpt}
if mplexExp {
mstpt.AddTransport("/mplex/6.7.0", mplex.DefaultTransport)
muxers[mplexID] = mplex.DefaultTransport
}
// Allow muxer preference order overriding
order := []string{yamuxID, mplexID}
if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" {
mstpt.OrderPreference = strings.Fields(prefs)
order = strings.Fields(prefs)
}
return mstpt
opts := make([]libp2p.Option, 0, len(order))
for _, id := range order {
tpt, ok := muxers[id]
if !ok {
log.Warning("unknown or duplicate muxer in LIBP2P_MUX_PREFS: %s", id)
continue
}
delete(muxers, id)
opts = append(opts, libp2p.Muxer(id, tpt))
}
return libp2p.ChainOptions(opts...)
}
func setupDiscoveryOption(d config.Discovery) DiscoveryOption {
@ -853,62 +872,18 @@ type ConstructPeerHostOpts struct {
ConnectionManager ifconnmgr.ConnManager
}
type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, tpt smux.Transport, protc ipnet.Protector, opts *ConstructPeerHostOpts) (p2phost.Host, error)
type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error)
var DefaultHostOption HostOption = constructPeerHost
// isolates the complex initialization steps
func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, tpt smux.Transport, protec ipnet.Protector, opts *ConstructPeerHostOpts) (p2phost.Host, error) {
// no addresses to begin with. we'll start later.
swrm, err := swarm.NewSwarmWithProtector(ctx, nil, id, ps, protec, tpt, bwr)
if err != nil {
return nil, err
func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) {
pkey := ps.PrivKey(id)
if pkey == nil {
return nil, fmt.Errorf("missing private key for node ID: %s", id.Pretty())
}
network := (*swarm.Network)(swrm)
for _, f := range fs {
network.Swarm().Filters.AddDialFilter(f)
}
hostOpts := []interface{}{bwr}
if !opts.DisableNatPortMap {
hostOpts = append(hostOpts, p2pbhost.NATPortMap)
}
if opts.ConnectionManager != nil {
hostOpts = append(hostOpts, opts.ConnectionManager)
}
addrsFactory := opts.AddrsFactory
if !opts.DisableRelay {
if addrsFactory != nil {
addrsFactory = composeAddrsFactory(addrsFactory, filterRelayAddrs)
} else {
addrsFactory = filterRelayAddrs
}
}
if addrsFactory != nil {
hostOpts = append(hostOpts, addrsFactory)
}
host := p2pbhost.New(network, hostOpts...)
if !opts.DisableRelay {
var relayOpts []circuit.RelayOpt
if opts.EnableRelayHop {
relayOpts = append(relayOpts, circuit.OptHop)
}
err := circuit.AddRelayTransport(ctx, host, relayOpts...)
if err != nil {
host.Close()
return nil, err
}
}
return host, nil
options = append([]libp2p.Option{libp2p.Identity(pkey), libp2p.Peerstore(ps)}, options...)
return libp2p.New(ctx, options...)
}
func filterRelayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
@ -936,16 +911,8 @@ func startListening(host p2phost.Host, cfg *config.Config) error {
return err
}
// make sure we error out if our config does not have addresses we can use
log.Debugf("Config.Addresses.Swarm:%s", listenAddrs)
filteredAddrs := addrutil.FilterUsableAddrs(listenAddrs)
log.Debugf("Config.Addresses.Swarm:%s (filtered)", filteredAddrs)
if len(filteredAddrs) < 1 {
return fmt.Errorf("addresses in config not usable: %s", listenAddrs)
}
// Actually start listening:
if err := host.Network().Listen(filteredAddrs...); err != nil {
if err := host.Network().Listen(listenAddrs...); err != nil {
return err
}

View File

@ -61,7 +61,7 @@ func ListenAndServe(n *core.IpfsNode, listeningMultiAddr string, options ...Serv
addr = list.Multiaddr()
fmt.Printf("API server listening on %s\n", addr)
return Serve(n, list.NetListener(), options...)
return Serve(n, manet.NetListener(list), options...)
}
func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error {

View File

@ -7,9 +7,9 @@ import (
core "github.com/ipfs/go-ipfs/core"
swarmt "gx/ipfs/QmRpKdg1xs4Yyrn9yrVYRBp7AQqyRxMLpD6Jgp1eZAGqEr/go-libp2p-swarm/testing"
inet "gx/ipfs/QmXoz9o2PT3tEzf7hicegwex5UgVP54n3k82K7jrWFyN86/go-libp2p-net"
bhost "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p/p2p/host/basic"
testutil "gx/ipfs/Qma2UuHusnaFV24DgeZ5hyrM9uc4UdyVaZbtn2FQsPRhES/go-libp2p-netutil"
)
// This test is based on go-libp2p/p2p/net/swarm.TestConnectednessCorrect
@ -20,11 +20,11 @@ func TestPeersTotal(t *testing.T) {
hosts := make([]*bhost.BasicHost, 4)
for i := 0; i < 4; i++ {
hosts[i] = bhost.New(testutil.GenSwarmNetwork(t, ctx))
hosts[i] = bhost.New(swarmt.GenSwarm(t, ctx))
}
dial := func(a, b inet.Network) {
testutil.DivulgeAddresses(b, a)
swarmt.DivulgeAddresses(b, a)
if _, err := a.DialPeer(ctx, b.LocalPeer()); err != nil {
t.Fatalf("Failed to dial: %s", err)
}

View File

@ -2,7 +2,6 @@ package coremock
import (
"context"
"net"
commands "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
@ -10,12 +9,10 @@ import (
config "github.com/ipfs/go-ipfs/repo/config"
testutil "gx/ipfs/QmUJzxQQ2kzwQubsMqBTr1NGDpLfh7pGA2E1oaJULcKDPq/go-testutil"
metrics "gx/ipfs/QmVvu4bS5QLfS19ePkp5Wgzn2ZUma5oXTT9BgDFyQLxUZF/go-libp2p-metrics"
libp2p "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p"
mocknet "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p/p2p/net/mock"
smux "gx/ipfs/QmY9JXR3FupnYAYJWK9aMr9bCpqWKcToQ1tz8DVGTrHpHw/go-stream-muxer"
host "gx/ipfs/QmaSfSMvc1VPZ8JbMponFs4WHvF9FgEruF56opm5E1RgQA/go-libp2p-host"
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
ipnet "gx/ipfs/Qmd3oYWVLCVWryDV6Pobv6whZcvDXAHqS3chemZ658y4a8/go-libp2p-interface-pnet"
pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore"
datastore "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
syncds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/sync"
@ -33,7 +30,7 @@ func NewMockNode() (*core.IpfsNode, error) {
}
func MockHostOption(mn mocknet.Mocknet) core.HostOption {
return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, _ smux.Transport, _ ipnet.Protector, _ *core.ConstructPeerHostOpts) (host.Host, error) {
return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, _ ...libp2p.Option) (host.Host, error) {
return mn.AddPeerWithPeerstore(id, ps)
}
}

View File

@ -54,7 +54,7 @@ type streamMessageSender struct {
}
func (s *streamMessageSender) Close() error {
return s.s.Close()
return inet.FullClose(s.s)
}
func (s *streamMessageSender) Reset() error {
@ -119,13 +119,13 @@ func (bsnet *impl) SendMessage(
return err
}
err = msgToStream(ctx, s, outgoing)
if err != nil {
if err = msgToStream(ctx, s, outgoing); err != nil {
s.Reset()
} else {
s.Close()
return err
}
return err
// Yes, return this error. We have no reason to believe that the block
// was actually *sent* unless we see the EOF.
return inet.FullClose(s)
}
func (bsnet *impl) SetDelegate(r Receiver) {

View File

@ -311,12 +311,6 @@
"name": "go-multibase",
"version": "0.2.6"
},
{
"author": "whyrusleeping",
"hash": "QmYDNqBAMWVMHKndYR35Sd8PfEVWBiDmpHYkuRJTunJDeJ",
"name": "go-libp2p-interface-conn",
"version": "0.4.13"
},
{
"author": "multiformats",
"hash": "QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb",