From b84a71de8cf58643faa0ffdaafa8ba4e0da2e2e3 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 7 Mar 2018 22:06:17 -0800 Subject: [PATCH] transport refactor update License: MIT Signed-off-by: Steven Allen --- cmd/ipfs/daemon.go | 7 +- cmd/seccat/seccat.go | 13 ++- cmd/seccat/util.go | 24 ++--- core/builder.go | 19 +++- core/commands/swarm.go | 40 ++++---- core/core.go | 141 ++++++++++---------------- core/corehttp/corehttp.go | 2 +- core/corehttp/metrics_test.go | 6 +- core/mock/mock.go | 7 +- exchange/bitswap/network/ipfs_impl.go | 12 +-- package.json | 6 -- 11 files changed, 125 insertions(+), 152 deletions(-) diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index e62c2f374..2c857f8bd 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -25,7 +25,6 @@ import ( cmds "gx/ipfs/QmSKYWC84fqkKB54Te5JMcov2MBVzucXaRGxFqByzzCbHe/go-ipfs-cmds" ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" "gx/ipfs/QmX3QZ5jHEPidwUrymXV1iSCSUhdGxj15sm2gP4jKMef7B/client_golang/prometheus" - iconn "gx/ipfs/QmYDNqBAMWVMHKndYR35Sd8PfEVWBiDmpHYkuRJTunJDeJ/go-libp2p-interface-conn" mprome "gx/ipfs/Qma63DWYgaK1snYcNEv1dBfrZGc961V6frGQiVBGc4TU6h/go-metrics-prometheus" "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit" ) @@ -215,7 +214,6 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment if unencrypted { log.Warningf(`Running with --%s: All connections are UNENCRYPTED. You will not be able to connect to regular encrypted networks.`, unencryptTransportKwd) - iconn.EncryptConnections = false } // first, whether user has provided the initialization flag. we may be @@ -292,6 +290,7 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment Repo: repo, Permanent: true, // It is temporary way to signify that node is permanent Online: !offline, + DisableEncryptedConnections: unencrypted, ExtraOpts: map[string]bool{ "pubsub": pubsub, "ipnsps": ipnsps, @@ -474,7 +473,7 @@ func serveHTTPApi(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, error errc := make(chan error) go func() { - errc <- corehttp.Serve(node, apiLis.NetListener(), opts...) + errc <- corehttp.Serve(node, manet.NetListener(apiLis), opts...) close(errc) }() return errc, nil @@ -561,7 +560,7 @@ func serveHTTPGateway(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, e errc := make(chan error) go func() { - errc <- corehttp.Serve(node, gwLis.NetListener(), opts...) + errc <- corehttp.Serve(node, manet.NetListener(gwLis), opts...) close(errc) }() return errc, nil diff --git a/cmd/seccat/seccat.go b/cmd/seccat/seccat.go index f625ec1a3..2ef9eb3b1 100644 --- a/cmd/seccat/seccat.go +++ b/cmd/seccat/seccat.go @@ -152,17 +152,20 @@ func connect(args args) error { } // log everything that goes through conn - rwc := &logRW{n: "conn", rw: conn} + rwc := &logConn{n: "conn", Conn: conn} // OK, let's setup the channel. sk := ps.PrivKey(p) - sg := secio.SessionGenerator{LocalID: p, PrivateKey: sk} - sess, err := sg.NewSession(context.TODO(), rwc) + sg, err := secio.New(sk) if err != nil { return err } - out("remote peer id: %s", sess.RemotePeer()) - netcat(sess.ReadWriter().(io.ReadWriteCloser)) + sconn, err := sg.SecureInbound(context.TODO(), rwc) + if err != nil { + return err + } + out("remote peer id: %s", sconn.RemotePeer()) + netcat(sconn) return nil } diff --git a/cmd/seccat/util.go b/cmd/seccat/util.go index b4b86bd70..981c1f725 100644 --- a/cmd/seccat/util.go +++ b/cmd/seccat/util.go @@ -2,7 +2,7 @@ package main import ( "fmt" - "io" + "net" "os" logging "gx/ipfs/QmTG23dvpBCBjqQwyDxV8CQT6jmS4PSftNr1VqHhE3MLy7/go-log" @@ -24,28 +24,24 @@ func out(format string, vals ...interface{}) { } } -type logRW struct { - n string - rw io.ReadWriter +type logConn struct { + net.Conn + n string } -func (r *logRW) Read(buf []byte) (int, error) { - n, err := r.rw.Read(buf) +func (r *logConn) Read(buf []byte) (int, error) { + n, err := r.Conn.Read(buf) if n > 0 { log.Debugf("%s read: %v", r.n, buf) } return n, err } -func (r *logRW) Write(buf []byte) (int, error) { +func (r *logConn) Write(buf []byte) (int, error) { log.Debugf("%s write: %v", r.n, buf) - return r.rw.Write(buf) + return r.Conn.Write(buf) } -func (r *logRW) Close() error { - c, ok := r.rw.(io.Closer) - if ok { - return c.Close() - } - return nil +func (r *logConn) Close() error { + return r.Conn.Close() } diff --git a/core/builder.go b/core/builder.go index 3cb871a47..97a5a836f 100644 --- a/core/builder.go +++ b/core/builder.go @@ -21,7 +21,9 @@ import ( metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface" goprocessctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" + libp2p "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p" offline "gx/ipfs/QmYk9mQ4iByLLFzZPGWMnjJof3DQ3QneFFR6ZtNAXd8UvS/go-ipfs-exchange-offline" + p2phost "gx/ipfs/QmaSfSMvc1VPZ8JbMponFs4WHvF9FgEruF56opm5E1RgQA/go-libp2p-host" bstore "gx/ipfs/QmayRSLCiM2gWR7Kay8vqu3Yy5mf7yPqocF9ZRgDUPYMcc/go-ipfs-blockstore" peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer" pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore" @@ -42,6 +44,10 @@ type BuildCfg struct { // that will improve performance in long run Permanent bool + // DisableEncryptedConnections disables connection encryption *entirely*. + // DO NOT SET THIS UNLESS YOU'RE TESTING. + DisableEncryptedConnections bool + // If NilRepo is set, a repo backed by a nil datastore will be constructed NilRepo bool @@ -126,6 +132,7 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { if err != nil { return nil, err } + ctx = metrics.CtxScope(ctx, "ipfs") n := &IpfsNode{ @@ -214,9 +221,19 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { bs.HashOnRead(true) } + hostOption := cfg.Host + if cfg.DisableEncryptedConnections { + innerHostOption := hostOption + hostOption = func(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) { + return innerHostOption(ctx, id, ps, append(options, libp2p.NoSecurity)...) + } + log.Warningf(`Your IPFS node has been configured to run WITHOUT ENCRYPTED CONNECTIONS. + You will not be able to connect to any nodes configured to use encrypted connections`) + } + if cfg.Online { do := setupDiscoveryOption(rcfg.Discovery) - if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("ipnsps"), cfg.getOpt("mplex")); err != nil { + if err := n.startOnlineServices(ctx, cfg.Routing, hostOption, do, cfg.getOpt("pubsub"), cfg.getOpt("ipnsps"), cfg.getOpt("mplex")); err != nil { return err } } else { diff --git a/core/commands/swarm.go b/core/commands/swarm.go index ddc93a2f9..ecb8ea34a 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -90,10 +90,13 @@ var swarmPeersCmd = &cmds.Command{ Peer: pid.Pretty(), } - swcon, ok := c.(*swarm.Conn) - if ok { - ci.Muxer = fmt.Sprintf("%T", swcon.StreamConn().Conn()) - } + /* + // FIXME(steb): + swcon, ok := c.(*swarm.Conn) + if ok { + ci.Muxer = fmt.Sprintf("%T", swcon.StreamConn().Conn()) + } + */ if verbose || latency { lat := n.Peerstore.LatencyEWMA(pid) @@ -104,11 +107,7 @@ var swarmPeersCmd = &cmds.Command{ } } if verbose || streams { - strs, err := c.GetStreams() - if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } + strs := c.GetStreams() for _, s := range strs { ci.Streams = append(ci.Streams, streamInfo{Protocol: string(s.Protocol())}) @@ -384,14 +383,13 @@ ipfs swarm connect /ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3 return } - snet, ok := n.PeerHost.Network().(*swarm.Network) + // FIXME(steb): Nasty + swrm, ok := n.PeerHost.Network().(*swarm.Swarm) if !ok { res.SetError(fmt.Errorf("peerhost network was not swarm"), cmdkit.ErrNormal) return } - swrm := snet.Swarm() - pis, err := peersWithAddresses(addrs) if err != nil { res.SetError(err, cmdkit.ErrNormal) @@ -574,14 +572,15 @@ Filters default to those specified under the "Swarm.AddrFilters" config key. return } - snet, ok := n.PeerHost.Network().(*swarm.Network) + // FIXME(steb) + swrm, ok := n.PeerHost.Network().(*swarm.Swarm) if !ok { res.SetError(errors.New("failed to cast network to swarm network"), cmdkit.ErrNormal) return } var output []string - for _, f := range snet.Filters.Filters() { + for _, f := range swrm.Filters.Filters() { s, err := mafilter.ConvertIPNet(f) if err != nil { res.SetError(err, cmdkit.ErrNormal) @@ -621,7 +620,8 @@ add your filters to the ipfs config file. return } - snet, ok := n.PeerHost.Network().(*swarm.Network) + // FIXME(steb) + swrm, ok := n.PeerHost.Network().(*swarm.Swarm) if !ok { res.SetError(errors.New("failed to cast network to swarm network"), cmdkit.ErrNormal) return @@ -651,7 +651,7 @@ add your filters to the ipfs config file. return } - snet.Filters.AddDialFilter(mask) + swrm.Filters.AddDialFilter(mask) } added, err := filtersAdd(r, cfg, req.Arguments()) @@ -693,7 +693,7 @@ remove your filters from the ipfs config file. return } - snet, ok := n.PeerHost.Network().(*swarm.Network) + swrm, ok := n.PeerHost.Network().(*swarm.Swarm) if !ok { res.SetError(errors.New("failed to cast network to swarm network"), cmdkit.ErrNormal) return @@ -712,9 +712,9 @@ remove your filters from the ipfs config file. } if req.Arguments()[0] == "all" || req.Arguments()[0] == "*" { - fs := snet.Filters.Filters() + fs := swrm.Filters.Filters() for _, f := range fs { - snet.Filters.Remove(f) + swrm.Filters.Remove(f) } removed, err := filtersRemoveAll(r, cfg) @@ -735,7 +735,7 @@ remove your filters from the ipfs config file. return } - snet.Filters.Remove(mask) + swrm.Filters.Remove(mask) } removed, err := filtersRemove(r, cfg, req.Arguments()) diff --git a/core/core.go b/core/core.go index 67b6eaa31..68c971637 100644 --- a/core/core.go +++ b/core/core.go @@ -16,7 +16,6 @@ import ( "fmt" "io" "io/ioutil" - "net" "os" "strings" "time" @@ -41,18 +40,16 @@ import ( u "gx/ipfs/QmNiJuT8Ja3hMVpBHXv3Q6dwmperaQ6JjLtpMQgMCD7xvx/go-ipfs-util" circuit "gx/ipfs/QmR5sXZi68rm9m2E3KiXj6hE5m3GeLaDjbLPUeV6W3MLR8/go-libp2p-circuit" floodsub "gx/ipfs/QmRMgHdiLHJvySrXbtLBehr1W1yTQyuNmZG8HghG54ZPDz/go-libp2p-floodsub" - swarm "gx/ipfs/QmRpKdg1xs4Yyrn9yrVYRBp7AQqyRxMLpD6Jgp1eZAGqEr/go-libp2p-swarm" goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" pnet "gx/ipfs/QmSGoP33Ufev1UDsUuHco8rfhVTzxfq6smXhwhN16c5CWd/go-libp2p-pnet" mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter" logging "gx/ipfs/QmTG23dvpBCBjqQwyDxV8CQT6jmS4PSftNr1VqHhE3MLy7/go-log" - addrutil "gx/ipfs/QmTGSre9j1otFgsr1opCUQDXTPSM6BTZnMWwPeA5nYJM7w/go-addr-util" record "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record" routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing" metrics "gx/ipfs/QmVvu4bS5QLfS19ePkp5Wgzn2ZUma5oXTT9BgDFyQLxUZF/go-libp2p-metrics" psrouter "gx/ipfs/QmWKLW1C2jmGAEzX8jNpCTii6n2ScGxytnoRMRdNTK5Knt/go-libp2p-pubsub-router" ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" - mssmux "gx/ipfs/QmWzjXAyBTygw6CeCTUnhJzhFucfxY5FJivSoiGuiSbPjS/go-smux-multistream" + libp2p "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p" discovery "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p/p2p/discovery" p2pbhost "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p/p2p/host/basic" rhost "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p/p2p/host/routed" @@ -68,7 +65,6 @@ import ( peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer" cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" yamux "gx/ipfs/QmcsgrV3nCAKjiHKZhKVXWc4oY3WBECJCqahXEMpHeMrev/go-smux-yamux" - ipnet "gx/ipfs/Qmd3oYWVLCVWryDV6Pobv6whZcvDXAHqS3chemZ658y4a8/go-libp2p-interface-pnet" exchange "gx/ipfs/QmdcAXgEHUueP4A7b5hjabKn2EooeHgMreMvFC249dGCgc/go-ipfs-exchange-interface" pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore" ic "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto" @@ -173,30 +169,29 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin if err != nil { return err } - var addrfilter []*net.IPNet + + var libp2pOpts []libp2p.Option for _, s := range cfg.Swarm.AddrFilters { f, err := mamask.NewMask(s) if err != nil { return fmt.Errorf("incorrectly formatted address filter in config: %s", s) } - addrfilter = append(addrfilter, f) + libp2pOpts = append(libp2pOpts, libp2p.FilterAddresses(f)) } if !cfg.Swarm.DisableBandwidthMetrics { // Set reporter n.Reporter = metrics.NewBandwidthCounter() + libp2pOpts = append(libp2pOpts, libp2p.BandwidthReporter(n.Reporter)) } - tpt := makeSmuxTransport(mplex) - swarmkey, err := n.Repo.SwarmKey() if err != nil { return err } - var protec ipnet.Protector if swarmkey != nil { - protec, err = pnet.NewProtector(bytes.NewReader(swarmkey)) + protec, err := pnet.NewProtector(bytes.NewReader(swarmkey)) if err != nil { return fmt.Errorf("failed to configure private network: %s", err) } @@ -219,27 +214,39 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin } } }() + + libp2pOpts = append(libp2pOpts, libp2p.PrivateNetwork(protec)) } addrsFactory, err := makeAddrsFactory(cfg.Addresses) if err != nil { return err } + if !cfg.Swarm.DisableRelay { + addrsFactory = composeAddrsFactory(addrsFactory, filterRelayAddrs) + } + libp2pOpts = append(libp2pOpts, libp2p.AddrsFactory(addrsFactory)) - connmgr, err := constructConnMgr(cfg.Swarm.ConnMgr) + connm, err := constructConnMgr(cfg.Swarm.ConnMgr) if err != nil { return err } + libp2pOpts = append(libp2pOpts, libp2p.ConnectionManager(connm)) - hostopts := &ConstructPeerHostOpts{ - AddrsFactory: addrsFactory, - DisableNatPortMap: cfg.Swarm.DisableNatPortMap, - DisableRelay: cfg.Swarm.DisableRelay, - EnableRelayHop: cfg.Swarm.EnableRelayHop, - ConnectionManager: connmgr, + libp2pOpts = append(libp2pOpts, makeSmuxTransportOption(mplex)) + + if !cfg.Swarm.DisableNatPortMap { + libp2pOpts = append(libp2pOpts, libp2p.NATPortMap()) } - peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, n.Reporter, - addrfilter, tpt, protec, hostopts) + if !cfg.Swarm.DisableRelay { + var opts []circuit.RelayOpt + if cfg.Swarm.EnableRelayHop { + opts = append(opts, circuit.OptHop) + } + libp2pOpts = append(libp2pOpts, libp2p.EnableRelay(opts...)) + } + + peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, libp2pOpts...) if err != nil { return err @@ -372,8 +379,9 @@ func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) { }, nil } -func makeSmuxTransport(mplexExp bool) smux.Transport { - mstpt := mssmux.NewBlankTransport() +func makeSmuxTransportOption(mplexExp bool) libp2p.Option { + const yamuxID = "/yamux/1.0.0" + const mplexID = "/mplex/6.7.0" ymxtpt := &yamux.Transport{ AcceptBacklog: 512, @@ -388,18 +396,29 @@ func makeSmuxTransport(mplexExp bool) smux.Transport { ymxtpt.LogOutput = os.Stderr } - mstpt.AddTransport("/yamux/1.0.0", ymxtpt) - + muxers := map[string]smux.Transport{yamuxID: ymxtpt} if mplexExp { - mstpt.AddTransport("/mplex/6.7.0", mplex.DefaultTransport) + muxers[mplexID] = mplex.DefaultTransport } // Allow muxer preference order overriding + order := []string{yamuxID, mplexID} if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" { - mstpt.OrderPreference = strings.Fields(prefs) + order = strings.Fields(prefs) } - return mstpt + opts := make([]libp2p.Option, 0, len(order)) + for _, id := range order { + tpt, ok := muxers[id] + if !ok { + log.Warning("unknown or duplicate muxer in LIBP2P_MUX_PREFS: %s", id) + continue + } + delete(muxers, id) + opts = append(opts, libp2p.Muxer(id, tpt)) + } + + return libp2p.ChainOptions(opts...) } func setupDiscoveryOption(d config.Discovery) DiscoveryOption { @@ -853,62 +872,18 @@ type ConstructPeerHostOpts struct { ConnectionManager ifconnmgr.ConnManager } -type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, tpt smux.Transport, protc ipnet.Protector, opts *ConstructPeerHostOpts) (p2phost.Host, error) +type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) var DefaultHostOption HostOption = constructPeerHost // isolates the complex initialization steps -func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, tpt smux.Transport, protec ipnet.Protector, opts *ConstructPeerHostOpts) (p2phost.Host, error) { - - // no addresses to begin with. we'll start later. - swrm, err := swarm.NewSwarmWithProtector(ctx, nil, id, ps, protec, tpt, bwr) - if err != nil { - return nil, err +func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) { + pkey := ps.PrivKey(id) + if pkey == nil { + return nil, fmt.Errorf("missing private key for node ID: %s", id.Pretty()) } - - network := (*swarm.Network)(swrm) - - for _, f := range fs { - network.Swarm().Filters.AddDialFilter(f) - } - - hostOpts := []interface{}{bwr} - if !opts.DisableNatPortMap { - hostOpts = append(hostOpts, p2pbhost.NATPortMap) - } - if opts.ConnectionManager != nil { - hostOpts = append(hostOpts, opts.ConnectionManager) - } - - addrsFactory := opts.AddrsFactory - if !opts.DisableRelay { - if addrsFactory != nil { - addrsFactory = composeAddrsFactory(addrsFactory, filterRelayAddrs) - } else { - addrsFactory = filterRelayAddrs - } - } - - if addrsFactory != nil { - hostOpts = append(hostOpts, addrsFactory) - } - - host := p2pbhost.New(network, hostOpts...) - - if !opts.DisableRelay { - var relayOpts []circuit.RelayOpt - if opts.EnableRelayHop { - relayOpts = append(relayOpts, circuit.OptHop) - } - - err := circuit.AddRelayTransport(ctx, host, relayOpts...) - if err != nil { - host.Close() - return nil, err - } - } - - return host, nil + options = append([]libp2p.Option{libp2p.Identity(pkey), libp2p.Peerstore(ps)}, options...) + return libp2p.New(ctx, options...) } func filterRelayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { @@ -936,16 +911,8 @@ func startListening(host p2phost.Host, cfg *config.Config) error { return err } - // make sure we error out if our config does not have addresses we can use - log.Debugf("Config.Addresses.Swarm:%s", listenAddrs) - filteredAddrs := addrutil.FilterUsableAddrs(listenAddrs) - log.Debugf("Config.Addresses.Swarm:%s (filtered)", filteredAddrs) - if len(filteredAddrs) < 1 { - return fmt.Errorf("addresses in config not usable: %s", listenAddrs) - } - // Actually start listening: - if err := host.Network().Listen(filteredAddrs...); err != nil { + if err := host.Network().Listen(listenAddrs...); err != nil { return err } diff --git a/core/corehttp/corehttp.go b/core/corehttp/corehttp.go index 1b88f39b4..34ce9651c 100644 --- a/core/corehttp/corehttp.go +++ b/core/corehttp/corehttp.go @@ -61,7 +61,7 @@ func ListenAndServe(n *core.IpfsNode, listeningMultiAddr string, options ...Serv addr = list.Multiaddr() fmt.Printf("API server listening on %s\n", addr) - return Serve(n, list.NetListener(), options...) + return Serve(n, manet.NetListener(list), options...) } func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error { diff --git a/core/corehttp/metrics_test.go b/core/corehttp/metrics_test.go index 997506e5e..ded143888 100644 --- a/core/corehttp/metrics_test.go +++ b/core/corehttp/metrics_test.go @@ -7,9 +7,9 @@ import ( core "github.com/ipfs/go-ipfs/core" + swarmt "gx/ipfs/QmRpKdg1xs4Yyrn9yrVYRBp7AQqyRxMLpD6Jgp1eZAGqEr/go-libp2p-swarm/testing" inet "gx/ipfs/QmXoz9o2PT3tEzf7hicegwex5UgVP54n3k82K7jrWFyN86/go-libp2p-net" bhost "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p/p2p/host/basic" - testutil "gx/ipfs/Qma2UuHusnaFV24DgeZ5hyrM9uc4UdyVaZbtn2FQsPRhES/go-libp2p-netutil" ) // This test is based on go-libp2p/p2p/net/swarm.TestConnectednessCorrect @@ -20,11 +20,11 @@ func TestPeersTotal(t *testing.T) { hosts := make([]*bhost.BasicHost, 4) for i := 0; i < 4; i++ { - hosts[i] = bhost.New(testutil.GenSwarmNetwork(t, ctx)) + hosts[i] = bhost.New(swarmt.GenSwarm(t, ctx)) } dial := func(a, b inet.Network) { - testutil.DivulgeAddresses(b, a) + swarmt.DivulgeAddresses(b, a) if _, err := a.DialPeer(ctx, b.LocalPeer()); err != nil { t.Fatalf("Failed to dial: %s", err) } diff --git a/core/mock/mock.go b/core/mock/mock.go index 96e1480e1..69f2b85b1 100644 --- a/core/mock/mock.go +++ b/core/mock/mock.go @@ -2,7 +2,6 @@ package coremock import ( "context" - "net" commands "github.com/ipfs/go-ipfs/commands" core "github.com/ipfs/go-ipfs/core" @@ -10,12 +9,10 @@ import ( config "github.com/ipfs/go-ipfs/repo/config" testutil "gx/ipfs/QmUJzxQQ2kzwQubsMqBTr1NGDpLfh7pGA2E1oaJULcKDPq/go-testutil" - metrics "gx/ipfs/QmVvu4bS5QLfS19ePkp5Wgzn2ZUma5oXTT9BgDFyQLxUZF/go-libp2p-metrics" + libp2p "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p" mocknet "gx/ipfs/QmY6iAoG9DVgZwh5ZRcQEpa2uErAe1Hbei8qXPCjpDS9Ge/go-libp2p/p2p/net/mock" - smux "gx/ipfs/QmY9JXR3FupnYAYJWK9aMr9bCpqWKcToQ1tz8DVGTrHpHw/go-stream-muxer" host "gx/ipfs/QmaSfSMvc1VPZ8JbMponFs4WHvF9FgEruF56opm5E1RgQA/go-libp2p-host" peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer" - ipnet "gx/ipfs/Qmd3oYWVLCVWryDV6Pobv6whZcvDXAHqS3chemZ658y4a8/go-libp2p-interface-pnet" pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore" datastore "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore" syncds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/sync" @@ -33,7 +30,7 @@ func NewMockNode() (*core.IpfsNode, error) { } func MockHostOption(mn mocknet.Mocknet) core.HostOption { - return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, _ smux.Transport, _ ipnet.Protector, _ *core.ConstructPeerHostOpts) (host.Host, error) { + return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, _ ...libp2p.Option) (host.Host, error) { return mn.AddPeerWithPeerstore(id, ps) } } diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index e2a0612a7..9388a65f4 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -54,7 +54,7 @@ type streamMessageSender struct { } func (s *streamMessageSender) Close() error { - return s.s.Close() + return inet.FullClose(s.s) } func (s *streamMessageSender) Reset() error { @@ -119,13 +119,13 @@ func (bsnet *impl) SendMessage( return err } - err = msgToStream(ctx, s, outgoing) - if err != nil { + if err = msgToStream(ctx, s, outgoing); err != nil { s.Reset() - } else { - s.Close() + return err } - return err + // Yes, return this error. We have no reason to believe that the block + // was actually *sent* unless we see the EOF. + return inet.FullClose(s) } func (bsnet *impl) SetDelegate(r Receiver) { diff --git a/package.json b/package.json index 823298f90..babb13f73 100644 --- a/package.json +++ b/package.json @@ -311,12 +311,6 @@ "name": "go-multibase", "version": "0.2.6" }, - { - "author": "whyrusleeping", - "hash": "QmYDNqBAMWVMHKndYR35Sd8PfEVWBiDmpHYkuRJTunJDeJ", - "name": "go-libp2p-interface-conn", - "version": "0.4.13" - }, { "author": "multiformats", "hash": "QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb",