mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 18:37:45 +08:00
387 lines
11 KiB
Go
387 lines
11 KiB
Go
/*
Package core implements the IpfsNode object and related methods.

Packages underneath core/ provide a (relatively) stable, low-level API
to carry out most IPFS-related tasks. For more details on the other
interfaces and how core/... fits into the bigger IPFS picture, see:

  $ godoc github.com/ipfs/go-ipfs
*/
|
|
package core
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"go.uber.org/fx"
|
|
|
|
version "github.com/ipfs/go-ipfs"
|
|
rp "github.com/ipfs/go-ipfs/exchange/reprovide"
|
|
"github.com/ipfs/go-ipfs/filestore"
|
|
"github.com/ipfs/go-ipfs/fuse/mount"
|
|
"github.com/ipfs/go-ipfs/namesys"
|
|
ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"
|
|
"github.com/ipfs/go-ipfs/p2p"
|
|
"github.com/ipfs/go-ipfs/pin"
|
|
"github.com/ipfs/go-ipfs/provider"
|
|
"github.com/ipfs/go-ipfs/repo"
|
|
|
|
bserv "github.com/ipfs/go-blockservice"
|
|
"github.com/ipfs/go-cid"
|
|
ds "github.com/ipfs/go-datastore"
|
|
bstore "github.com/ipfs/go-ipfs-blockstore"
|
|
config "github.com/ipfs/go-ipfs-config"
|
|
exchange "github.com/ipfs/go-ipfs-exchange-interface"
|
|
nilrouting "github.com/ipfs/go-ipfs-routing/none"
|
|
ipld "github.com/ipfs/go-ipld-format"
|
|
logging "github.com/ipfs/go-log"
|
|
"github.com/ipfs/go-merkledag"
|
|
"github.com/ipfs/go-mfs"
|
|
"github.com/ipfs/go-path/resolver"
|
|
ft "github.com/ipfs/go-unixfs"
|
|
"github.com/jbenet/goprocess"
|
|
"github.com/libp2p/go-libp2p"
|
|
autonat "github.com/libp2p/go-libp2p-autonat-svc"
|
|
circuit "github.com/libp2p/go-libp2p-circuit"
|
|
ic "github.com/libp2p/go-libp2p-crypto"
|
|
p2phost "github.com/libp2p/go-libp2p-host"
|
|
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
|
|
dht "github.com/libp2p/go-libp2p-kad-dht"
|
|
dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
|
|
metrics "github.com/libp2p/go-libp2p-metrics"
|
|
peer "github.com/libp2p/go-libp2p-peer"
|
|
pstore "github.com/libp2p/go-libp2p-peerstore"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
psrouter "github.com/libp2p/go-libp2p-pubsub-router"
|
|
record "github.com/libp2p/go-libp2p-record"
|
|
routing "github.com/libp2p/go-libp2p-routing"
|
|
"github.com/libp2p/go-libp2p/p2p/discovery"
|
|
p2pbhost "github.com/libp2p/go-libp2p/p2p/host/basic"
|
|
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
|
|
mafilter "github.com/libp2p/go-maddr-filter"
|
|
smux "github.com/libp2p/go-stream-muxer"
|
|
ma "github.com/multiformats/go-multiaddr"
|
|
mplex "github.com/whyrusleeping/go-smux-multiplex"
|
|
yamux "github.com/whyrusleeping/go-smux-yamux"
|
|
mamask "github.com/whyrusleeping/multiaddr-filter"
|
|
)
|
|
|
|
// Package-level tunables.
// NOTE(review): the code consuming these values is not visible in this part
// of the file; the descriptions below are derived from the names and values.
const (
	// kReprovideFrequency is a twelve hour interval.
	kReprovideFrequency = time.Hour * 12

	// discoveryConnTimeout is a thirty second timeout.
	discoveryConnTimeout = time.Second * 30

	// DefaultIpnsCacheSize is 128 — presumably a number of cache entries.
	DefaultIpnsCacheSize = 128
)
|
|
|
|
// log is the package-level logger, created with the name "core".
var log = logging.Logger("core")
|
|
|
|
func init() {
|
|
identify.ClientVersion = "go-ipfs/" + version.CurrentVersionNumber + "/" + version.CurrentCommit
|
|
}
|
|
|
|
// IpfsNode is IPFS Core module. It represents an IPFS instance.
|
|
type IpfsNode struct {
|
|
|
|
// Self
|
|
Identity peer.ID // the local node's identity
|
|
|
|
Repo repo.Repo
|
|
|
|
// Local node
|
|
Pinning pin.Pinner // the pinning manager
|
|
Mounts Mounts `optional:"true"` // current mount state, if any.
|
|
PrivateKey ic.PrivKey // the local node's private Key
|
|
PNetFingerprint PNetFingerprint `optional:"true"` // fingerprint of private network
|
|
|
|
// Services
|
|
Peerstore pstore.Peerstore `optional:"true"` // storage for other Peer instances
|
|
Blockstore bstore.GCBlockstore // the block store (lower level)
|
|
Filestore *filestore.Filestore // the filestore blockstore
|
|
BaseBlocks BaseBlocks // the raw blockstore, no filestore wrapping
|
|
GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc
|
|
Blocks bserv.BlockService // the block service, get/add blocks.
|
|
DAG ipld.DAGService // the merkle dag service, get/add objects.
|
|
Resolver *resolver.Resolver // the path resolution system
|
|
Reporter metrics.Reporter `optional:"true"`
|
|
Discovery discovery.Service `optional:"true"`
|
|
FilesRoot *mfs.Root
|
|
RecordValidator record.Validator
|
|
|
|
// Online
|
|
PeerHost p2phost.Host `optional:"true"` // the network host (server+client)
|
|
Bootstrapper io.Closer `optional:"true"` // the periodic bootstrapper
|
|
Routing routing.IpfsRouting `optional:"true"` // the routing system. recommend ipfs-dht
|
|
Exchange exchange.Interface // the block exchange + strategy (bitswap)
|
|
Namesys namesys.NameSystem // the name system, resolves paths to hashes
|
|
Provider provider.Provider // the value provider system
|
|
Reprovider *rp.Reprovider `optional:"true"` // the value reprovider system
|
|
IpnsRepub *ipnsrp.Republisher `optional:"true"`
|
|
|
|
AutoNAT *autonat.AutoNATService `optional:"true"`
|
|
PubSub *pubsub.PubSub `optional:"true"`
|
|
PSRouter *psrouter.PubsubValueStore `optional:"true"`
|
|
DHT *dht.IpfsDHT `optional:"true"`
|
|
P2P *p2p.P2P `optional:"true"`
|
|
|
|
Process goprocess.Process
|
|
ctx context.Context
|
|
|
|
app *fx.App
|
|
|
|
// Flags
|
|
IsOnline bool `optional:"true"` // Online is set when networking is enabled.
|
|
IsDaemon bool `optional:"true"` // Daemon is set when running on a long-running daemon.
|
|
}
|
|
|
|
// Mounts defines what the node's mount state is. This should
|
|
// perhaps be moved to the daemon or mount. It's here because
|
|
// it needs to be accessible across daemon requests.
|
|
type Mounts struct {
|
|
Ipfs mount.Mount
|
|
Ipns mount.Mount
|
|
}
|
|
|
|
func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
|
|
var annAddrs []ma.Multiaddr
|
|
for _, addr := range cfg.Announce {
|
|
maddr, err := ma.NewMultiaddr(addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
annAddrs = append(annAddrs, maddr)
|
|
}
|
|
|
|
filters := mafilter.NewFilters()
|
|
noAnnAddrs := map[string]bool{}
|
|
for _, addr := range cfg.NoAnnounce {
|
|
f, err := mamask.NewMask(addr)
|
|
if err == nil {
|
|
filters.AddDialFilter(f)
|
|
continue
|
|
}
|
|
maddr, err := ma.NewMultiaddr(addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
noAnnAddrs[maddr.String()] = true
|
|
}
|
|
|
|
return func(allAddrs []ma.Multiaddr) []ma.Multiaddr {
|
|
var addrs []ma.Multiaddr
|
|
if len(annAddrs) > 0 {
|
|
addrs = annAddrs
|
|
} else {
|
|
addrs = allAddrs
|
|
}
|
|
|
|
var out []ma.Multiaddr
|
|
for _, maddr := range addrs {
|
|
// check for exact matches
|
|
ok := noAnnAddrs[maddr.String()]
|
|
// check for /ipcidr matches
|
|
if !ok && !filters.AddrBlocked(maddr) {
|
|
out = append(out, maddr)
|
|
}
|
|
}
|
|
return out
|
|
}, nil
|
|
}
|
|
|
|
func makeSmuxTransportOption(mplexExp bool) libp2p.Option {
|
|
const yamuxID = "/yamux/1.0.0"
|
|
const mplexID = "/mplex/6.7.0"
|
|
|
|
ymxtpt := &yamux.Transport{
|
|
AcceptBacklog: 512,
|
|
ConnectionWriteTimeout: time.Second * 10,
|
|
KeepAliveInterval: time.Second * 30,
|
|
EnableKeepAlive: true,
|
|
MaxStreamWindowSize: uint32(16 * 1024 * 1024), // 16MiB
|
|
LogOutput: ioutil.Discard,
|
|
}
|
|
|
|
if os.Getenv("YAMUX_DEBUG") != "" {
|
|
ymxtpt.LogOutput = os.Stderr
|
|
}
|
|
|
|
muxers := map[string]smux.Transport{yamuxID: ymxtpt}
|
|
if mplexExp {
|
|
muxers[mplexID] = mplex.DefaultTransport
|
|
}
|
|
|
|
// Allow muxer preference order overriding
|
|
order := []string{yamuxID, mplexID}
|
|
if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" {
|
|
order = strings.Fields(prefs)
|
|
}
|
|
|
|
opts := make([]libp2p.Option, 0, len(order))
|
|
for _, id := range order {
|
|
tpt, ok := muxers[id]
|
|
if !ok {
|
|
log.Warning("unknown or duplicate muxer in LIBP2P_MUX_PREFS: %s", id)
|
|
continue
|
|
}
|
|
delete(muxers, id)
|
|
opts = append(opts, libp2p.Muxer(id, tpt))
|
|
}
|
|
|
|
return libp2p.ChainOptions(opts...)
|
|
}
|
|
|
|
// Close calls Close() on the App object
|
|
func (n *IpfsNode) Close() error {
|
|
return n.app.Stop(n.ctx)
|
|
}
|
|
|
|
// Context returns the IpfsNode context
|
|
func (n *IpfsNode) Context() context.Context {
|
|
if n.ctx == nil {
|
|
n.ctx = context.TODO()
|
|
}
|
|
return n.ctx
|
|
}
|
|
|
|
// Bootstrap will set and call the IpfsNodes bootstrap function.
|
|
func (n *IpfsNode) Bootstrap(cfg BootstrapConfig) error {
|
|
// TODO what should return value be when in offlineMode?
|
|
if n.Routing == nil {
|
|
return nil
|
|
}
|
|
|
|
if n.Bootstrapper != nil {
|
|
n.Bootstrapper.Close() // stop previous bootstrap process.
|
|
}
|
|
|
|
// if the caller did not specify a bootstrap peer function, get the
|
|
// freshest bootstrap peers from config. this responds to live changes.
|
|
if cfg.BootstrapPeers == nil {
|
|
cfg.BootstrapPeers = func() []pstore.PeerInfo {
|
|
ps, err := n.loadBootstrapPeers()
|
|
if err != nil {
|
|
log.Warning("failed to parse bootstrap peers from config")
|
|
return nil
|
|
}
|
|
return ps
|
|
}
|
|
}
|
|
|
|
var err error
|
|
n.Bootstrapper, err = Bootstrap(n, cfg)
|
|
return err
|
|
}
|
|
|
|
func (n *IpfsNode) loadBootstrapPeers() ([]pstore.PeerInfo, error) {
|
|
cfg, err := n.Repo.Config()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
parsed, err := cfg.BootstrapPeers()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return toPeerInfos(parsed), nil
|
|
}
|
|
|
|
func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
|
|
var listen []ma.Multiaddr
|
|
for _, addr := range cfg.Addresses.Swarm {
|
|
maddr, err := ma.NewMultiaddr(addr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", cfg.Addresses.Swarm)
|
|
}
|
|
listen = append(listen, maddr)
|
|
}
|
|
|
|
return listen, nil
|
|
}
|
|
|
|
type ConstructPeerHostOpts struct {
|
|
AddrsFactory p2pbhost.AddrsFactory
|
|
DisableNatPortMap bool
|
|
DisableRelay bool
|
|
EnableRelayHop bool
|
|
ConnectionManager ifconnmgr.ConnManager
|
|
}
|
|
|
|
// HostOption is the signature of a function that builds a libp2p host from a
// context, a peer ID, a peerstore and any number of extra libp2p options.
type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error)
|
|
|
|
// DefaultHostOption is the HostOption backed by constructPeerHost.
var DefaultHostOption HostOption = constructPeerHost
|
|
|
|
// isolates the complex initialization steps
|
|
func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) {
|
|
pkey := ps.PrivKey(id)
|
|
if pkey == nil {
|
|
return nil, fmt.Errorf("missing private key for node ID: %s", id.Pretty())
|
|
}
|
|
options = append([]libp2p.Option{libp2p.Identity(pkey), libp2p.Peerstore(ps)}, options...)
|
|
return libp2p.New(ctx, options...)
|
|
}
|
|
|
|
func filterRelayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
|
|
var raddrs []ma.Multiaddr
|
|
for _, addr := range addrs {
|
|
_, err := addr.ValueForProtocol(circuit.P_CIRCUIT)
|
|
if err == nil {
|
|
continue
|
|
}
|
|
raddrs = append(raddrs, addr)
|
|
}
|
|
return raddrs
|
|
}
|
|
|
|
func composeAddrsFactory(f, g p2pbhost.AddrsFactory) p2pbhost.AddrsFactory {
|
|
return func(addrs []ma.Multiaddr) []ma.Multiaddr {
|
|
return f(g(addrs))
|
|
}
|
|
}
|
|
|
|
// startListening on the network addresses
|
|
func startListening(host p2phost.Host, cfg *config.Config) error {
|
|
listenAddrs, err := listenAddresses(cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Actually start listening:
|
|
if err := host.Network().Listen(listenAddrs...); err != nil {
|
|
return err
|
|
}
|
|
|
|
// list out our addresses
|
|
addrs, err := host.Network().InterfaceListenAddresses()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Infof("Swarm listening at: %s", addrs)
|
|
return nil
|
|
}
|
|
|
|
func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) {
|
|
return dht.New(
|
|
ctx, host,
|
|
dhtopts.Datastore(dstore),
|
|
dhtopts.Validator(validator),
|
|
)
|
|
}
|
|
|
|
func constructClientDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) {
|
|
return dht.New(
|
|
ctx, host,
|
|
dhtopts.Client(true),
|
|
dhtopts.Datastore(dstore),
|
|
dhtopts.Validator(validator),
|
|
)
|
|
}
|
|
|
|
type RoutingOption func(context.Context, p2phost.Host, ds.Batching, record.Validator) (routing.IpfsRouting, error)
|
|
|
|
type DiscoveryOption func(context.Context, p2phost.Host) (discovery.Service, error)
|
|
|
|
var DHTOption RoutingOption = constructDHTRouting
|
|
var DHTClientOption RoutingOption = constructClientDHTRouting
|
|
var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting
|