diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index e0ef36bf5..0cc406941 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -48,6 +48,7 @@ const ( unrestrictedApiAccessKwd = "unrestricted-api" writableKwd = "writable" enableFloodSubKwd = "enable-pubsub-experiment" + enableMultiplexKwd = "enable-mplex-experiment" // apiAddrKwd = "address-api" // swarmAddrKwd = "address-swarm" ) @@ -158,6 +159,7 @@ Headers. cmds.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API.").Default(false), cmds.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."), cmds.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."), + cmds.BoolOption(enableMultiplexKwd, "Add the experimental 'go-multiplex' stream muxer to libp2p on construction."), // TODO: add way to override addresses. tricky part: updating the config if also --init. // cmds.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"), @@ -288,6 +290,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) { offline, _, _ := req.Option(offlineKwd).Bool() pubsub, _, _ := req.Option(enableFloodSubKwd).Bool() + mplex, _, _ := req.Option(enableMultiplexKwd).Bool() // Start assembling node config ncfg := &core.BuildCfg{ @@ -296,6 +299,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) { Online: !offline, ExtraOpts: map[string]bool{ "pubsub": pubsub, + "mplex": mplex, }, //TODO(Kubuxu): refactor Online vs Offline by adding Permanent vs Ephemeral } diff --git a/core/builder.go b/core/builder.go index a126a7b34..49a40440b 100644 --- a/core/builder.go +++ b/core/builder.go @@ -197,7 +197,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { if cfg.Online { do := setupDiscoveryOption(rcfg.Discovery) - if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub")); err != nil { + if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("mplex")); err != nil { return err } } else { diff --git a/core/core.go b/core/core.go index da795acd0..663516671 100644 --- a/core/core.go +++ b/core/core.go @@ -14,7 +14,10 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net" + "os" + "strings" "time" bstore "github.com/ipfs/go-ipfs/blocks/blockstore" @@ -44,19 +47,24 @@ import ( mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" b58 "gx/ipfs/QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf/go-base58" + mssmux "gx/ipfs/QmTfjLsou9ic6L4KqCcmbLSZcdiFu8q1v6njKp121pbbXx/go-smux-multistream" ma "gx/ipfs/QmUAQaWbKxGCUTuoQVvvicbQNZ9APF5pDGWyAZSe93AtKH/go-multiaddr" floodsub "gx/ipfs/QmV5jot2GfVXmgvetHExJCa2hprebf3AKjprZtuwaXSr1v/floodsub" addrutil "gx/ipfs/QmVDnc2zvyQm8LhT72n22THcshvH7j3qPMnhvjerQER62T/go-addr-util" + spdy "gx/ipfs/QmWUNsat6Jb19nC5CiJCDXepTkxjdxi3eZqeoB6mrmmaGu/go-smux-spdystream" swarm "gx/ipfs/QmWfxnAiQ5TnnCgiX9ikVUKFNHRgGhbgKdx5DoKPELD7P4/go-libp2p-swarm" + mplex "gx/ipfs/QmXGevGDVTqeKdisBzaxEK4CJZqfxeXiVSWLaXaVWcG5on/go-smux-multiplex" metrics "gx/ipfs/QmY2otvyPM2sTaDsczo7Yuosg98sUMCJ9qx1gpPaAPTS9B/go-libp2p-metrics" u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" routing "gx/ipfs/QmbkGVaN9W6RYJK4Ws5FvMKXKDqdRQ5snhtaa92qP6L8eU/go-libp2p-routing" + yamux "gx/ipfs/Qmbn7RYyWzBVXiUp9jZ1dA4VADHy9DtS7iZLwfhEUQvm3U/go-smux-yamux" discovery "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/discovery" p2pbhost "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/host/basic" rhost "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/host/routed" ping "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/protocol/ping" cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" pstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore" + smux "gx/ipfs/QmeZBgYBHvxMukGK5ojg28BCNLB9SeXqT7XXg6o7r2GbJy/go-stream-muxer" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ic "gx/ipfs/QmfWDLQjGjVe4fr5CoztYW2DYYjRysMJrFe1RCsXLPTf46/go-libp2p-crypto" ) @@ -129,7 +137,7 @@ type Mounts struct { Ipns mount.Mount } -func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub bool) error { +func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, mplex bool) error { if n.PeerHost != nil { // already online. return errors.New("node already online") @@ -159,7 +167,9 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin n.Reporter = metrics.NewBandwidthCounter() } - peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, n.Reporter, addrfilter) + tpt := makeSmuxTransport(mplex) + + peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, n.Reporter, addrfilter, tpt) if err != nil { return err } @@ -207,6 +217,34 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin return n.Bootstrap(DefaultBootstrapConfig) } +func makeSmuxTransport(mplexExp bool) smux.Transport { + mstpt := mssmux.NewBlankTransport() + + ymxtpt := &yamux.Transport{ + AcceptBacklog: 8192, + ConnectionWriteTimeout: time.Second * 10, + KeepAliveInterval: time.Second * 30, + EnableKeepAlive: true, + MaxStreamWindowSize: uint32(1024 * 512), + LogOutput: ioutil.Discard, + } + + mstpt.AddTransport("/yamux/1.0.0", ymxtpt) + + mstpt.AddTransport("/spdy/3.1.0", spdy.Transport) + + if mplexExp { + mstpt.AddTransport("/mplex/6.7.0", mplex.DefaultTransport) + } + + // Allow muxer preference order overriding + if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" { + mstpt.OrderPreference = strings.Fields(prefs) + } + + return mstpt +} + func setupDiscoveryOption(d config.Discovery) DiscoveryOption { if d.MDNS.Enabled { return func(ctx context.Context, h p2phost.Host) (discovery.Service, error) { @@ -616,19 +654,21 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) { return listen, nil } -type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (p2phost.Host, error) +type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, tpt smux.Transport) (p2phost.Host, error) var DefaultHostOption HostOption = constructPeerHost // isolates the complex initialization steps -func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (p2phost.Host, error) { +func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, tpt smux.Transport) (p2phost.Host, error) { // no addresses to begin with. we'll start later. - network, err := swarm.NewNetwork(ctx, nil, id, ps, bwr) + swrm, err := swarm.NewSwarmWithProtector(ctx, nil, id, ps, nil, tpt, bwr) if err != nil { return nil, err } + network := (*swarm.Network)(swrm) + for _, f := range fs { network.Swarm().Filters.AddDialFilter(f) } diff --git a/core/mock/mock.go b/core/mock/mock.go index 2303afd3e..06e98b346 100644 --- a/core/mock/mock.go +++ b/core/mock/mock.go @@ -1,22 +1,23 @@ package coremock import ( + "context" "net" - context "context" - "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore" - syncds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync" - commands "github.com/ipfs/go-ipfs/commands" core "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/repo" config "github.com/ipfs/go-ipfs/repo/config" ds2 "github.com/ipfs/go-ipfs/thirdparty/datastore2" testutil "github.com/ipfs/go-ipfs/thirdparty/testutil" + host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host" + "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore" + syncds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync" metrics "gx/ipfs/QmY2otvyPM2sTaDsczo7Yuosg98sUMCJ9qx1gpPaAPTS9B/go-libp2p-metrics" mocknet "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/net/mock" pstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore" + smux "gx/ipfs/QmeZBgYBHvxMukGK5ojg28BCNLB9SeXqT7XXg6o7r2GbJy/go-stream-muxer" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ) @@ -32,7 +33,7 @@ func NewMockNode() (*core.IpfsNode, error) { } func MockHostOption(mn mocknet.Mocknet) core.HostOption { - return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (host.Host, error) { + return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, _ smux.Transport) (host.Host, error) { return mn.AddPeerWithPeerstore(id, ps) } } diff --git a/package.json b/package.json index 50ff060d0..20e3e5a06 100644 --- a/package.json +++ b/package.json @@ -287,6 +287,12 @@ "hash": "QmXuBJ7DR6k3rmUEKtvVMhwjmXDuJgXXPUt4LQXKBMsU93", "name": "go-os-helper", "version": "0.0.0" + }, + { + "author": "whyrusleeping", + "hash": "QmXGevGDVTqeKdisBzaxEK4CJZqfxeXiVSWLaXaVWcG5on", + "name": "go-smux-multiplex", + "version": "1.1.4" } ], "gxVersion": "0.4.0",