diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index 2aed209ad..6d3d920d2 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -20,6 +20,7 @@ import ( coreapi "github.com/ipfs/go-ipfs/core/coreapi" corehttp "github.com/ipfs/go-ipfs/core/corehttp" corerepo "github.com/ipfs/go-ipfs/core/corerepo" + "github.com/ipfs/go-ipfs/core/node" nodeMount "github.com/ipfs/go-ipfs/fuse/node" fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo" migrate "github.com/ipfs/go-ipfs/repo/fsrepo/migrations" @@ -323,11 +324,11 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment case routingOptionSupernodeKwd: return errors.New("supernode routing was never fully implemented and has been removed") case routingOptionDHTClientKwd: - ncfg.Routing = core.DHTClientOption + ncfg.Routing = node.DHTClientOption case routingOptionDHTKwd: - ncfg.Routing = core.DHTOption + ncfg.Routing = node.DHTOption case routingOptionNoneKwd: - ncfg.Routing = core.NilRouterOption + ncfg.Routing = node.NilRouterOption default: return fmt.Errorf("unrecognized routing option: %s", routingOption) } @@ -372,7 +373,7 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment if err != nil { return err } - node.Process().AddChild(goprocess.WithTeardown(cctx.Plugins.Close)) + node.Process.AddChild(goprocess.WithTeardown(cctx.Plugins.Close)) // construct api endpoint - every time apiErrc, err := serveHTTPApi(req, cctx) diff --git a/core/bootstrap.go b/core/bootstrap/bootstrap.go similarity index 76% rename from core/bootstrap.go rename to core/bootstrap/bootstrap.go index 5ff4f8e14..d7c107690 100644 --- a/core/bootstrap.go +++ b/core/bootstrap/bootstrap.go @@ -1,4 +1,4 @@ -package core +package bootstrap import ( "context" @@ -9,19 +9,21 @@ import ( "sync" "time" - math2 "github.com/ipfs/go-ipfs/thirdparty/math2" - lgbl "github.com/libp2p/go-libp2p-loggables" - config "github.com/ipfs/go-ipfs-config" - goprocess "github.com/jbenet/goprocess" - procctx "github.com/jbenet/goprocess/context" - periodicproc "github.com/jbenet/goprocess/periodic" - host "github.com/libp2p/go-libp2p-host" - inet "github.com/libp2p/go-libp2p-net" - peer "github.com/libp2p/go-libp2p-peer" - pstore "github.com/libp2p/go-libp2p-peerstore" + logging "github.com/ipfs/go-log" + "github.com/jbenet/goprocess" + "github.com/jbenet/goprocess/context" + "github.com/jbenet/goprocess/periodic" + "github.com/libp2p/go-libp2p-host" + "github.com/libp2p/go-libp2p-loggables" + "github.com/libp2p/go-libp2p-net" + "github.com/libp2p/go-libp2p-peer" + "github.com/libp2p/go-libp2p-peerstore" + "github.com/libp2p/go-libp2p-routing" ) +var log = logging.Logger("bootstrap") + // ErrNotEnoughBootstrapPeers signals that we do not have enough bootstrap // peers to bootstrap correctly. var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to bootstrap") @@ -29,7 +31,6 @@ var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to boots // BootstrapConfig specifies parameters used in an IpfsNode's network // bootstrapping process. type BootstrapConfig struct { - // MinPeerThreshold governs whether to bootstrap more connections. If the // node has less open connections than this number, it will open connections // to the bootstrap nodes. From there, the routing system should be able @@ -50,7 +51,7 @@ type BootstrapConfig struct { // BootstrapPeers is a function that returns a set of bootstrap peers // for the bootstrap process to use. This makes it possible for clients // to control the peers the process uses at any moment. - BootstrapPeers func() []pstore.PeerInfo + BootstrapPeers func() []peerstore.PeerInfo } // DefaultBootstrapConfig specifies default sane parameters for bootstrapping. @@ -60,9 +61,9 @@ var DefaultBootstrapConfig = BootstrapConfig{ ConnectionTimeout: (30 * time.Second) / 3, // Perod / 3 } -func BootstrapConfigWithPeers(pis []pstore.PeerInfo) BootstrapConfig { +func BootstrapConfigWithPeers(pis []peerstore.PeerInfo) BootstrapConfig { cfg := DefaultBootstrapConfig - cfg.BootstrapPeers = func() []pstore.PeerInfo { + cfg.BootstrapPeers = func() []peerstore.PeerInfo { return pis } return cfg @@ -72,7 +73,7 @@ func BootstrapConfigWithPeers(pis []pstore.PeerInfo) BootstrapConfig { // check the number of open connections and -- if there are too few -- initiate // connections to well-known bootstrap peers. It also kicks off subsystem // bootstrapping (i.e. routing). -func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) { +func Bootstrap(id peer.ID, host host.Host, rt routing.IpfsRouting, cfg BootstrapConfig) (io.Closer, error) { // make a signal to wait for one bootstrap round to complete. doneWithRound := make(chan struct{}) @@ -85,12 +86,12 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) { // the periodic bootstrap function -- the connection supervisor periodic := func(worker goprocess.Process) { - ctx := procctx.OnClosingContext(worker) - defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done() + ctx := goprocessctx.OnClosingContext(worker) + defer log.EventBegin(ctx, "periodicBootstrap", id).Done() - if err := bootstrapRound(ctx, n.PeerHost, cfg); err != nil { - log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err)) - log.Debugf("%s bootstrap error: %s", n.Identity, err) + if err := bootstrapRound(ctx, host, cfg); err != nil { + log.Event(ctx, "bootstrapError", id, loggables.Error(err)) + log.Debugf("%s bootstrap error: %s", id, err) } <-doneWithRound @@ -101,9 +102,9 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) { proc.Go(periodic) // run one right now. // kick off Routing.Bootstrap - if n.Routing != nil { - ctx := procctx.OnClosingContext(proc) - if err := n.Routing.Bootstrap(ctx); err != nil { + if rt != nil { + ctx := goprocessctx.OnClosingContext(proc) + if err := rt.Bootstrap(ctx); err != nil { proc.Close() return nil, err } @@ -134,9 +135,9 @@ func bootstrapRound(ctx context.Context, host host.Host, cfg BootstrapConfig) er numToDial := cfg.MinPeerThreshold - len(connected) // filter out bootstrap nodes we are already connected to - var notConnected []pstore.PeerInfo + var notConnected []peerstore.PeerInfo for _, p := range peers { - if host.Network().Connectedness(p.ID) != inet.Connected { + if host.Network().Connectedness(p.ID) != net.Connected { notConnected = append(notConnected, p) } } @@ -155,7 +156,7 @@ func bootstrapRound(ctx context.Context, host host.Host, cfg BootstrapConfig) er return bootstrapConnect(ctx, host, randSubset) } -func bootstrapConnect(ctx context.Context, ph host.Host, peers []pstore.PeerInfo) error { +func bootstrapConnect(ctx context.Context, ph host.Host, peers []peerstore.PeerInfo) error { if len(peers) < 1 { return ErrNotEnoughBootstrapPeers } @@ -170,12 +171,12 @@ func bootstrapConnect(ctx context.Context, ph host.Host, peers []pstore.PeerInfo // Also, performed asynchronously for dial speed. wg.Add(1) - go func(p pstore.PeerInfo) { + go func(p peerstore.PeerInfo) { defer wg.Done() defer log.EventBegin(ctx, "bootstrapDial", ph.ID(), p.ID).Done() log.Debugf("%s bootstrapping to %s", ph.ID(), p.ID) - ph.Peerstore().AddAddrs(p.ID, p.Addrs, pstore.PermanentAddrTTL) + ph.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL) if err := ph.Connect(ctx, p); err != nil { log.Event(ctx, "bootstrapDialFailed", p.ID) log.Debugf("failed to bootstrap with %v: %s", p.ID, err) @@ -204,12 +205,26 @@ func bootstrapConnect(ctx context.Context, ph host.Host, peers []pstore.PeerInfo return nil } -func toPeerInfos(bpeers []config.BootstrapPeer) []pstore.PeerInfo { - pinfos := make(map[peer.ID]*pstore.PeerInfo) +func randomSubsetOfPeers(in []peerstore.PeerInfo, max int) []peerstore.PeerInfo { + if max > len(in) { + max = len(in) + } + + out := make([]peerstore.PeerInfo, max) + for i, val := range rand.Perm(len(in))[:max] { + out[i] = in[val] + } + return out +} + +type Peers []config.BootstrapPeer + +func (bpeers Peers) ToPeerInfos() []peerstore.PeerInfo { + pinfos := make(map[peer.ID]*peerstore.PeerInfo) for _, bootstrap := range bpeers { pinfo, ok := pinfos[bootstrap.ID()] if !ok { - pinfo = new(pstore.PeerInfo) + pinfo = new(peerstore.PeerInfo) pinfos[bootstrap.ID()] = pinfo pinfo.ID = bootstrap.ID() } @@ -217,22 +232,10 @@ func toPeerInfos(bpeers []config.BootstrapPeer) []pstore.PeerInfo { pinfo.Addrs = append(pinfo.Addrs, bootstrap.Transport()) } - var peers []pstore.PeerInfo + var peers []peerstore.PeerInfo for _, pinfo := range pinfos { peers = append(peers, *pinfo) } return peers } - -func randomSubsetOfPeers(in []pstore.PeerInfo, max int) []pstore.PeerInfo { - n := math2.IntMin(max, len(in)) - var out []pstore.PeerInfo - for _, val := range rand.Perm(len(in)) { - out = append(out, in[val]) - if len(out) >= n { - break - } - } - return out -} diff --git a/core/bootstrap_test.go b/core/bootstrap/bootstrap_test.go similarity index 95% rename from core/bootstrap_test.go rename to core/bootstrap/bootstrap_test.go index 51e85d8aa..0c7799858 100644 --- a/core/bootstrap_test.go +++ b/core/bootstrap/bootstrap_test.go @@ -1,4 +1,4 @@ -package core +package bootstrap import ( "fmt" @@ -49,7 +49,7 @@ func TestMultipleAddrsPerPeer(t *testing.T) { bsps = append(bsps, bsp1, bsp2) } - pinfos := toPeerInfos(bsps) + pinfos := Peers.ToPeerInfos(bsps) if len(pinfos) != len(bsps)/2 { t.Fatal("expected fewer peers") } diff --git a/core/builder.go b/core/builder.go index 1fdec9944..60cf2cb23 100644 --- a/core/builder.go +++ b/core/builder.go @@ -2,292 +2,53 @@ package core import ( "context" - "crypto/rand" - "encoding/base64" - "errors" - "github.com/ipfs/go-ipfs/provider" - "os" - "syscall" - "time" - filestore "github.com/ipfs/go-ipfs/filestore" - namesys "github.com/ipfs/go-ipfs/namesys" - pin "github.com/ipfs/go-ipfs/pin" - repo "github.com/ipfs/go-ipfs/repo" - cidv0v1 "github.com/ipfs/go-ipfs/thirdparty/cidv0v1" - "github.com/ipfs/go-ipfs/thirdparty/verifbs" + "github.com/ipfs/go-metrics-interface" + "go.uber.org/fx" - bserv "github.com/ipfs/go-blockservice" - ds "github.com/ipfs/go-datastore" - retry "github.com/ipfs/go-datastore/retrystore" - dsync "github.com/ipfs/go-datastore/sync" - bstore "github.com/ipfs/go-ipfs-blockstore" - cfg "github.com/ipfs/go-ipfs-config" - offline "github.com/ipfs/go-ipfs-exchange-offline" - offroute "github.com/ipfs/go-ipfs-routing/offline" - ipns "github.com/ipfs/go-ipns" - dag "github.com/ipfs/go-merkledag" - metrics "github.com/ipfs/go-metrics-interface" - resolver "github.com/ipfs/go-path/resolver" - uio "github.com/ipfs/go-unixfs/io" - goprocessctx "github.com/jbenet/goprocess/context" - libp2p "github.com/libp2p/go-libp2p" - ci "github.com/libp2p/go-libp2p-crypto" - p2phost "github.com/libp2p/go-libp2p-host" - peer "github.com/libp2p/go-libp2p-peer" - pstore "github.com/libp2p/go-libp2p-peerstore" - pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem" - record "github.com/libp2p/go-libp2p-record" + "github.com/ipfs/go-ipfs/core/bootstrap" + "github.com/ipfs/go-ipfs/core/node" ) -type BuildCfg struct { - // If online is set, the node will have networking enabled - Online bool - - // ExtraOpts is a map of extra options used to configure the ipfs nodes creation - ExtraOpts map[string]bool - - // If permanent then node should run more expensive processes - // that will improve performance in long run - Permanent bool - - // DisableEncryptedConnections disables connection encryption *entirely*. - // DO NOT SET THIS UNLESS YOU'RE TESTING. - DisableEncryptedConnections bool - - // If NilRepo is set, a repo backed by a nil datastore will be constructed - NilRepo bool - - Routing RoutingOption - Host HostOption - Repo repo.Repo -} - -func (cfg *BuildCfg) getOpt(key string) bool { - if cfg.ExtraOpts == nil { - return false - } - - return cfg.ExtraOpts[key] -} - -func (cfg *BuildCfg) fillDefaults() error { - if cfg.Repo != nil && cfg.NilRepo { - return errors.New("cannot set a repo and specify nilrepo at the same time") - } - - if cfg.Repo == nil { - var d ds.Datastore - if cfg.NilRepo { - d = ds.NewNullDatastore() - } else { - d = ds.NewMapDatastore() - } - r, err := defaultRepo(dsync.MutexWrap(d)) - if err != nil { - return err - } - cfg.Repo = r - } - - if cfg.Routing == nil { - cfg.Routing = DHTOption - } - - if cfg.Host == nil { - cfg.Host = DefaultHostOption - } - - return nil -} - -func defaultRepo(dstore repo.Datastore) (repo.Repo, error) { - c := cfg.Config{} - priv, pub, err := ci.GenerateKeyPairWithReader(ci.RSA, 1024, rand.Reader) - if err != nil { - return nil, err - } - - pid, err := peer.IDFromPublicKey(pub) - if err != nil { - return nil, err - } - - privkeyb, err := priv.Bytes() - if err != nil { - return nil, err - } - - c.Bootstrap = cfg.DefaultBootstrapAddresses - c.Addresses.Swarm = []string{"/ip4/0.0.0.0/tcp/4001"} - c.Identity.PeerID = pid.Pretty() - c.Identity.PrivKey = base64.StdEncoding.EncodeToString(privkeyb) - - return &repo.Mock{ - D: dstore, - C: c, - }, nil -} +type BuildCfg = node.BuildCfg // Alias for compatibility until we properly refactor the constructor interface // NewNode constructs and returns an IpfsNode using the given cfg. func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { - if cfg == nil { - cfg = new(BuildCfg) - } - - err := cfg.fillDefaults() - if err != nil { - return nil, err - } - ctx = metrics.CtxScope(ctx, "ipfs") n := &IpfsNode{ - IsOnline: cfg.Online, - Repo: cfg.Repo, - ctx: ctx, - Peerstore: pstoremem.NewPeerstore(), + ctx: ctx, } - n.RecordValidator = record.NamespacedValidator{ - "pk": record.PublicKeyValidator{}, - "ipns": ipns.Validator{KeyBook: n.Peerstore}, + app := fx.New( + node.IPFS(ctx, cfg), + + fx.NopLogger, + fx.Extract(n), + ) + + go func() { + // Note that some services use contexts to signal shutting down, which is + // very suboptimal. This needs to be here until that's addressed somehow + <-ctx.Done() + app.Stop(context.Background()) + }() + + n.IsOnline = cfg.Online + n.app = app + + if app.Err() != nil { + return nil, app.Err() } - // TODO: this is a weird circular-ish dependency, rework it - n.proc = goprocessctx.WithContextAndTeardown(ctx, n.teardown) - - if err := setupNode(ctx, n, cfg); err != nil { - n.Close() + if err := app.Start(ctx); err != nil { return nil, err } - return n, nil -} - -func isTooManyFDError(err error) bool { - perr, ok := err.(*os.PathError) - if ok && perr.Err == syscall.EMFILE { - return true - } - - return false -} - -func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { - // setup local identity - if err := n.loadID(); err != nil { - return err - } - - // load the private key (if present) - if err := n.loadPrivateKey(); err != nil { - return err - } - - rds := &retry.Datastore{ - Batching: n.Repo.Datastore(), - Delay: time.Millisecond * 200, - Retries: 6, - TempErrFunc: isTooManyFDError, - } - - // hash security - bs := bstore.NewBlockstore(rds) - bs = &verifbs.VerifBS{Blockstore: bs} - - opts := bstore.DefaultCacheOpts() - conf, err := n.Repo.Config() - if err != nil { - return err - } - - // TEMP: setting global sharding switch here - uio.UseHAMTSharding = conf.Experimental.ShardingEnabled - - opts.HasBloomFilterSize = conf.Datastore.BloomFilterSize - if !cfg.Permanent { - opts.HasBloomFilterSize = 0 - } - - if !cfg.NilRepo { - bs, err = bstore.CachedBlockstore(ctx, bs, opts) - if err != nil { - return err - } - } - - bs = bstore.NewIdStore(bs) - - bs = cidv0v1.NewBlockstore(bs) - - n.BaseBlocks = bs - n.GCLocker = bstore.NewGCLocker() - n.Blockstore = bstore.NewGCBlockstore(bs, n.GCLocker) - - if conf.Experimental.FilestoreEnabled || conf.Experimental.UrlstoreEnabled { - // hash security - n.Filestore = filestore.NewFilestore(bs, n.Repo.FileManager()) - n.Blockstore = bstore.NewGCBlockstore(n.Filestore, n.GCLocker) - n.Blockstore = &verifbs.VerifBSGC{GCBlockstore: n.Blockstore} - } - - rcfg, err := n.Repo.Config() - if err != nil { - return err - } - - if rcfg.Datastore.HashOnRead { - bs.HashOnRead(true) - } - - hostOption := cfg.Host - if cfg.DisableEncryptedConnections { - innerHostOption := hostOption - hostOption = func(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) { - return innerHostOption(ctx, id, ps, append(options, libp2p.NoSecurity)...) - } - log.Warningf(`Your IPFS node has been configured to run WITHOUT ENCRYPTED CONNECTIONS. - You will not be able to connect to any nodes configured to use encrypted connections`) - } - - if cfg.Online { - do := setupDiscoveryOption(rcfg.Discovery) - if err := n.startOnlineServices(ctx, cfg.Routing, hostOption, do, cfg.getOpt("pubsub"), cfg.getOpt("ipnsps"), cfg.getOpt("mplex")); err != nil { - return err - } - } else { - n.Exchange = offline.Exchange(n.Blockstore) - n.Routing = offroute.NewOfflineRouter(n.Repo.Datastore(), n.RecordValidator) - n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore(), 0) - } - - n.Blocks = bserv.New(n.Blockstore, n.Exchange) - n.DAG = dag.NewDAGService(n.Blocks) - - internalDag := dag.NewDAGService(bserv.New(n.Blockstore, offline.Exchange(n.Blockstore))) - n.Pinning, err = pin.LoadPinner(n.Repo.Datastore(), n.DAG, internalDag) - if err != nil { - // TODO: we should move towards only running 'NewPinner' explicitly on - // node init instead of implicitly here as a result of the pinner keys - // not being found in the datastore. - // this is kinda sketchy and could cause data loss - n.Pinning = pin.NewPinner(n.Repo.Datastore(), n.DAG, internalDag) - } - n.Resolver = resolver.NewBasicResolver(n.DAG) - - // Provider - queue, err := provider.NewQueue(ctx, "provider-v1", n.Repo.Datastore()) - if err != nil { - return err - } - n.Provider = provider.NewProvider(ctx, queue, n.Routing) - - if cfg.Online { - if err := n.startLateOnlineServices(ctx); err != nil { - return err - } - } - - return n.loadFilesRoot() + // TODO: How soon will bootstrap move to libp2p? + if !cfg.Online { + return n, nil + } + + return n, n.Bootstrap(bootstrap.DefaultBootstrapConfig) } diff --git a/core/commands/refs.go b/core/commands/refs.go index d63786f87..9a006cb8d 100644 --- a/core/commands/refs.go +++ b/core/commands/refs.go @@ -9,6 +9,7 @@ import ( core "github.com/ipfs/go-ipfs/core" cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" + "github.com/ipfs/go-ipfs/namesys/resolve" cid "github.com/ipfs/go-cid" cidenc "github.com/ipfs/go-cidutil/cidenc" @@ -173,7 +174,7 @@ func objectsForPaths(ctx context.Context, n *core.IpfsNode, paths []string) ([]i return nil, err } - o, err := core.Resolve(ctx, n.Namesys, n.Resolver, p) + o, err := resolve.Resolve(ctx, n.Namesys, n.Resolver, p) if err != nil { return nil, err } diff --git a/core/commands/shutdown.go b/core/commands/shutdown.go index 4a6d44dd4..0586e3da4 100644 --- a/core/commands/shutdown.go +++ b/core/commands/shutdown.go @@ -21,7 +21,7 @@ var daemonShutdownCmd = &cmds.Command{ return cmdkit.Errorf(cmdkit.ErrClient, "daemon not running") } - if err := nd.Process().Close(); err != nil { + if err := nd.Close(); err != nil { log.Error("error while shutting down ipfs daemon:", err) } diff --git a/core/commands/tar.go b/core/commands/tar.go index 488193f18..d10c97fdd 100644 --- a/core/commands/tar.go +++ b/core/commands/tar.go @@ -4,8 +4,8 @@ import ( "fmt" "io" - "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/core/commands/cmdenv" + "github.com/ipfs/go-ipfs/namesys/resolve" tar "github.com/ipfs/go-ipfs/tar" "github.com/ipfs/go-ipfs-cmdkit" @@ -97,7 +97,7 @@ var tarCatCmd = &cmds.Command{ return err } - root, err := core.Resolve(req.Context, nd.Namesys, nd.Resolver, p) + root, err := resolve.Resolve(req.Context, nd.Namesys, nd.Resolver, p) if err != nil { return err } diff --git a/core/core.go b/core/core.go index 99379b85c..23aa84e77 100644 --- a/core/core.go +++ b/core/core.go @@ -10,81 +10,49 @@ interfaces and how core/... fits into the bigger IPFS picture, see: package core import ( - "bytes" "context" - "errors" - "fmt" "io" - "io/ioutil" - "os" - "strings" - "time" + + "go.uber.org/fx" version "github.com/ipfs/go-ipfs" + "github.com/ipfs/go-ipfs/core/bootstrap" + "github.com/ipfs/go-ipfs/core/node" rp "github.com/ipfs/go-ipfs/exchange/reprovide" - filestore "github.com/ipfs/go-ipfs/filestore" - mount "github.com/ipfs/go-ipfs/fuse/mount" - namesys "github.com/ipfs/go-ipfs/namesys" + "github.com/ipfs/go-ipfs/filestore" + "github.com/ipfs/go-ipfs/fuse/mount" + "github.com/ipfs/go-ipfs/namesys" ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher" - p2p "github.com/ipfs/go-ipfs/p2p" - pin "github.com/ipfs/go-ipfs/pin" - provider "github.com/ipfs/go-ipfs/provider" - repo "github.com/ipfs/go-ipfs/repo" + "github.com/ipfs/go-ipfs/p2p" + "github.com/ipfs/go-ipfs/pin" + "github.com/ipfs/go-ipfs/provider" + "github.com/ipfs/go-ipfs/repo" - bitswap "github.com/ipfs/go-bitswap" - bsnet "github.com/ipfs/go-bitswap/network" bserv "github.com/ipfs/go-blockservice" - cid "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" bstore "github.com/ipfs/go-ipfs-blockstore" - config "github.com/ipfs/go-ipfs-config" exchange "github.com/ipfs/go-ipfs-exchange-interface" - nilrouting "github.com/ipfs/go-ipfs-routing/none" - u "github.com/ipfs/go-ipfs-util" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" - merkledag "github.com/ipfs/go-merkledag" - mfs "github.com/ipfs/go-mfs" - resolver "github.com/ipfs/go-path/resolver" - ft "github.com/ipfs/go-unixfs" - goprocess "github.com/jbenet/goprocess" - libp2p "github.com/libp2p/go-libp2p" + "github.com/ipfs/go-mfs" + "github.com/ipfs/go-path/resolver" + "github.com/jbenet/goprocess" autonat "github.com/libp2p/go-libp2p-autonat-svc" - circuit "github.com/libp2p/go-libp2p-circuit" - connmgr "github.com/libp2p/go-libp2p-connmgr" ic "github.com/libp2p/go-libp2p-crypto" p2phost "github.com/libp2p/go-libp2p-host" ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr" dht "github.com/libp2p/go-libp2p-kad-dht" - dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts" metrics "github.com/libp2p/go-libp2p-metrics" peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" - pnet "github.com/libp2p/go-libp2p-pnet" pubsub "github.com/libp2p/go-libp2p-pubsub" psrouter "github.com/libp2p/go-libp2p-pubsub-router" - quic "github.com/libp2p/go-libp2p-quic-transport" record "github.com/libp2p/go-libp2p-record" routing "github.com/libp2p/go-libp2p-routing" - rhelpers "github.com/libp2p/go-libp2p-routing-helpers" - discovery "github.com/libp2p/go-libp2p/p2p/discovery" + "github.com/libp2p/go-libp2p/p2p/discovery" p2pbhost "github.com/libp2p/go-libp2p/p2p/host/basic" - rhost "github.com/libp2p/go-libp2p/p2p/host/routed" - identify "github.com/libp2p/go-libp2p/p2p/protocol/identify" - mafilter "github.com/libp2p/go-maddr-filter" - smux "github.com/libp2p/go-stream-muxer" - ma "github.com/multiformats/go-multiaddr" - mplex "github.com/whyrusleeping/go-smux-multiplex" - yamux "github.com/whyrusleeping/go-smux-yamux" - mamask "github.com/whyrusleeping/multiaddr-filter" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" ) -const IpnsValidatorTag = "ipns" - -const kReprovideFrequency = time.Hour * 12 -const discoveryConnTimeout = time.Second * 30 -const DefaultIpnsCacheSize = 128 - var log = logging.Logger("core") func init() { @@ -100,47 +68,49 @@ type IpfsNode struct { Repo repo.Repo // Local node - Pinning pin.Pinner // the pinning manager - Mounts Mounts // current mount state, if any. - PrivateKey ic.PrivKey // the local node's private Key - PNetFingerprint []byte // fingerprint of private network + Pinning pin.Pinner // the pinning manager + Mounts Mounts `optional:"true"` // current mount state, if any. + PrivateKey ic.PrivKey // the local node's private Key + PNetFingerprint node.PNetFingerprint `optional:"true"` // fingerprint of private network // Services - Peerstore pstore.Peerstore // storage for other Peer instances + Peerstore pstore.Peerstore `optional:"true"` // storage for other Peer instances Blockstore bstore.GCBlockstore // the block store (lower level) Filestore *filestore.Filestore // the filestore blockstore - BaseBlocks bstore.Blockstore // the raw blockstore, no filestore wrapping + BaseBlocks node.BaseBlocks // the raw blockstore, no filestore wrapping GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc Blocks bserv.BlockService // the block service, get/add blocks. DAG ipld.DAGService // the merkle dag service, get/add objects. Resolver *resolver.Resolver // the path resolution system - Reporter metrics.Reporter - Discovery discovery.Service + Reporter metrics.Reporter `optional:"true"` + Discovery discovery.Service `optional:"true"` FilesRoot *mfs.Root RecordValidator record.Validator // Online - PeerHost p2phost.Host // the network host (server+client) - Bootstrapper io.Closer // the periodic bootstrapper - Routing routing.IpfsRouting // the routing system. recommend ipfs-dht + PeerHost p2phost.Host `optional:"true"` // the network host (server+client) + Bootstrapper io.Closer `optional:"true"` // the periodic bootstrapper + Routing routing.IpfsRouting `optional:"true"` // the routing system. recommend ipfs-dht Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes Provider provider.Provider // the value provider system - Reprovider *rp.Reprovider // the value reprovider system - IpnsRepub *ipnsrp.Republisher + Reprovider *rp.Reprovider `optional:"true"` // the value reprovider system + IpnsRepub *ipnsrp.Republisher `optional:"true"` - AutoNAT *autonat.AutoNATService - PubSub *pubsub.PubSub - PSRouter *psrouter.PubsubValueStore - DHT *dht.IpfsDHT - P2P *p2p.P2P + AutoNAT *autonat.AutoNATService `optional:"true"` + PubSub *pubsub.PubSub `optional:"true"` + PSRouter *psrouter.PubsubValueStore `optional:"true"` + DHT *dht.IpfsDHT `optional:"true"` + P2P *p2p.P2P `optional:"true"` - proc goprocess.Process - ctx context.Context + Process goprocess.Process + ctx context.Context + + app *fx.App // Flags - IsOnline bool // Online is set when networking is enabled. - IsDaemon bool // Daemon is set when running on a long-running daemon. + IsOnline bool `optional:"true"` // Online is set when networking is enabled. + IsDaemon bool `optional:"true"` // Daemon is set when running on a long-running daemon. } // Mounts defines what the node's mount state is. This should @@ -151,508 +121,9 @@ type Mounts struct { Ipns mount.Mount } -func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, ipnsps, mplex bool) error { - if n.PeerHost != nil { // already online. - return errors.New("node already online") - } - - if n.PrivateKey == nil { - return fmt.Errorf("private key not available") - } - - // get undialable addrs from config - cfg, err := n.Repo.Config() - if err != nil { - return err - } - - var libp2pOpts []libp2p.Option - for _, s := range cfg.Swarm.AddrFilters { - f, err := mamask.NewMask(s) - if err != nil { - return fmt.Errorf("incorrectly formatted address filter in config: %s", s) - } - libp2pOpts = append(libp2pOpts, libp2p.FilterAddresses(f)) - } - - if !cfg.Swarm.DisableBandwidthMetrics { - // Set reporter - n.Reporter = metrics.NewBandwidthCounter() - libp2pOpts = append(libp2pOpts, libp2p.BandwidthReporter(n.Reporter)) - } - - swarmkey, err := n.Repo.SwarmKey() - if err != nil { - return err - } - - if swarmkey != nil { - protec, err := pnet.NewProtector(bytes.NewReader(swarmkey)) - if err != nil { - return fmt.Errorf("failed to configure private network: %s", err) - } - n.PNetFingerprint = protec.Fingerprint() - go func() { - t := time.NewTicker(30 * time.Second) - <-t.C // swallow one tick - for { - select { - case <-t.C: - if ph := n.PeerHost; ph != nil { - if len(ph.Network().Peers()) == 0 { - log.Warning("We are in private network and have no peers.") - log.Warning("This might be configuration mistake.") - } - } - case <-n.Process().Closing(): - t.Stop() - return - } - } - }() - - libp2pOpts = append(libp2pOpts, libp2p.PrivateNetwork(protec)) - } - - addrsFactory, err := makeAddrsFactory(cfg.Addresses) - if err != nil { - return err - } - if !cfg.Swarm.DisableRelay { - addrsFactory = composeAddrsFactory(addrsFactory, filterRelayAddrs) - } - libp2pOpts = append(libp2pOpts, libp2p.AddrsFactory(addrsFactory)) - - connm, err := constructConnMgr(cfg.Swarm.ConnMgr) - if err != nil { - return err - } - libp2pOpts = append(libp2pOpts, libp2p.ConnectionManager(connm)) - - libp2pOpts = append(libp2pOpts, makeSmuxTransportOption(mplex)) - - if !cfg.Swarm.DisableNatPortMap { - libp2pOpts = append(libp2pOpts, libp2p.NATPortMap()) - } - - // disable the default listen addrs - libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs) - - if cfg.Swarm.DisableRelay { - // Enabled by default. - libp2pOpts = append(libp2pOpts, libp2p.DisableRelay()) - } else { - relayOpts := []circuit.RelayOpt{circuit.OptDiscovery} - if cfg.Swarm.EnableRelayHop { - relayOpts = append(relayOpts, circuit.OptHop) - } - libp2pOpts = append(libp2pOpts, libp2p.EnableRelay(relayOpts...)) - } - - // explicitly enable the default transports - libp2pOpts = append(libp2pOpts, libp2p.DefaultTransports) - - if cfg.Experimental.QUIC { - libp2pOpts = append(libp2pOpts, libp2p.Transport(quic.NewTransport)) - } - - // enable routing - libp2pOpts = append(libp2pOpts, libp2p.Routing(func(h p2phost.Host) (routing.PeerRouting, error) { - r, err := routingOption(ctx, h, n.Repo.Datastore(), n.RecordValidator) - n.Routing = r - return r, err - })) - - // enable autorelay - if cfg.Swarm.EnableAutoRelay { - libp2pOpts = append(libp2pOpts, libp2p.EnableAutoRelay()) - } - - peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, libp2pOpts...) - - if err != nil { - return err - } - - n.PeerHost = peerhost - - if err := n.startOnlineServicesWithHost(ctx, routingOption, pubsub, ipnsps); err != nil { - return err - } - - // Ok, now we're ready to listen. - if err := startListening(n.PeerHost, cfg); err != nil { - return err - } - - n.P2P = p2p.NewP2P(n.Identity, n.PeerHost, n.Peerstore) - - // setup local discovery - if do != nil { - service, err := do(ctx, n.PeerHost) - if err != nil { - log.Error("mdns error: ", err) - } else { - service.RegisterNotifee(n) - n.Discovery = service - } - } - - return n.Bootstrap(DefaultBootstrapConfig) -} - -func constructConnMgr(cfg config.ConnMgr) (ifconnmgr.ConnManager, error) { - switch cfg.Type { - case "": - // 'default' value is the basic connection manager - return connmgr.NewConnManager(config.DefaultConnMgrLowWater, config.DefaultConnMgrHighWater, config.DefaultConnMgrGracePeriod), nil - case "none": - return nil, nil - case "basic": - grace, err := time.ParseDuration(cfg.GracePeriod) - if err != nil { - return nil, fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err) - } - - return connmgr.NewConnManager(cfg.LowWater, cfg.HighWater, grace), nil - default: - return nil, fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Type) - } -} - -func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { - cfg, err := n.Repo.Config() - if err != nil { - return err - } - - // Provider - - n.Provider.Run() - - // Reprovider - - var keyProvider rp.KeyChanFunc - - switch cfg.Reprovider.Strategy { - case "all": - fallthrough - case "": - keyProvider = rp.NewBlockstoreProvider(n.Blockstore) - case "roots": - keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, true) - case "pinned": - keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false) - default: - return fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy) - } - n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider) - - reproviderInterval := kReprovideFrequency - if cfg.Reprovider.Interval != "" { - dur, err := time.ParseDuration(cfg.Reprovider.Interval) - if err != nil { - return err - } - - reproviderInterval = dur - } - - go n.Reprovider.Run(reproviderInterval) - - return nil -} - -func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) { - var annAddrs []ma.Multiaddr - for _, addr := range cfg.Announce { - maddr, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, err - } - annAddrs = append(annAddrs, maddr) - } - - filters := mafilter.NewFilters() - noAnnAddrs := map[string]bool{} - for _, addr := range cfg.NoAnnounce { - f, err := mamask.NewMask(addr) - if err == nil { - filters.AddDialFilter(f) - continue - } - maddr, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, err - } - noAnnAddrs[maddr.String()] = true - } - - return func(allAddrs []ma.Multiaddr) []ma.Multiaddr { - var addrs []ma.Multiaddr - if len(annAddrs) > 0 { - addrs = annAddrs - } else { - addrs = allAddrs - } - - var out []ma.Multiaddr - for _, maddr := range addrs { - // check for exact matches - ok := noAnnAddrs[maddr.String()] - // check for /ipcidr matches - if !ok && !filters.AddrBlocked(maddr) { - out = append(out, maddr) - } - } - return out - }, nil -} - -func makeSmuxTransportOption(mplexExp bool) libp2p.Option { - const yamuxID = "/yamux/1.0.0" - const mplexID = "/mplex/6.7.0" - - ymxtpt := &yamux.Transport{ - AcceptBacklog: 512, - ConnectionWriteTimeout: time.Second * 10, - KeepAliveInterval: time.Second * 30, - EnableKeepAlive: true, - MaxStreamWindowSize: uint32(16 * 1024 * 1024), // 16MiB - LogOutput: ioutil.Discard, - } - - if os.Getenv("YAMUX_DEBUG") != "" { - ymxtpt.LogOutput = os.Stderr - } - - muxers := map[string]smux.Transport{yamuxID: ymxtpt} - if mplexExp { - muxers[mplexID] = mplex.DefaultTransport - } - - // Allow muxer preference order overriding - order := []string{yamuxID, mplexID} - if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" { - order = strings.Fields(prefs) - } - - opts := make([]libp2p.Option, 0, len(order)) - for _, id := range order { - tpt, ok := muxers[id] - if !ok { - log.Warning("unknown or duplicate muxer in LIBP2P_MUX_PREFS: %s", id) - continue - } - delete(muxers, id) - opts = append(opts, libp2p.Muxer(id, tpt)) - } - - return libp2p.ChainOptions(opts...) -} - -func setupDiscoveryOption(d config.Discovery) DiscoveryOption { - if d.MDNS.Enabled { - return func(ctx context.Context, h p2phost.Host) (discovery.Service, error) { - if d.MDNS.Interval == 0 { - d.MDNS.Interval = 5 - } - return discovery.NewMdnsService(ctx, h, time.Duration(d.MDNS.Interval)*time.Second, discovery.ServiceTag) - } - } - return nil -} - -// HandlePeerFound attempts to connect to peer from `PeerInfo`, if it fails -// logs a warning log. -func (n *IpfsNode) HandlePeerFound(p pstore.PeerInfo) { - log.Warning("trying peer info: ", p) - ctx, cancel := context.WithTimeout(n.Context(), discoveryConnTimeout) - defer cancel() - if err := n.PeerHost.Connect(ctx, p); err != nil { - log.Warning("Failed to connect to peer found by discovery: ", err) - } -} - -// startOnlineServicesWithHost is the set of services which need to be -// initialized with the host and _before_ we start listening. -func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, routingOption RoutingOption, enablePubsub bool, enableIpnsps bool) error { - cfg, err := n.Repo.Config() - if err != nil { - return err - } - - if cfg.Swarm.EnableAutoNATService { - var opts []libp2p.Option - if cfg.Experimental.QUIC { - opts = append(opts, libp2p.DefaultTransports, libp2p.Transport(quic.NewTransport)) - } - - svc, err := autonat.NewAutoNATService(ctx, n.PeerHost, opts...) - if err != nil { - return err - } - n.AutoNAT = svc - } - - if enablePubsub || enableIpnsps { - var service *pubsub.PubSub - - var pubsubOptions []pubsub.Option - if cfg.Pubsub.DisableSigning { - pubsubOptions = append(pubsubOptions, pubsub.WithMessageSigning(false)) - } - - if cfg.Pubsub.StrictSignatureVerification { - pubsubOptions = append(pubsubOptions, pubsub.WithStrictSignatureVerification(true)) - } - - switch cfg.Pubsub.Router { - case "": - fallthrough - case "floodsub": - service, err = pubsub.NewFloodSub(ctx, n.PeerHost, pubsubOptions...) - - case "gossipsub": - service, err = pubsub.NewGossipSub(ctx, n.PeerHost, pubsubOptions...) - - default: - err = fmt.Errorf("Unknown pubsub router %s", cfg.Pubsub.Router) - } - - if err != nil { - return err - } - n.PubSub = service - } - - // this code is necessary just for tests: mock network constructions - // ignore the libp2p constructor options that actually construct the routing! - if n.Routing == nil { - r, err := routingOption(ctx, n.PeerHost, n.Repo.Datastore(), n.RecordValidator) - if err != nil { - return err - } - n.Routing = r - n.PeerHost = rhost.Wrap(n.PeerHost, n.Routing) - } - - // TODO: I'm not a fan of type assertions like this but the - // `RoutingOption` system doesn't currently provide access to the - // IpfsNode. - // - // Ideally, we'd do something like: - // - // 1. Add some fancy method to introspect into tiered routers to extract - // things like the pubsub router or the DHT (complicated, messy, - // probably not worth it). - // 2. Pass the IpfsNode into the RoutingOption (would also remove the - // PSRouter case below. - // 3. Introduce some kind of service manager? (my personal favorite but - // that requires a fair amount of work). - if dht, ok := n.Routing.(*dht.IpfsDHT); ok { - n.DHT = dht - } - - if enableIpnsps { - n.PSRouter = psrouter.NewPubsubValueStore( - ctx, - n.PeerHost, - n.Routing, - n.PubSub, - n.RecordValidator, - ) - n.Routing = rhelpers.Tiered{ - Routers: []routing.IpfsRouting{ - // Always check pubsub first. - &rhelpers.Compose{ - ValueStore: &rhelpers.LimitedValueStore{ - ValueStore: n.PSRouter, - Namespaces: []string{"ipns"}, - }, - }, - n.Routing, - }, - Validator: n.RecordValidator, - } - } - - // setup exchange service - bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing) - n.Exchange = bitswap.New(ctx, bitswapNetwork, n.Blockstore) - - size, err := n.getCacheSize() - if err != nil { - return err - } - - // setup name system - n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore(), size) - - // setup ipns republishing - return n.setupIpnsRepublisher() -} - -// getCacheSize returns cache life and cache size -func (n *IpfsNode) getCacheSize() (int, error) { - cfg, err := n.Repo.Config() - if err != nil { - return 0, err - } - - cs := cfg.Ipns.ResolveCacheSize - if cs == 0 { - cs = DefaultIpnsCacheSize - } - if cs < 0 { - return 0, fmt.Errorf("cannot specify negative resolve cache size") - } - return cs, nil -} - -func (n *IpfsNode) setupIpnsRepublisher() error { - cfg, err := n.Repo.Config() - if err != nil { - return err - } - - n.IpnsRepub = ipnsrp.NewRepublisher(n.Namesys, n.Repo.Datastore(), n.PrivateKey, n.Repo.Keystore()) - - if cfg.Ipns.RepublishPeriod != "" { - d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod) - if err != nil { - return fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err) - } - - if !u.Debug && (d < time.Minute || d > (time.Hour*24)) { - return fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d) - } - - n.IpnsRepub.Interval = d - } - - if cfg.Ipns.RecordLifetime != "" { - d, err := time.ParseDuration(cfg.Ipns.RecordLifetime) - if err != nil { - return fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err) - } - - n.IpnsRepub.RecordLifetime = d - } - - n.Process().Go(n.IpnsRepub.Run) - - return nil -} - -// Process returns the Process object -func (n *IpfsNode) Process() goprocess.Process { - return n.proc -} - -// Close calls Close() on the Process object +// Close calls Close() on the App object func (n *IpfsNode) Close() error { - return n.proc.Close() + return n.app.Stop(n.ctx) } // Context returns the IpfsNode context @@ -663,70 +134,8 @@ func (n *IpfsNode) Context() context.Context { return n.ctx } -// teardown closes owned children. If any errors occur, this function returns -// the first error. -func (n *IpfsNode) teardown() error { - log.Debug("core is shutting down...") - // owned objects are closed in this teardown to ensure that they're closed - // regardless of which constructor was used to add them to the node. - var closers []io.Closer - - // NOTE: The order that objects are added(closed) matters, if an object - // needs to use another during its shutdown/cleanup process, it should be - // closed before that other object - - if n.Provider != nil { - closers = append(closers, n.Provider) - } - - if n.FilesRoot != nil { - closers = append(closers, n.FilesRoot) - } - - if n.Exchange != nil { - closers = append(closers, n.Exchange) - } - - if n.Mounts.Ipfs != nil && !n.Mounts.Ipfs.IsActive() { - closers = append(closers, mount.Closer(n.Mounts.Ipfs)) - } - if n.Mounts.Ipns != nil && !n.Mounts.Ipns.IsActive() { - closers = append(closers, mount.Closer(n.Mounts.Ipns)) - } - - if n.DHT != nil { - closers = append(closers, n.DHT.Process()) - } - - if n.Blocks != nil { - closers = append(closers, n.Blocks) - } - - if n.Bootstrapper != nil { - closers = append(closers, n.Bootstrapper) - } - - if n.PeerHost != nil { - closers = append(closers, n.PeerHost) - } - - // Repo closed last, most things need to preserve state here - closers = append(closers, n.Repo) - - var errs []error - for _, closer := range closers { - if err := closer.Close(); err != nil { - errs = append(errs, err) - } - } - if len(errs) > 0 { - return errs[0] - } - return nil -} - // Bootstrap will set and call the IpfsNodes bootstrap function. -func (n *IpfsNode) Bootstrap(cfg BootstrapConfig) error { +func (n *IpfsNode) Bootstrap(cfg bootstrap.BootstrapConfig) error { // TODO what should return value be when in offlineMode? if n.Routing == nil { return nil @@ -750,84 +159,10 @@ func (n *IpfsNode) Bootstrap(cfg BootstrapConfig) error { } var err error - n.Bootstrapper, err = Bootstrap(n, cfg) + n.Bootstrapper, err = bootstrap.Bootstrap(n.Identity, n.PeerHost, n.Routing, cfg) return err } -func (n *IpfsNode) loadID() error { - if n.Identity != "" { - return errors.New("identity already loaded") - } - - cfg, err := n.Repo.Config() - if err != nil { - return err - } - - cid := cfg.Identity.PeerID - if cid == "" { - return errors.New("identity was not set in config (was 'ipfs init' run?)") - } - if len(cid) == 0 { - return errors.New("no peer ID in config! (was 'ipfs init' run?)") - } - - id, err := peer.IDB58Decode(cid) - if err != nil { - return fmt.Errorf("peer ID invalid: %s", err) - } - - n.Identity = id - return nil -} - -// GetKey will return a key from the Keystore with name `name`. -func (n *IpfsNode) GetKey(name string) (ic.PrivKey, error) { - if name == "self" { - if n.PrivateKey == nil { - return nil, fmt.Errorf("private key not available") - } - return n.PrivateKey, nil - } else { - return n.Repo.Keystore().Get(name) - } -} - -// loadPrivateKey loads the private key *if* available -func (n *IpfsNode) loadPrivateKey() error { - if n.Identity == "" || n.Peerstore == nil { - return errors.New("loaded private key out of order") - } - - if n.PrivateKey != nil { - log.Warning("private key already loaded") - return nil - } - - cfg, err := n.Repo.Config() - if err != nil { - return err - } - - if cfg.Identity.PrivKey == "" { - return nil - } - - sk, err := loadPrivateKey(&cfg.Identity, n.Identity) - if err != nil { - return err - } - - if err := n.Peerstore.AddPrivKey(n.Identity, sk); err != nil { - return err - } - if err := n.Peerstore.AddPubKey(n.Identity, sk.GetPublic()); err != nil { - return err - } - n.PrivateKey = sk - return nil -} - func (n *IpfsNode) loadBootstrapPeers() ([]pstore.PeerInfo, error) { cfg, err := n.Repo.Config() if err != nil { @@ -838,84 +173,7 @@ func (n *IpfsNode) loadBootstrapPeers() ([]pstore.PeerInfo, error) { if err != nil { return nil, err } - return toPeerInfos(parsed), nil -} - -func (n *IpfsNode) loadFilesRoot() error { - dsk := ds.NewKey("/local/filesroot") - pf := func(ctx context.Context, c cid.Cid) error { - return n.Repo.Datastore().Put(dsk, c.Bytes()) - } - - var nd *merkledag.ProtoNode - val, err := n.Repo.Datastore().Get(dsk) - - switch { - case err == ds.ErrNotFound || val == nil: - nd = ft.EmptyDirNode() - err := n.DAG.Add(n.Context(), nd) - if err != nil { - return fmt.Errorf("failure writing to dagstore: %s", err) - } - case err == nil: - c, err := cid.Cast(val) - if err != nil { - return err - } - - rnd, err := n.DAG.Get(n.Context(), c) - if err != nil { - return fmt.Errorf("error loading filesroot from DAG: %s", err) - } - - pbnd, ok := rnd.(*merkledag.ProtoNode) - if !ok { - return merkledag.ErrNotProtobuf - } - - nd = pbnd - default: - return err - } - - mr, err := mfs.NewRoot(n.Context(), n.DAG, nd, pf) - if err != nil { - return err - } - - n.FilesRoot = mr - return nil -} - -func loadPrivateKey(cfg *config.Identity, id peer.ID) (ic.PrivKey, error) { - sk, err := cfg.DecodePrivateKey("passphrase todo!") - if err != nil { - return nil, err - } - - id2, err := peer.IDFromPrivateKey(sk) - if err != nil { - return nil, err - } - - if id2 != id { - return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2) - } - - return sk, nil -} - -func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) { - var listen []ma.Multiaddr - for _, addr := range cfg.Addresses.Swarm { - maddr, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", cfg.Addresses.Swarm) - } - listen = append(listen, maddr) - } - - return listen, nil + return bootstrap.Peers.ToPeerInfos(parsed), nil } type ConstructPeerHostOpts struct { @@ -925,81 +183,3 @@ type ConstructPeerHostOpts struct { EnableRelayHop bool ConnectionManager ifconnmgr.ConnManager } - -type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) - -var DefaultHostOption HostOption = constructPeerHost - -// isolates the complex initialization steps -func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) { - pkey := ps.PrivKey(id) - if pkey == nil { - return nil, fmt.Errorf("missing private key for node ID: %s", id.Pretty()) - } - options = append([]libp2p.Option{libp2p.Identity(pkey), libp2p.Peerstore(ps)}, options...) - return libp2p.New(ctx, options...) -} - -func filterRelayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { - var raddrs []ma.Multiaddr - for _, addr := range addrs { - _, err := addr.ValueForProtocol(circuit.P_CIRCUIT) - if err == nil { - continue - } - raddrs = append(raddrs, addr) - } - return raddrs -} - -func composeAddrsFactory(f, g p2pbhost.AddrsFactory) p2pbhost.AddrsFactory { - return func(addrs []ma.Multiaddr) []ma.Multiaddr { - return f(g(addrs)) - } -} - -// startListening on the network addresses -func startListening(host p2phost.Host, cfg *config.Config) error { - listenAddrs, err := listenAddresses(cfg) - if err != nil { - return err - } - - // Actually start listening: - if err := host.Network().Listen(listenAddrs...); err != nil { - return err - } - - // list out our addresses - addrs, err := host.Network().InterfaceListenAddresses() - if err != nil { - return err - } - log.Infof("Swarm listening at: %s", addrs) - return nil -} - -func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) { - return dht.New( - ctx, host, - dhtopts.Datastore(dstore), - dhtopts.Validator(validator), - ) -} - -func constructClientDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) { - return dht.New( - ctx, host, - dhtopts.Client(true), - dhtopts.Datastore(dstore), - dhtopts.Validator(validator), - ) -} - -type RoutingOption func(context.Context, p2phost.Host, ds.Batching, record.Validator) (routing.IpfsRouting, error) - -type DiscoveryOption func(context.Context, p2phost.Host) (discovery.Service, error) - -var DHTOption RoutingOption = constructDHTRouting -var DHTClientOption RoutingOption = constructClientDHTRouting -var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index f22803f92..eaf870ec8 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/core/node" "github.com/ipfs/go-ipfs/namesys" "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/provider" @@ -207,7 +208,7 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e cs := cfg.Ipns.ResolveCacheSize if cs == 0 { - cs = core.DefaultIpnsCacheSize + cs = node.DefaultIpnsCacheSize } if cs < 0 { return nil, fmt.Errorf("cannot specify negative resolve cache size") diff --git a/core/coreapi/path.go b/core/coreapi/path.go index 4c7837f36..314d1b5fd 100644 --- a/core/coreapi/path.go +++ b/core/coreapi/path.go @@ -5,7 +5,7 @@ import ( "fmt" gopath "path" - "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/namesys/resolve" "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" @@ -42,8 +42,8 @@ func (api *CoreAPI) ResolvePath(ctx context.Context, p path.Path) (path.Resolved } ipath := ipfspath.Path(p.String()) - ipath, err := core.ResolveIPNS(ctx, api.namesys, ipath) - if err == core.ErrNoNamesys { + ipath, err := resolve.ResolveIPNS(ctx, api.namesys, ipath) + if err == resolve.ErrNoNamesys { return nil, coreiface.ErrOffline } else if err != nil { return nil, err diff --git a/core/coreapi/test/api_test.go b/core/coreapi/test/api_test.go index 9ad164d8c..23b8b6289 100644 --- a/core/coreapi/test/api_test.go +++ b/core/coreapi/test/api_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "testing" + "github.com/ipfs/go-ipfs/core/bootstrap" "github.com/ipfs/go-ipfs/filestore" "github.com/ipfs/go-ipfs/core" @@ -101,7 +102,7 @@ func (NodeProvider) MakeAPISwarm(ctx context.Context, fullIdentity bool, n int) return nil, err } - bsinf := core.BootstrapConfigWithPeers( + bsinf := bootstrap.BootstrapConfigWithPeers( []pstore.PeerInfo{ nodes[0].Peerstore.PeerInfo(nodes[0].Identity), }, diff --git a/core/corehttp/corehttp.go b/core/corehttp/corehttp.go index 330e8e9c2..c52bea8f5 100644 --- a/core/corehttp/corehttp.go +++ b/core/corehttp/corehttp.go @@ -85,7 +85,7 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error } select { - case <-node.Process().Closing(): + case <-node.Process.Closing(): return fmt.Errorf("failed to start server, process closing") default: } @@ -95,7 +95,7 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error } var serverError error - serverProc := node.Process().Go(func(p goprocess.Process) { + serverProc := node.Process.Go(func(p goprocess.Process) { serverError = server.Serve(lis) }) @@ -103,7 +103,7 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error select { case <-serverProc.Closed(): // if node being closed before server exits, close server - case <-node.Process().Closing(): + case <-node.Process.Closing(): log.Infof("server at %s terminating...", addr) warnProc := periodicproc.Tick(5*time.Second, func(_ goprocess.Process) { diff --git a/core/corehttp/gateway_handler.go b/core/corehttp/gateway_handler.go index cdbcce594..72566930b 100644 --- a/core/corehttp/gateway_handler.go +++ b/core/corehttp/gateway_handler.go @@ -14,6 +14,7 @@ import ( "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/dagutils" + "github.com/ipfs/go-ipfs/namesys/resolve" "github.com/dustin/go-humanize" "github.com/ipfs/go-cid" @@ -423,7 +424,7 @@ func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) { } var newcid cid.Cid - rnode, err := core.Resolve(ctx, i.node.Namesys, i.node.Resolver, rootPath) + rnode, err := resolve.Resolve(ctx, i.node.Namesys, i.node.Resolver, rootPath) switch ev := err.(type) { case resolver.ErrNoLink: // ev.Node < node where resolve failed diff --git a/core/mock/mock.go b/core/mock/mock.go index dc47917aa..e759d2010 100644 --- a/core/mock/mock.go +++ b/core/mock/mock.go @@ -5,6 +5,7 @@ import ( commands "github.com/ipfs/go-ipfs/commands" core "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/core/node" "github.com/ipfs/go-ipfs/repo" datastore "github.com/ipfs/go-datastore" @@ -29,7 +30,7 @@ func NewMockNode() (*core.IpfsNode, error) { }) } -func MockHostOption(mn mocknet.Mocknet) core.HostOption { +func MockHostOption(mn mocknet.Mocknet) node.HostOption { return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, _ ...libp2p.Option) (host.Host, error) { return mn.AddPeerWithPeerstore(id, ps) } diff --git a/core/node/builder.go b/core/node/builder.go new file mode 100644 index 000000000..e83fa764e --- /dev/null +++ b/core/node/builder.go @@ -0,0 +1,143 @@ +package node + +import ( + "context" + "crypto/rand" + "encoding/base64" + "errors" + + "go.uber.org/fx" + + "github.com/ipfs/go-ipfs/repo" + + ds "github.com/ipfs/go-datastore" + dsync "github.com/ipfs/go-datastore/sync" + cfg "github.com/ipfs/go-ipfs-config" + ci "github.com/libp2p/go-libp2p-crypto" + peer "github.com/libp2p/go-libp2p-peer" +) + +type BuildCfg struct { + // If online is set, the node will have networking enabled + Online bool + + // ExtraOpts is a map of extra options used to configure the ipfs nodes creation + ExtraOpts map[string]bool + + // If permanent then node should run more expensive processes + // that will improve performance in long run + Permanent bool + + // DisableEncryptedConnections disables connection encryption *entirely*. + // DO NOT SET THIS UNLESS YOU'RE TESTING. + DisableEncryptedConnections bool + + // If NilRepo is set, a Repo backed by a nil datastore will be constructed + NilRepo bool + + Routing RoutingOption + Host HostOption + Repo repo.Repo +} + +func (cfg *BuildCfg) getOpt(key string) bool { + if cfg.ExtraOpts == nil { + return false + } + + return cfg.ExtraOpts[key] +} + +func (cfg *BuildCfg) fillDefaults() error { + if cfg.Repo != nil && cfg.NilRepo { + return errors.New("cannot set a Repo and specify nilrepo at the same time") + } + + if cfg.Repo == nil { + var d ds.Datastore + if cfg.NilRepo { + d = ds.NewNullDatastore() + } else { + d = ds.NewMapDatastore() + } + r, err := defaultRepo(dsync.MutexWrap(d)) + if err != nil { + return err + } + cfg.Repo = r + } + + if cfg.Routing == nil { + cfg.Routing = DHTOption + } + + if cfg.Host == nil { + cfg.Host = DefaultHostOption + } + + return nil +} + +func (cfg *BuildCfg) options(ctx context.Context) fx.Option { + err := cfg.fillDefaults() + if err != nil { + return fx.Error(err) + } + + repoOption := fx.Provide(func(lc fx.Lifecycle) repo.Repo { + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return cfg.Repo.Close() + }, + }) + + return cfg.Repo + }) + + metricsCtx := fx.Provide(func() MetricsCtx { + return MetricsCtx(ctx) + }) + + hostOption := fx.Provide(func() HostOption { + return cfg.Host + }) + + routingOption := fx.Provide(func() RoutingOption { + return cfg.Routing + }) + + return fx.Options( + repoOption, + hostOption, + routingOption, + metricsCtx, + ) +} + +func defaultRepo(dstore repo.Datastore) (repo.Repo, error) { + c := cfg.Config{} + priv, pub, err := ci.GenerateKeyPairWithReader(ci.RSA, 1024, rand.Reader) + if err != nil { + return nil, err + } + + pid, err := peer.IDFromPublicKey(pub) + if err != nil { + return nil, err + } + + privkeyb, err := priv.Bytes() + if err != nil { + return nil, err + } + + c.Bootstrap = cfg.DefaultBootstrapAddresses + c.Addresses.Swarm = []string{"/ip4/0.0.0.0/tcp/4001"} + c.Identity.PeerID = pid.Pretty() + c.Identity.PrivKey = base64.StdEncoding.EncodeToString(privkeyb) + + return &repo.Mock{ + D: dstore, + C: c, + }, nil +} diff --git a/core/node/core.go b/core/node/core.go new file mode 100644 index 000000000..7bba419e8 --- /dev/null +++ b/core/node/core.go @@ -0,0 +1,117 @@ +package node + +import ( + "context" + "fmt" + + "github.com/ipfs/go-ipfs/pin" + "github.com/ipfs/go-ipfs/repo" + + "github.com/ipfs/go-bitswap" + "github.com/ipfs/go-bitswap/network" + "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-ipfs-blockstore" + "github.com/ipfs/go-ipfs-exchange-interface" + "github.com/ipfs/go-ipfs-exchange-offline" + "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-merkledag" + "github.com/ipfs/go-mfs" + "github.com/ipfs/go-unixfs" + "github.com/libp2p/go-libp2p-host" + "github.com/libp2p/go-libp2p-routing" + "go.uber.org/fx" +) + +func BlockServiceCtor(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService { + bsvc := blockservice.New(bs, rem) + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return bsvc.Close() + }, + }) + + return bsvc +} + +func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) { + internalDag := merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore))) + pinning, err := pin.LoadPinner(repo.Datastore(), ds, internalDag) + if err != nil { + // TODO: we should move towards only running 'NewPinner' explicitly on + // node init instead of implicitly here as a result of the pinner keys + // not being found in the datastore. + // this is kinda sketchy and could cause data loss + pinning = pin.NewPinner(repo.Datastore(), ds, internalDag) + } + + return pinning, nil +} + +func DagCtor(bs blockservice.BlockService) format.DAGService { + return merkledag.NewDAGService(bs) +} + +func OnlineExchangeCtor(mctx MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.IpfsRouting, bs blockstore.GCBlockstore) exchange.Interface { + bitswapNetwork := network.NewFromIpfsHost(host, rt) + exch := bitswap.New(lifecycleCtx(mctx, lc), bitswapNetwork, bs) + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return exch.Close() + }, + }) + return exch +} + +func Files(mctx MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) { + dsk := datastore.NewKey("/local/filesroot") + pf := func(ctx context.Context, c cid.Cid) error { + return repo.Datastore().Put(dsk, c.Bytes()) + } + + var nd *merkledag.ProtoNode + val, err := repo.Datastore().Get(dsk) + ctx := lifecycleCtx(mctx, lc) + + switch { + case err == datastore.ErrNotFound || val == nil: + nd = unixfs.EmptyDirNode() + err := dag.Add(ctx, nd) + if err != nil { + return nil, fmt.Errorf("failure writing to dagstore: %s", err) + } + case err == nil: + c, err := cid.Cast(val) + if err != nil { + return nil, err + } + + rnd, err := dag.Get(ctx, c) + if err != nil { + return nil, fmt.Errorf("error loading filesroot from DAG: %s", err) + } + + pbnd, ok := rnd.(*merkledag.ProtoNode) + if !ok { + return nil, merkledag.ErrNotProtobuf + } + + nd = pbnd + default: + return nil, err + } + + root, err := mfs.NewRoot(ctx, dag, nd, pf) + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return root.Close() + }, + }) + + return root, err +} + +type MetricsCtx context.Context diff --git a/core/node/discovery.go b/core/node/discovery.go new file mode 100644 index 000000000..f51e7593d --- /dev/null +++ b/core/node/discovery.go @@ -0,0 +1,51 @@ +package node + +import ( + "context" + "time" + + "github.com/ipfs/go-ipfs-config" + "github.com/libp2p/go-libp2p-host" + "github.com/libp2p/go-libp2p-peerstore" + "github.com/libp2p/go-libp2p/p2p/discovery" + "go.uber.org/fx" +) + +const discoveryConnTimeout = time.Second * 30 + +type discoveryHandler struct { + ctx context.Context + host host.Host +} + +func (dh *discoveryHandler) HandlePeerFound(p peerstore.PeerInfo) { + log.Warning("trying peer info: ", p) + ctx, cancel := context.WithTimeout(dh.ctx, discoveryConnTimeout) + defer cancel() + if err := dh.host.Connect(ctx, p); err != nil { + log.Warning("Failed to connect to peer found by discovery: ", err) + } +} + +func NewDiscoveryHandler(mctx MetricsCtx, lc fx.Lifecycle, host host.Host) *discoveryHandler { + return &discoveryHandler{ + ctx: lifecycleCtx(mctx, lc), + host: host, + } +} + +func SetupDiscovery(mctx MetricsCtx, lc fx.Lifecycle, cfg *config.Config, host host.Host, handler *discoveryHandler) error { + if cfg.Discovery.MDNS.Enabled { + mdns := cfg.Discovery.MDNS + if mdns.Interval == 0 { + mdns.Interval = 5 + } + service, err := discovery.NewMdnsService(lifecycleCtx(mctx, lc), host, time.Duration(mdns.Interval)*time.Second, discovery.ServiceTag) + if err != nil { + log.Error("mdns error: ", err) + return nil + } + service.RegisterNotifee(handler) + } + return nil +} diff --git a/core/node/groups.go b/core/node/groups.go new file mode 100644 index 000000000..45d91471a --- /dev/null +++ b/core/node/groups.go @@ -0,0 +1,134 @@ +package node + +import ( + "context" + + "github.com/ipfs/go-ipfs/p2p" + "github.com/ipfs/go-ipfs/provider" + + offline "github.com/ipfs/go-ipfs-exchange-offline" + offroute "github.com/ipfs/go-ipfs-routing/offline" + "github.com/ipfs/go-path/resolver" + "go.uber.org/fx" +) + +var BaseLibP2P = fx.Options( + fx.Provide(P2PAddrFilters), + fx.Provide(P2PBandwidthCounter), + fx.Provide(P2PPNet), + fx.Provide(P2PAddrsFactory), + fx.Provide(P2PConnectionManager), + fx.Provide(P2PNatPortMap), + fx.Provide(P2PRelay), + fx.Provide(P2PAutoRealy), + fx.Provide(P2PDefaultTransports), + fx.Provide(P2PQUIC), + + fx.Provide(P2PHost), + + fx.Provide(NewDiscoveryHandler), + + fx.Invoke(AutoNATService), + fx.Invoke(P2PPNetChecker), + fx.Invoke(StartListening), + fx.Invoke(SetupDiscovery), +) + +func LibP2P(cfg *BuildCfg) fx.Option { + opts := fx.Options( + BaseLibP2P, + + maybeProvide(P2PNoSecurity, cfg.DisableEncryptedConnections), + maybeProvide(Pubsub, cfg.getOpt("pubsub") || cfg.getOpt("ipnsps")), + + fx.Provide(P2PSmuxTransport(cfg.getOpt("mplex"))), + fx.Provide(P2PRouting), + fx.Provide(P2PBaseRouting), + maybeProvide(P2PPubsubRouter, cfg.getOpt("ipnsps")), + ) + + return opts +} + +func Storage(cfg *BuildCfg) fx.Option { + return fx.Options( + fx.Provide(RepoConfig), + fx.Provide(DatastoreCtor), + fx.Provide(BaseBlockstoreCtor(cfg.Permanent, cfg.NilRepo)), + fx.Provide(GcBlockstoreCtor), + ) +} + +var Identity = fx.Options( + fx.Provide(PeerID), + fx.Provide(PrivateKey), + fx.Provide(Peerstore), +) + +var IPNS = fx.Options( + fx.Provide(RecordValidator), +) + +var Providers = fx.Options( + fx.Provide(ProviderQueue), + fx.Provide(ProviderCtor), + fx.Provide(ReproviderCtor), + + fx.Invoke(Reprovider), +) + +func Online(cfg *BuildCfg) fx.Option { + return fx.Options( + fx.Provide(OnlineExchangeCtor), + fx.Provide(OnlineNamesysCtor), + + fx.Invoke(IpnsRepublisher), + + fx.Provide(p2p.NewP2P), + + LibP2P(cfg), + Providers, + ) +} + +var Offline = fx.Options( + fx.Provide(offline.Exchange), + fx.Provide(OfflineNamesysCtor), + fx.Provide(offroute.NewOfflineRouter), + fx.Provide(provider.NewOfflineProvider), +) + +var Core = fx.Options( + fx.Provide(BlockServiceCtor), + fx.Provide(DagCtor), + fx.Provide(resolver.NewBasicResolver), + fx.Provide(Pinning), + fx.Provide(Files), +) + +func Networked(cfg *BuildCfg) fx.Option { + if cfg.Online { + return Online(cfg) + } + return Offline +} + +func IPFS(ctx context.Context, cfg *BuildCfg) fx.Option { + if cfg == nil { + cfg = new(BuildCfg) + } + + return fx.Options( + cfg.options(ctx), + + fx.Provide(baseProcess), + fx.Invoke(setupSharding), + + Storage(cfg), + Identity, + IPNS, + Networked(cfg), + + Core, + ) +} diff --git a/core/node/helpers.go b/core/node/helpers.go new file mode 100644 index 000000000..4fced7fc6 --- /dev/null +++ b/core/node/helpers.go @@ -0,0 +1,76 @@ +package node + +import ( + "context" + + config "github.com/ipfs/go-ipfs-config" + uio "github.com/ipfs/go-unixfs/io" + "github.com/jbenet/goprocess" + "github.com/pkg/errors" + "go.uber.org/fx" +) + +// lifecycleCtx creates a context which will be cancelled when lifecycle stops +// +// This is a hack which we need because most of our services use contexts in a +// wrong way +func lifecycleCtx(mctx MetricsCtx, lc fx.Lifecycle) context.Context { + ctx, cancel := context.WithCancel(mctx) + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + cancel() + return nil + }, + }) + return ctx +} + +type lcProcess struct { + fx.In + + LC fx.Lifecycle + Proc goprocess.Process +} + +// Append wraps ProcessFunc into a goprocess, and appends it to the lifecycle +func (lp *lcProcess) Append(f goprocess.ProcessFunc) { + // Hooks are guaranteed to run in sequence. If a hook fails to start, its + // OnStop won't be executed. + var proc goprocess.Process + + lp.LC.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + proc = lp.Proc.Go(f) + return nil + }, + OnStop: func(ctx context.Context) error { + if proc == nil { // Theoretically this shouldn't ever happen + return errors.New("lcProcess: proc was nil") + } + + return proc.Close() // todo: respect ctx, somehow + }, + }) +} + +func maybeProvide(opt interface{}, enable bool) fx.Option { + if enable { + return fx.Provide(opt) + } + return fx.Options() +} + +func setupSharding(cfg *config.Config) { + // TEMP: setting global sharding switch here + uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled +} + +func baseProcess(lc fx.Lifecycle) goprocess.Process { + p := goprocess.WithParent(goprocess.Background()) + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return p.Close() + }, + }) + return p +} diff --git a/core/node/identity.go b/core/node/identity.go new file mode 100644 index 000000000..eb3903098 --- /dev/null +++ b/core/node/identity.go @@ -0,0 +1,48 @@ +package node + +import ( + "errors" + "fmt" + + "github.com/ipfs/go-ipfs-config" + "github.com/libp2p/go-libp2p-crypto" + "github.com/libp2p/go-libp2p-peer" +) + +func PeerID(cfg *config.Config) (peer.ID, error) { + cid := cfg.Identity.PeerID + if cid == "" { + return "", errors.New("identity was not set in config (was 'ipfs init' run?)") + } + if len(cid) == 0 { + return "", errors.New("no peer ID in config! (was 'ipfs init' run?)") + } + + id, err := peer.IDB58Decode(cid) + if err != nil { + return "", fmt.Errorf("peer ID invalid: %s", err) + } + + return id, nil +} + +func PrivateKey(cfg *config.Config, id peer.ID) (crypto.PrivKey, error) { + if cfg.Identity.PrivKey == "" { + return nil, nil + } + + sk, err := cfg.Identity.DecodePrivateKey("passphrase todo!") + if err != nil { + return nil, err + } + + id2, err := peer.IDFromPrivateKey(sk) + if err != nil { + return nil, err + } + + if id2 != id { + return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2) + } + return sk, nil +} diff --git a/core/node/ipns.go b/core/node/ipns.go new file mode 100644 index 000000000..afd6e678d --- /dev/null +++ b/core/node/ipns.go @@ -0,0 +1,71 @@ +package node + +import ( + "fmt" + "time" + + "github.com/ipfs/go-ipfs-config" + "github.com/ipfs/go-ipfs-util" + "github.com/ipfs/go-ipns" + "github.com/libp2p/go-libp2p-crypto" + "github.com/libp2p/go-libp2p-peerstore" + "github.com/libp2p/go-libp2p-record" + "github.com/libp2p/go-libp2p-routing" + + "github.com/ipfs/go-ipfs/namesys" + "github.com/ipfs/go-ipfs/namesys/republisher" + "github.com/ipfs/go-ipfs/repo" +) + +const DefaultIpnsCacheSize = 128 + +func RecordValidator(ps peerstore.Peerstore) record.Validator { + return record.NamespacedValidator{ + "pk": record.PublicKeyValidator{}, + "ipns": ipns.Validator{KeyBook: ps}, + } +} + +func OfflineNamesysCtor(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) { + return namesys.NewNameSystem(rt, repo.Datastore(), 0), nil +} + +func OnlineNamesysCtor(rt routing.IpfsRouting, repo repo.Repo, cfg *config.Config) (namesys.NameSystem, error) { + cs := cfg.Ipns.ResolveCacheSize + if cs == 0 { + cs = DefaultIpnsCacheSize + } + if cs < 0 { + return nil, fmt.Errorf("cannot specify negative resolve cache size") + } + return namesys.NewNameSystem(rt, repo.Datastore(), cs), nil +} + +func IpnsRepublisher(lc lcProcess, cfg *config.Config, namesys namesys.NameSystem, repo repo.Repo, privKey crypto.PrivKey) error { + repub := republisher.NewRepublisher(namesys, repo.Datastore(), privKey, repo.Keystore()) + + if cfg.Ipns.RepublishPeriod != "" { + d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod) + if err != nil { + return fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err) + } + + if !util.Debug && (d < time.Minute || d > (time.Hour*24)) { + return fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d) + } + + repub.Interval = d + } + + if cfg.Ipns.RecordLifetime != "" { + d, err := time.ParseDuration(cfg.Ipns.RecordLifetime) + if err != nil { + return fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err) + } + + repub.RecordLifetime = d + } + + lc.Append(repub.Run) + return nil +} diff --git a/core/node/libp2p.go b/core/node/libp2p.go new file mode 100644 index 000000000..be1b4861a --- /dev/null +++ b/core/node/libp2p.go @@ -0,0 +1,578 @@ +package node + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "os" + "sort" + "strings" + "time" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-ipfs-config" + nilrouting "github.com/ipfs/go-ipfs-routing/none" + logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-autonat-svc" + "github.com/libp2p/go-libp2p-circuit" + "github.com/libp2p/go-libp2p-connmgr" + "github.com/libp2p/go-libp2p-crypto" + "github.com/libp2p/go-libp2p-host" + "github.com/libp2p/go-libp2p-kad-dht" + dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts" + "github.com/libp2p/go-libp2p-metrics" + "github.com/libp2p/go-libp2p-peer" + "github.com/libp2p/go-libp2p-peerstore" + "github.com/libp2p/go-libp2p-peerstore/pstoremem" + "github.com/libp2p/go-libp2p-pnet" + "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p-pubsub-router" + "github.com/libp2p/go-libp2p-quic-transport" + "github.com/libp2p/go-libp2p-record" + "github.com/libp2p/go-libp2p-routing" + "github.com/libp2p/go-libp2p-routing-helpers" + p2pbhost "github.com/libp2p/go-libp2p/p2p/host/basic" + "github.com/libp2p/go-libp2p/p2p/host/routed" + mafilter "github.com/libp2p/go-maddr-filter" + smux "github.com/libp2p/go-stream-muxer" + ma "github.com/multiformats/go-multiaddr" + mplex "github.com/whyrusleeping/go-smux-multiplex" + yamux "github.com/whyrusleeping/go-smux-yamux" + mamask "github.com/whyrusleeping/multiaddr-filter" + "go.uber.org/fx" + + "github.com/ipfs/go-ipfs/repo" +) + +var log = logging.Logger("node") + +type HostOption func(ctx context.Context, id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error) +type RoutingOption func(context.Context, host.Host, datastore.Batching, record.Validator) (routing.IpfsRouting, error) + +var DefaultHostOption HostOption = constructPeerHost + +// isolates the complex initialization steps +func constructPeerHost(ctx context.Context, id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error) { + pkey := ps.PrivKey(id) + if pkey == nil { + return nil, fmt.Errorf("missing private key for node ID: %s", id.Pretty()) + } + options = append([]libp2p.Option{libp2p.Identity(pkey), libp2p.Peerstore(ps)}, options...) + return libp2p.New(ctx, options...) +} + +func constructDHTRouting(ctx context.Context, host host.Host, dstore datastore.Batching, validator record.Validator) (routing.IpfsRouting, error) { + return dht.New( + ctx, host, + dhtopts.Datastore(dstore), + dhtopts.Validator(validator), + ) +} + +func constructClientDHTRouting(ctx context.Context, host host.Host, dstore datastore.Batching, validator record.Validator) (routing.IpfsRouting, error) { + return dht.New( + ctx, host, + dhtopts.Client(true), + dhtopts.Datastore(dstore), + dhtopts.Validator(validator), + ) +} + +var DHTOption RoutingOption = constructDHTRouting +var DHTClientOption RoutingOption = constructClientDHTRouting +var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting + +func Peerstore(id peer.ID, sk crypto.PrivKey) peerstore.Peerstore { + ps := pstoremem.NewPeerstore() + + if sk != nil { + ps.AddPrivKey(id, sk) + ps.AddPubKey(id, sk.GetPublic()) + } + + return ps +} + +func P2PAddrFilters(cfg *config.Config) (opts Libp2pOpts, err error) { + for _, s := range cfg.Swarm.AddrFilters { + f, err := mamask.NewMask(s) + if err != nil { + return opts, fmt.Errorf("incorrectly formatted address filter in config: %s", s) + } + opts.Opts = append(opts.Opts, libp2p.FilterAddresses(f)) + } + return opts, nil +} + +func P2PBandwidthCounter(cfg *config.Config) (opts Libp2pOpts, reporter metrics.Reporter) { + reporter = metrics.NewBandwidthCounter() + + if !cfg.Swarm.DisableBandwidthMetrics { + opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter)) + } + return opts, reporter +} + +type Libp2pOpts struct { + fx.Out + + Opts []libp2p.Option `group:"libp2p"` +} + +type PNetFingerprint []byte + +func P2PPNet(repo repo.Repo) (opts Libp2pOpts, fp PNetFingerprint, err error) { + swarmkey, err := repo.SwarmKey() + if err != nil || swarmkey == nil { + return opts, nil, err + } + + protec, err := pnet.NewProtector(bytes.NewReader(swarmkey)) + if err != nil { + return opts, nil, fmt.Errorf("failed to configure private network: %s", err) + } + fp = protec.Fingerprint() + + opts.Opts = append(opts.Opts, libp2p.PrivateNetwork(protec)) + return opts, fp, nil +} + +func P2PPNetChecker(repo repo.Repo, ph host.Host, lc fx.Lifecycle) error { + // TODO: better check? + swarmkey, err := repo.SwarmKey() + if err != nil || swarmkey == nil { + return err + } + + done := make(chan struct{}) + lc.Append(fx.Hook{ + OnStart: func(_ context.Context) error { + go func() { + t := time.NewTicker(30 * time.Second) + defer t.Stop() + + <-t.C // swallow one tick + for { + select { + case <-t.C: + if len(ph.Network().Peers()) == 0 { + log.Warning("We are in private network and have no peers.") + log.Warning("This might be configuration mistake.") + } + case <-done: + return + } + } + }() + return nil + }, + OnStop: func(_ context.Context) error { + close(done) + return nil + }, + }) + return nil +} + +func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) { + var annAddrs []ma.Multiaddr + for _, addr := range cfg.Announce { + maddr, err := ma.NewMultiaddr(addr) + if err != nil { + return nil, err + } + annAddrs = append(annAddrs, maddr) + } + + filters := mafilter.NewFilters() + noAnnAddrs := map[string]bool{} + for _, addr := range cfg.NoAnnounce { + f, err := mamask.NewMask(addr) + if err == nil { + filters.AddDialFilter(f) + continue + } + maddr, err := ma.NewMultiaddr(addr) + if err != nil { + return nil, err + } + noAnnAddrs[maddr.String()] = true + } + + return func(allAddrs []ma.Multiaddr) []ma.Multiaddr { + var addrs []ma.Multiaddr + if len(annAddrs) > 0 { + addrs = annAddrs + } else { + addrs = allAddrs + } + + var out []ma.Multiaddr + for _, maddr := range addrs { + // check for exact matches + ok := noAnnAddrs[maddr.String()] + // check for /ipcidr matches + if !ok && !filters.AddrBlocked(maddr) { + out = append(out, maddr) + } + } + return out + }, nil +} + +func P2PAddrsFactory(cfg *config.Config) (opts Libp2pOpts, err error) { + addrsFactory, err := makeAddrsFactory(cfg.Addresses) + if err != nil { + return opts, err + } + opts.Opts = append(opts.Opts, libp2p.AddrsFactory(addrsFactory)) + return +} + +func P2PConnectionManager(cfg *config.Config) (opts Libp2pOpts, err error) { + grace := config.DefaultConnMgrGracePeriod + low := config.DefaultConnMgrHighWater + high := config.DefaultConnMgrHighWater + + switch cfg.Swarm.ConnMgr.Type { + case "": + // 'default' value is the basic connection manager + return + case "none": + return opts, nil + case "basic": + grace, err = time.ParseDuration(cfg.Swarm.ConnMgr.GracePeriod) + if err != nil { + return opts, fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err) + } + + low = cfg.Swarm.ConnMgr.LowWater + high = cfg.Swarm.ConnMgr.HighWater + default: + return opts, fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Swarm.ConnMgr.Type) + } + + cm := connmgr.NewConnManager(low, high, grace) + opts.Opts = append(opts.Opts, libp2p.ConnectionManager(cm)) + return +} + +func makeSmuxTransportOption(mplexExp bool) libp2p.Option { + const yamuxID = "/yamux/1.0.0" + const mplexID = "/mplex/6.7.0" + + ymxtpt := &yamux.Transport{ + AcceptBacklog: 512, + ConnectionWriteTimeout: time.Second * 10, + KeepAliveInterval: time.Second * 30, + EnableKeepAlive: true, + MaxStreamWindowSize: uint32(16 * 1024 * 1024), // 16MiB + LogOutput: ioutil.Discard, + } + + if os.Getenv("YAMUX_DEBUG") != "" { + ymxtpt.LogOutput = os.Stderr + } + + muxers := map[string]smux.Transport{yamuxID: ymxtpt} + if mplexExp { + muxers[mplexID] = mplex.DefaultTransport + } + + // Allow muxer preference order overriding + order := []string{yamuxID, mplexID} + if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" { + order = strings.Fields(prefs) + } + + opts := make([]libp2p.Option, 0, len(order)) + for _, id := range order { + tpt, ok := muxers[id] + if !ok { + log.Warning("unknown or duplicate muxer in LIBP2P_MUX_PREFS: %s", id) + continue + } + delete(muxers, id) + opts = append(opts, libp2p.Muxer(id, tpt)) + } + + return libp2p.ChainOptions(opts...) +} + +func P2PSmuxTransport(mplex bool) func() (opts Libp2pOpts, err error) { + return func() (opts Libp2pOpts, err error) { + opts.Opts = append(opts.Opts, makeSmuxTransportOption(mplex)) + return + } +} + +func P2PNatPortMap(cfg *config.Config) (opts Libp2pOpts, err error) { + if !cfg.Swarm.DisableNatPortMap { + opts.Opts = append(opts.Opts, libp2p.NATPortMap()) + } + return +} + +func P2PRelay(cfg *config.Config) (opts Libp2pOpts, err error) { + if cfg.Swarm.DisableRelay { + // Enabled by default. + opts.Opts = append(opts.Opts, libp2p.DisableRelay()) + } else { + relayOpts := []relay.RelayOpt{relay.OptDiscovery} + if cfg.Swarm.EnableRelayHop { + relayOpts = append(relayOpts, relay.OptHop) + } + opts.Opts = append(opts.Opts, libp2p.EnableRelay(relayOpts...)) + } + return +} + +func P2PAutoRealy(cfg *config.Config) (opts Libp2pOpts, err error) { + // enable autorelay + if cfg.Swarm.EnableAutoRelay { + opts.Opts = append(opts.Opts, libp2p.EnableAutoRelay()) + } + return +} + +func P2PDefaultTransports() (opts Libp2pOpts, err error) { + opts.Opts = append(opts.Opts, libp2p.DefaultTransports) + return +} + +func P2PQUIC(cfg *config.Config) (opts Libp2pOpts, err error) { + if cfg.Experimental.QUIC { + opts.Opts = append(opts.Opts, libp2p.Transport(libp2pquic.NewTransport)) + } + return +} + +func P2PNoSecurity() (opts Libp2pOpts) { + opts.Opts = append(opts.Opts, libp2p.NoSecurity) + // TODO: shouldn't this be Errorf to guarantee visibility? + log.Warningf(`Your IPFS node has been configured to run WITHOUT ENCRYPTED CONNECTIONS. + You will not be able to connect to any nodes configured to use encrypted connections`) + return opts +} + +type P2PHostIn struct { + fx.In + + Repo repo.Repo + Validator record.Validator + HostOption HostOption + RoutingOption RoutingOption + ID peer.ID + Peerstore peerstore.Peerstore + + Opts [][]libp2p.Option `group:"libp2p"` +} + +type BaseRouting routing.IpfsRouting +type P2PHostOut struct { + fx.Out + + Host host.Host + Routing BaseRouting +} + +func P2PHost(mctx MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHostOut, err error) { + opts := []libp2p.Option{libp2p.NoListenAddrs} + for _, o := range params.Opts { + opts = append(opts, o...) + } + + ctx := lifecycleCtx(mctx, lc) + + opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { + r, err := params.RoutingOption(ctx, h, params.Repo.Datastore(), params.Validator) + out.Routing = r + return r, err + })) + + out.Host, err = params.HostOption(ctx, params.ID, params.Peerstore, opts...) + if err != nil { + return P2PHostOut{}, err + } + + // this code is necessary just for tests: mock network constructions + // ignore the libp2p constructor options that actually construct the routing! + if out.Routing == nil { + r, err := params.RoutingOption(ctx, out.Host, params.Repo.Datastore(), params.Validator) + if err != nil { + return P2PHostOut{}, err + } + out.Routing = r + out.Host = routedhost.Wrap(out.Host, out.Routing) + } + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return out.Host.Close() + }, + }) + + return out, err +} + +type Router struct { + routing.IpfsRouting + + Priority int // less = more important +} + +type p2pRouterOut struct { + fx.Out + + Router Router `group:"routers"` +} + +func P2PBaseRouting(lc fx.Lifecycle, in BaseRouting) (out p2pRouterOut, dr *dht.IpfsDHT) { + if dht, ok := in.(*dht.IpfsDHT); ok { + dr = dht + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return dr.Close() + }, + }) + } + + return p2pRouterOut{ + Router: Router{ + Priority: 1000, + IpfsRouting: in, + }, + }, dr +} + +type p2pOnlineRoutingIn struct { + fx.In + + Routers []Router `group:"routers"` + Validator record.Validator +} + +func P2PRouting(in p2pOnlineRoutingIn) routing.IpfsRouting { + routers := in.Routers + + sort.SliceStable(routers, func(i, j int) bool { + return routers[i].Priority < routers[j].Priority + }) + + irouters := make([]routing.IpfsRouting, len(routers)) + for i, v := range routers { + irouters[i] = v.IpfsRouting + } + + return routinghelpers.Tiered{ + Routers: irouters, + Validator: in.Validator, + } +} + +type p2pPSRoutingIn struct { + fx.In + + BaseRouting BaseRouting + Repo repo.Repo + Validator record.Validator + Host host.Host + PubSub *pubsub.PubSub `optional:"true"` +} + +func P2PPubsubRouter(mctx MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore) { + psRouter := namesys.NewPubsubValueStore( + lifecycleCtx(mctx, lc), + in.Host, + in.BaseRouting, + in.PubSub, + in.Validator, + ) + + return p2pRouterOut{ + Router: Router{ + IpfsRouting: &routinghelpers.Compose{ + ValueStore: &routinghelpers.LimitedValueStore{ + ValueStore: psRouter, + Namespaces: []string{"ipns"}, + }, + }, + Priority: 100, + }, + }, psRouter +} + +func AutoNATService(mctx MetricsCtx, lc fx.Lifecycle, cfg *config.Config, host host.Host) error { + if !cfg.Swarm.EnableAutoNATService { + return nil + } + var opts []libp2p.Option + if cfg.Experimental.QUIC { + opts = append(opts, libp2p.DefaultTransports, libp2p.Transport(libp2pquic.NewTransport)) + } + + _, err := autonat.NewAutoNATService(lifecycleCtx(mctx, lc), host, opts...) + return err +} + +func Pubsub(mctx MetricsCtx, lc fx.Lifecycle, host host.Host, cfg *config.Config) (service *pubsub.PubSub, err error) { + var pubsubOptions []pubsub.Option + if cfg.Pubsub.DisableSigning { + pubsubOptions = append(pubsubOptions, pubsub.WithMessageSigning(false)) + } + + if cfg.Pubsub.StrictSignatureVerification { + pubsubOptions = append(pubsubOptions, pubsub.WithStrictSignatureVerification(true)) + } + + switch cfg.Pubsub.Router { + case "": + fallthrough + case "floodsub": + service, err = pubsub.NewFloodSub(lifecycleCtx(mctx, lc), host, pubsubOptions...) + + case "gossipsub": + service, err = pubsub.NewGossipSub(lifecycleCtx(mctx, lc), host, pubsubOptions...) + + default: + err = fmt.Errorf("Unknown pubsub router %s", cfg.Pubsub.Router) + } + + return service, err +} + +func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) { + var listen []ma.Multiaddr + for _, addr := range cfg.Addresses.Swarm { + maddr, err := ma.NewMultiaddr(addr) + if err != nil { + return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", cfg.Addresses.Swarm) + } + listen = append(listen, maddr) + } + + return listen, nil +} + +func StartListening(host host.Host, cfg *config.Config) error { + listenAddrs, err := listenAddresses(cfg) + if err != nil { + return err + } + + // Actually start listening: + if err := host.Network().Listen(listenAddrs...); err != nil { + return err + } + + // list out our addresses + addrs, err := host.Network().InterfaceListenAddresses() + if err != nil { + return err + } + log.Infof("Swarm listening at: %s", addrs) + return nil +} diff --git a/core/node/provider.go b/core/node/provider.go new file mode 100644 index 000000000..8518ca70e --- /dev/null +++ b/core/node/provider.go @@ -0,0 +1,72 @@ +package node + +import ( + "context" + "fmt" + "time" + + "github.com/ipfs/go-ipfs-config" + "github.com/ipfs/go-ipld-format" + "github.com/libp2p/go-libp2p-routing" + "go.uber.org/fx" + + "github.com/ipfs/go-ipfs/exchange/reprovide" + "github.com/ipfs/go-ipfs/pin" + "github.com/ipfs/go-ipfs/provider" + "github.com/ipfs/go-ipfs/repo" +) + +const kReprovideFrequency = time.Hour * 12 + +func ProviderQueue(mctx MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*provider.Queue, error) { + return provider.NewQueue(lifecycleCtx(mctx, lc), "provider-v1", repo.Datastore()) +} + +func ProviderCtor(mctx MetricsCtx, lc fx.Lifecycle, queue *provider.Queue, rt routing.IpfsRouting) provider.Provider { + p := provider.NewProvider(lifecycleCtx(mctx, lc), queue, rt) + + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + p.Run() + return nil + }, + OnStop: func(ctx context.Context) error { + return p.Close() + }, + }) + + return p +} + +func ReproviderCtor(mctx MetricsCtx, lc fx.Lifecycle, cfg *config.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*reprovide.Reprovider, error) { + var keyProvider reprovide.KeyChanFunc + + switch cfg.Reprovider.Strategy { + case "all": + fallthrough + case "": + keyProvider = reprovide.NewBlockstoreProvider(bs) + case "roots": + keyProvider = reprovide.NewPinnedProvider(pinning, ds, true) + case "pinned": + keyProvider = reprovide.NewPinnedProvider(pinning, ds, false) + default: + return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy) + } + return reprovide.NewReprovider(lifecycleCtx(mctx, lc), rt, keyProvider), nil +} + +func Reprovider(cfg *config.Config, reprovider *reprovide.Reprovider) error { + reproviderInterval := kReprovideFrequency + if cfg.Reprovider.Interval != "" { + dur, err := time.ParseDuration(cfg.Reprovider.Interval) + if err != nil { + return err + } + + reproviderInterval = dur + } + + go reprovider.Run(reproviderInterval) + return nil +} diff --git a/core/node/storage.go b/core/node/storage.go new file mode 100644 index 000000000..69c912609 --- /dev/null +++ b/core/node/storage.go @@ -0,0 +1,96 @@ +package node + +import ( + "context" + "os" + "syscall" + "time" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/retrystore" + blockstore "github.com/ipfs/go-ipfs-blockstore" + config "github.com/ipfs/go-ipfs-config" + "go.uber.org/fx" + + "github.com/ipfs/go-ipfs/filestore" + "github.com/ipfs/go-ipfs/repo" + "github.com/ipfs/go-ipfs/thirdparty/cidv0v1" + "github.com/ipfs/go-ipfs/thirdparty/verifbs" +) + +func isTooManyFDError(err error) bool { + perr, ok := err.(*os.PathError) + if ok && perr.Err == syscall.EMFILE { + return true + } + + return false +} + +func RepoConfig(repo repo.Repo) (*config.Config, error) { + return repo.Config() +} + +func DatastoreCtor(repo repo.Repo) datastore.Datastore { + return repo.Datastore() +} + +type BaseBlocks blockstore.Blockstore + +func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx MetricsCtx, repo repo.Repo, cfg *config.Config, lc fx.Lifecycle) (bs BaseBlocks, err error) { + return func(mctx MetricsCtx, repo repo.Repo, cfg *config.Config, lc fx.Lifecycle) (bs BaseBlocks, err error) { + rds := &retrystore.Datastore{ + Batching: repo.Datastore(), + Delay: time.Millisecond * 200, + Retries: 6, + TempErrFunc: isTooManyFDError, + } + // hash security + bs = blockstore.NewBlockstore(rds) + bs = &verifbs.VerifBS{Blockstore: bs} + + opts := blockstore.DefaultCacheOpts() + opts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize + if !permanent { + opts.HasBloomFilterSize = 0 + } + + if !nilRepo { + ctx, cancel := context.WithCancel(mctx) + + lc.Append(fx.Hook{ + OnStop: func(context context.Context) error { + cancel() + return nil + }, + }) + bs, err = blockstore.CachedBlockstore(ctx, bs, opts) + if err != nil { + return nil, err + } + } + + bs = blockstore.NewIdStore(bs) + bs = cidv0v1.NewBlockstore(bs) + + if cfg.Datastore.HashOnRead { // TODO: review: this is how it was done originally, is there a reason we can't just pass this directly? + bs.HashOnRead(true) + } + + return + } +} + +func GcBlockstoreCtor(repo repo.Repo, bb BaseBlocks, cfg *config.Config) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) { + gclocker = blockstore.NewGCLocker() + gcbs = blockstore.NewGCBlockstore(bb, gclocker) + + if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled { + // hash security + fstore = filestore.NewFilestore(bb, repo.FileManager()) // TODO: mark optional + gcbs = blockstore.NewGCBlockstore(fstore, gclocker) + gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs} + } + bs = gcbs + return +} diff --git a/fuse/ipns/ipns_unix.go b/fuse/ipns/ipns_unix.go index d9f6c6740..a662ab22e 100644 --- a/fuse/ipns/ipns_unix.go +++ b/fuse/ipns/ipns_unix.go @@ -13,6 +13,8 @@ import ( core "github.com/ipfs/go-ipfs/core" namesys "github.com/ipfs/go-ipfs/namesys" + resolve "github.com/ipfs/go-ipfs/namesys/resolve" + dag "github.com/ipfs/go-merkledag" path "github.com/ipfs/go-path" ft "github.com/ipfs/go-unixfs" @@ -96,7 +98,7 @@ func loadRoot(ctx context.Context, rt *keyRoot, ipfs *core.IpfsNode, name string return nil, err } - node, err := core.Resolve(ctx, ipfs.Namesys, ipfs.Resolver, p) + node, err := resolve.Resolve(ctx, ipfs.Namesys, ipfs.Resolver, p) switch err { case nil: case namesys.ErrResolveFailed: diff --git a/fuse/ipns/mount_unix.go b/fuse/ipns/mount_unix.go index e6b551b2d..54e62df60 100644 --- a/fuse/ipns/mount_unix.go +++ b/fuse/ipns/mount_unix.go @@ -22,5 +22,5 @@ func Mount(ipfs *core.IpfsNode, ipnsmp, ipfsmp string) (mount.Mount, error) { return nil, err } - return mount.NewMount(ipfs.Process(), fsys, ipnsmp, allow_other) + return mount.NewMount(ipfs.Process, fsys, ipnsmp, allow_other) } diff --git a/fuse/node/mount_nofuse.go b/fuse/node/mount_nofuse.go index 7f824ef3e..92f61f809 100644 --- a/fuse/node/mount_nofuse.go +++ b/fuse/node/mount_nofuse.go @@ -8,6 +8,8 @@ import ( core "github.com/ipfs/go-ipfs/core" ) +type errNeedFuseVersion error // used in tests, needed in OSX + func Mount(node *core.IpfsNode, fsdir, nsdir string) error { return errors.New("not compiled in") } diff --git a/fuse/node/mount_unix.go b/fuse/node/mount_unix.go index 8fee86947..3b9cb5c3a 100644 --- a/fuse/node/mount_unix.go +++ b/fuse/node/mount_unix.go @@ -30,6 +30,8 @@ var platformFuseChecks = func(*core.IpfsNode) error { return nil } +type errNeedFuseVersion error // used in tests, needed in OSX + func Mount(node *core.IpfsNode, fsdir, nsdir string) error { // check if we already have live mounts. // if the user said "Mount", then there must be something wrong. diff --git a/fuse/readonly/mount_unix.go b/fuse/readonly/mount_unix.go index ab7945456..656e23c49 100644 --- a/fuse/readonly/mount_unix.go +++ b/fuse/readonly/mount_unix.go @@ -16,5 +16,5 @@ func Mount(ipfs *core.IpfsNode, mountpoint string) (mount.Mount, error) { } allow_other := cfg.Mounts.FuseAllowOther fsys := NewFileSystem(ipfs) - return mount.NewMount(ipfs.Process(), fsys, mountpoint, allow_other) + return mount.NewMount(ipfs.Process, fsys, mountpoint, allow_other) } diff --git a/go.mod b/go.mod index 168172446..2c32a77f9 100644 --- a/go.mod +++ b/go.mod @@ -100,6 +100,7 @@ require ( github.com/multiformats/go-multibase v0.0.1 github.com/multiformats/go-multihash v0.0.1 github.com/opentracing/opentracing-go v1.0.2 + github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v0.9.2 github.com/syndtr/goleveldb v1.0.0 github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc @@ -108,6 +109,10 @@ require ( github.com/whyrusleeping/go-sysinfo v0.0.0-20190219211824-4a357d4b90b1 github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c + go.uber.org/atomic v1.3.2 // indirect + go.uber.org/dig v1.7.0 + go.uber.org/fx v1.9.0 + go.uber.org/multierr v1.1.0 // indirect golang.org/x/sys v0.0.0-20190302025703-b6889370fb10 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e // indirect gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect diff --git a/go.sum b/go.sum index 300dd111c..61fc30ac9 100644 --- a/go.sum +++ b/go.sum @@ -524,6 +524,14 @@ github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSv github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg= github.com/whyrusleeping/yamux v1.1.5 h1:4CK3aUUJQu0qpKZv5gEWJjNOQtdbdDhVVS6PJ+HimdE= github.com/whyrusleeping/yamux v1.1.5/go.mod h1:E8LnQQ8HKx5KD29HZFUwM1PxCOdPRzGwur1mcYhXcD8= +go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/dig v1.7.0 h1:E5/L92iQTNJTjfgJF2KgU+/JpMaiuvK2DHLBj0+kSZk= +go.uber.org/dig v1.7.0/go.mod h1:z+dSd2TP9Usi48jL8M3v63iSBVkiwtVyMKxMZYYauPg= +go.uber.org/fx v1.9.0 h1:7OAz8ucp35AU8eydejpYG7QrbE8rLKzGhHbZlJi5LYY= +go.uber.org/fx v1.9.0/go.mod h1:mFdUyAUuJ3w4jAckiKSKbldsxy1ojpAMJ+dVZg5Y0Aw= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go4.org v0.0.0-20190218023631-ce4c26f7be8e h1:m9LfARr2VIOW0vsV19kEKp/sWQvZnGobA8JHui/XJoY= go4.org v0.0.0-20190218023631-ce4c26f7be8e/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1yOyC1qaOBpL57BhE= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/namesys/republisher/repub_test.go b/namesys/republisher/repub_test.go index 8f0048c4c..48a0b086f 100644 --- a/namesys/republisher/repub_test.go +++ b/namesys/republisher/repub_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/core/bootstrap" mock "github.com/ipfs/go-ipfs/core/mock" namesys "github.com/ipfs/go-ipfs/namesys" . "github.com/ipfs/go-ipfs/namesys/republisher" @@ -45,7 +46,7 @@ func TestRepublish(t *testing.T) { t.Fatal(err) } - bsinf := core.BootstrapConfigWithPeers( + bsinf := bootstrap.BootstrapConfigWithPeers( []pstore.PeerInfo{ nodes[0].Peerstore.PeerInfo(nodes[0].Identity), }, diff --git a/core/pathresolver_test.go b/namesys/resolve/pathresolver_test.go similarity index 63% rename from core/pathresolver_test.go rename to namesys/resolve/pathresolver_test.go index 92318f275..fe578b5d3 100644 --- a/core/pathresolver_test.go +++ b/namesys/resolve/pathresolver_test.go @@ -1,10 +1,11 @@ -package core_test +package resolve_test import ( "testing" - core "github.com/ipfs/go-ipfs/core" coremock "github.com/ipfs/go-ipfs/core/mock" + "github.com/ipfs/go-ipfs/namesys/resolve" + path "github.com/ipfs/go-path" ) @@ -14,17 +15,17 @@ func TestResolveNoComponents(t *testing.T) { t.Fatal("Should have constructed a mock node", err) } - _, err = core.Resolve(n.Context(), n.Namesys, n.Resolver, path.Path("/ipns/")) + _, err = resolve.Resolve(n.Context(), n.Namesys, n.Resolver, path.Path("/ipns/")) if err != path.ErrNoComponents { t.Fatal("Should error with no components (/ipns/).", err) } - _, err = core.Resolve(n.Context(), n.Namesys, n.Resolver, path.Path("/ipfs/")) + _, err = resolve.Resolve(n.Context(), n.Namesys, n.Resolver, path.Path("/ipfs/")) if err != path.ErrNoComponents { t.Fatal("Should error with no components (/ipfs/).", err) } - _, err = core.Resolve(n.Context(), n.Namesys, n.Resolver, path.Path("/../..")) + _, err = resolve.Resolve(n.Context(), n.Namesys, n.Resolver, path.Path("/../..")) if err != path.ErrBadPath { t.Fatal("Should error with invalid path.", err) } diff --git a/core/pathresolver.go b/namesys/resolve/resolve.go similarity index 88% rename from core/pathresolver.go rename to namesys/resolve/resolve.go index 21c2a84be..128619c65 100644 --- a/core/pathresolver.go +++ b/namesys/resolve/resolve.go @@ -1,18 +1,20 @@ -package core +package resolve import ( "context" "errors" "strings" - namesys "github.com/ipfs/go-ipfs/namesys" - - ipld "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" - path "github.com/ipfs/go-path" - resolver "github.com/ipfs/go-path/resolver" + "github.com/ipfs/go-path" + "github.com/ipfs/go-path/resolver" + + "github.com/ipfs/go-ipfs/namesys" ) +var log = logging.Logger("nsresolv") + // ErrNoNamesys is an explicit error for when an IPFS node doesn't // (yet) have a name system var ErrNoNamesys = errors.New( @@ -64,7 +66,7 @@ func ResolveIPNS(ctx context.Context, nsys namesys.NameSystem, p path.Path) (pat // Resolve resolves the given path by parsing out protocol-specific // entries (e.g. /ipns/) and then going through the /ipfs/ // entries and returning the final node. -func Resolve(ctx context.Context, nsys namesys.NameSystem, r *resolver.Resolver, p path.Path) (ipld.Node, error) { +func Resolve(ctx context.Context, nsys namesys.NameSystem, r *resolver.Resolver, p path.Path) (format.Node, error) { p, err := ResolveIPNS(ctx, nsys, p) if err != nil { return nil, err diff --git a/test/integration/addcat_test.go b/test/integration/addcat_test.go index b0def746d..98e6936ee 100644 --- a/test/integration/addcat_test.go +++ b/test/integration/addcat_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/core/bootstrap" "github.com/ipfs/go-ipfs/core/coreapi" mock "github.com/ipfs/go-ipfs/core/mock" "github.com/ipfs/go-ipfs/thirdparty/unit" @@ -140,10 +141,10 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error { bs1 := []pstore.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)} bs2 := []pstore.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)} - if err := catter.Bootstrap(core.BootstrapConfigWithPeers(bs1)); err != nil { + if err := catter.Bootstrap(bootstrap.BootstrapConfigWithPeers(bs1)); err != nil { return err } - if err := adder.Bootstrap(core.BootstrapConfigWithPeers(bs2)); err != nil { + if err := adder.Bootstrap(bootstrap.BootstrapConfigWithPeers(bs2)); err != nil { return err } diff --git a/test/integration/bench_cat_test.go b/test/integration/bench_cat_test.go index e8a2322de..a40fcfe37 100644 --- a/test/integration/bench_cat_test.go +++ b/test/integration/bench_cat_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/core/bootstrap" "github.com/ipfs/go-ipfs/core/coreapi" mock "github.com/ipfs/go-ipfs/core/mock" "github.com/ipfs/go-ipfs/thirdparty/unit" @@ -83,10 +84,10 @@ func benchCat(b *testing.B, data []byte, conf testutil.LatencyConfig) error { bs1 := []pstore.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)} bs2 := []pstore.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)} - if err := catter.Bootstrap(core.BootstrapConfigWithPeers(bs1)); err != nil { + if err := catter.Bootstrap(bootstrap.BootstrapConfigWithPeers(bs1)); err != nil { return err } - if err := adder.Bootstrap(core.BootstrapConfigWithPeers(bs2)); err != nil { + if err := adder.Bootstrap(bootstrap.BootstrapConfigWithPeers(bs2)); err != nil { return err } diff --git a/test/integration/bitswap_wo_routing_test.go b/test/integration/bitswap_wo_routing_test.go index af4406633..30b8ce30d 100644 --- a/test/integration/bitswap_wo_routing_test.go +++ b/test/integration/bitswap_wo_routing_test.go @@ -8,6 +8,7 @@ import ( "github.com/ipfs/go-block-format" "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/core/mock" + "github.com/ipfs/go-ipfs/core/node" cid "github.com/ipfs/go-cid" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -26,7 +27,7 @@ func TestBitswapWithoutRouting(t *testing.T) { n, err := core.NewNode(ctx, &core.BuildCfg{ Online: true, Host: coremock.MockHostOption(mn), - Routing: core.NilRouterOption, // no routing + Routing: node.NilRouterOption, // no routing }) if err != nil { t.Fatal(err) diff --git a/test/integration/three_legged_cat_test.go b/test/integration/three_legged_cat_test.go index 953c7e370..1fc0e7bf2 100644 --- a/test/integration/three_legged_cat_test.go +++ b/test/integration/three_legged_cat_test.go @@ -10,6 +10,7 @@ import ( "time" core "github.com/ipfs/go-ipfs/core" + bootstrap2 "github.com/ipfs/go-ipfs/core/bootstrap" "github.com/ipfs/go-ipfs/core/coreapi" mock "github.com/ipfs/go-ipfs/core/mock" "github.com/ipfs/go-ipfs/thirdparty/unit" @@ -118,7 +119,7 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error { } bis := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID()) - bcfg := core.BootstrapConfigWithPeers([]pstore.PeerInfo{bis}) + bcfg := bootstrap2.BootstrapConfigWithPeers([]pstore.PeerInfo{bis}) if err := adder.Bootstrap(bcfg); err != nil { return err } diff --git a/test/sharness/t0270-filestore.sh b/test/sharness/t0270-filestore.sh index 829562c80..200305493 100755 --- a/test/sharness/t0270-filestore.sh +++ b/test/sharness/t0270-filestore.sh @@ -74,6 +74,8 @@ init_ipfs_filestore() { grep "either the filestore or the urlstore must be enabled" add_out ' + assert_repo_size_less_than 1000000 + test_expect_success "enable urlstore config setting" ' ipfs config --json Experimental.UrlstoreEnabled true ' @@ -84,6 +86,8 @@ init_ipfs_filestore() { grep "filestore is not enabled" add_out ' + assert_repo_size_less_than 1000000 + test_expect_success "enable filestore config setting" ' ipfs config --json Experimental.UrlstoreEnabled true && ipfs config --json Experimental.FilestoreEnabled true diff --git a/thirdparty/math2/math2.go b/thirdparty/math2/math2.go deleted file mode 100644 index e8a75b5f7..000000000 --- a/thirdparty/math2/math2.go +++ /dev/null @@ -1,9 +0,0 @@ -package math2 - -// IntMin returns the smaller of x or y. -func IntMin(x, y int) int { - if x < y { - return x - } - return y -}