mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 10:27:46 +08:00
Merge 3e1580afb0 into c5776476bc
This commit is contained in:
commit
f8cf05829c
67
docs/examples/dep-injection-multiple-nodes/README.md
Normal file
67
docs/examples/dep-injection-multiple-nodes/README.md
Normal file
@ -0,0 +1,67 @@
|
||||
# Manual dependency injection and spawning multiple nodes
|
||||
|
||||
|
||||
By the end of this tutorial, you will learn how to:
|
||||
|
||||
- Manually inject dependencies with your desired configuration to a node.
|
||||
- Spawn two different IPFS nodes with the same configuration in different ports.
|
||||
- Generate a random file and add it from one node.
|
||||
- Retrieve the added file from the other node.
|
||||
|
||||
All of this using only golang!
|
||||
|
||||
In order to complete this tutorial, you will need:
|
||||
- golang installed on your machine. See how at https://golang.org/doc/install
|
||||
- git installed on your machine (so that go can download the repo and the necessary dependencies). See how at https://git-scm.com/downloads
|
||||
- IPFS Desktop (for convenience) installed and running on your machine. See how at https://github.com/ipfs-shipyard/ipfs-desktop#ipfs-desktop
|
||||
|
||||
**Disclaimer**: This example was an attempt to understand how to manually inject dependencies to spawn an IPFS node with custom configurations
|
||||
without having to rely on `repo.Repo` and `BuildCfg`.
|
||||
For this specific example the focus was on being able to initialize our own `exchange.Interface` and set IPFS nodes on different ports. Expect in
|
||||
the future further improvements to this example in order to clean the code and include additional custom configurations. The example is inspired by the [following function](../../../core/builder.go#L27)
|
||||
|
||||
## Getting started
|
||||
|
||||
**Note:** Make sure you have [](https://golang.org/dl/) installed.
|
||||
|
||||
Download go-ipfs and jump into the example folder:
|
||||
|
||||
```
|
||||
> go get -u github.com/ipfs/go-ipfs
|
||||
cd $GOPATH/src/github.com/ipfs/go-ipfs/docs/examples/dep-injection-multiple-nodes
|
||||
```
|
||||
|
||||
## Running the example as-is
|
||||
|
||||
To run the example, simply do:
|
||||
|
||||
```
|
||||
> go run main.go
|
||||
```
|
||||
|
||||
You should see the following as output:
|
||||
|
||||
```
|
||||
[*] Spawned first node listening at: [/ip4/192.168.0.56/tcp/36911 /ip4/127.0.0.1/tcp/36911 /ip6/::1/tcp/39831 /ip4/192.168.0.56/udp/60407/quic /ip4/127.0.0.1/udp/60407/quic /ip6/::1/udp/41561/quic /ip4/2.137.154.240/udp/60407/quic /ip4/2.137.154.240/tcp/36911]
|
||||
[*] Spawned first node listening at: [/ip6/::1/udp/41482/quic /ip4/192.168.0.56/tcp/42321 /ip4/127.0.0.1/tcp/42321 /ip6/::1/tcp/34749 /ip4/192.168.0.56/udp/37708/quic /ip4/127.0.0.1/udp/37708/quic]
|
||||
[*] Connected fron node1 to node2
|
||||
[*] Added a test file to the network: /ipfs/QmXG1dKK7B4srPsiiCi6xZ4HkvpShKoDxuBr7BoNEGic2M
|
||||
[*] Searching for /ipfs/QmXG1dKK7B4srPsiiCi6xZ4HkvpShKoDxuBr7BoNEGic2M from node 2
|
||||
[*] Retrieved file with size: 1324643
|
||||
```
|
||||
|
||||
## Understanding the example
|
||||
|
||||
The example comprises the following parts:
|
||||
* A [main function](./main.go#L309-L362) where all the action happens. Here we define the [size of the random file](./main.go#L312) to be generated, we [spawn two IPFS nodes](./main.go#L317-L335), we [connect both nodes](./main.go#L337-L342), we [generate a random file and added it to the network from node 1](./main.go#L344-L351) and finally [retrieve it from node2](./main.go#L353-L367).
|
||||
|
||||
* The nodes are spawned using the same [`NewNode` function](./main.go#L248-L307) which initializes the node and injects all the corresponding dependencies. In this example both nodes are using the same configuration.
|
||||
* Nodes return their [`close` function](./main.go#L319) in case they want to [be gracefully closed](./main.go#L368-L372), as this way of spawning nodes generate a nil pointer reference error with `node.Close()` as it can't be properly initialized.
|
||||
|
||||
* The configuration and dependencies of the node are set in the [`setConfig` function](./main.go#L68-L249). This is the place to go if you want to change some configurations of the nodes to be spawned. In this function you will be able to do some cool stuff such as:
|
||||
* Setting the listening address for the nodes, or [allow them to listen from any available port](./main.go#L92-L98).
|
||||
* Choosing the [`routingOption` to use](./main.go#L126-L130).
|
||||
* Or setting [your custom exchange interface](./main.go#L132-L170)
|
||||
* Dependencies are injected [here](./main.go#L185-L244). Many of the ones used for this example are the default ones, but you could customize and set them at your desire as done for `hostOpotions` and the `exchangeInterface`.
|
||||
|
||||
* More advanced configurations coming in the future.
|
||||
20
docs/examples/dep-injection-multiple-nodes/go.mod
Normal file
20
docs/examples/dep-injection-multiple-nodes/go.mod
Normal file
@ -0,0 +1,20 @@
|
||||
module github.com/ipfs/go-ipfs/examples/dep-injection-multiple-nodes
|
||||
|
||||
go 1.14
|
||||
|
||||
require (
|
||||
github.com/ipfs/go-filestore v1.0.0
|
||||
github.com/ipfs/go-ipfs v0.6.0 // indirect
|
||||
github.com/ipfs/go-ipfs-blockstore v1.0.1 // indirect
|
||||
github.com/ipfs/go-ipfs-config v0.9.0 // indirect
|
||||
github.com/libp2p/go-libp2p v0.10.2
|
||||
github.com/libp2p/go-libp2p-autonat v0.3.2 // indirect
|
||||
github.com/libp2p/go-libp2p-core v0.6.1
|
||||
github.com/libp2p/go-libp2p-peerstore v0.2.6
|
||||
github.com/libp2p/go-mplex v0.1.3 // indirect
|
||||
github.com/libp2p/go-reuseport-transport v0.0.4 // indirect
|
||||
github.com/libp2p/go-sockaddr v0.1.0 // indirect
|
||||
github.com/libp2p/go-yamux v1.3.8 // indirect
|
||||
github.com/prometheus/common v0.12.0 // indirect
|
||||
go.uber.org/fx v1.13.0 // indirect
|
||||
)
|
||||
1474
docs/examples/dep-injection-multiple-nodes/go.sum
Normal file
1474
docs/examples/dep-injection-multiple-nodes/go.sum
Normal file
File diff suppressed because it is too large
Load Diff
373
docs/examples/dep-injection-multiple-nodes/main.go
Normal file
373
docs/examples/dep-injection-multiple-nodes/main.go
Normal file
@ -0,0 +1,373 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"io"
|
||||
mathrand "math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-bitswap"
|
||||
"github.com/ipfs/go-bitswap/network"
|
||||
"github.com/ipfs/go-datastore"
|
||||
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
config "github.com/ipfs/go-ipfs-config"
|
||||
exchange "github.com/ipfs/go-ipfs-exchange-interface"
|
||||
files "github.com/ipfs/go-ipfs-files"
|
||||
"github.com/ipfs/go-ipfs/core"
|
||||
"github.com/ipfs/go-ipfs/core/bootstrap"
|
||||
"github.com/ipfs/go-ipfs/core/coreapi"
|
||||
"github.com/ipfs/go-ipfs/core/node"
|
||||
"github.com/ipfs/go-ipfs/core/node/helpers"
|
||||
"github.com/ipfs/go-ipfs/core/node/libp2p"
|
||||
"github.com/ipfs/go-ipfs/p2p"
|
||||
"github.com/ipfs/go-ipfs/repo"
|
||||
"github.com/jbenet/goprocess"
|
||||
ci "github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/routing"
|
||||
peer "github.com/libp2p/go-libp2p-peer"
|
||||
"github.com/prometheus/common/log"
|
||||
|
||||
ds "github.com/ipfs/go-datastore"
|
||||
dsync "github.com/ipfs/go-datastore/sync"
|
||||
|
||||
"github.com/ipfs/go-metrics-interface"
|
||||
"go.uber.org/fx"
|
||||
)
|
||||
|
||||
var randReader *mathrand.Rand
|
||||
|
||||
// RandReader generates a random stream of bytes
|
||||
func RandReader(len int) io.Reader {
|
||||
if randReader == nil {
|
||||
randReader = mathrand.New(mathrand.NewSource(2))
|
||||
}
|
||||
data := make([]byte, len)
|
||||
randReader.Read(data)
|
||||
return bytes.NewReader(data)
|
||||
}
|
||||
|
||||
// baseProcess creates a goprocess which is closed when the lifecycle signals it to stop
|
||||
func baseProcess(lc fx.Lifecycle) goprocess.Process {
|
||||
p := goprocess.WithParent(goprocess.Background())
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(_ context.Context) error {
|
||||
return p.Close()
|
||||
},
|
||||
})
|
||||
return p
|
||||
}
|
||||
|
||||
// setConfig manually injects dependencies for the IPFS nodes.
|
||||
func setConfig(ctx context.Context) fx.Option {
|
||||
|
||||
// Create new Datastore
|
||||
d := ds.NewMapDatastore()
|
||||
// Initialize config.
|
||||
cfg := &config.Config{}
|
||||
// Generate new KeyPair instead of using existing one.
|
||||
priv, pub, err := ci.GenerateKeyPairWithReader(ci.RSA, 2048, rand.Reader)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// Generate PeerID
|
||||
pid, err := peer.IDFromPublicKey(pub)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// Get PrivKey
|
||||
privkeyb, err := priv.Bytes()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// Use defaultBootstrap
|
||||
cfg.Bootstrap = config.DefaultBootstrapAddresses
|
||||
|
||||
//Allow the node to start in any available port. We do not use default ones.
|
||||
cfg.Addresses.Swarm = []string{
|
||||
"/ip4/0.0.0.0/tcp/0",
|
||||
"/ip6/::/tcp/0",
|
||||
"/ip4/0.0.0.0/udp/0/quic",
|
||||
"/ip6/::/udp/0/quic",
|
||||
}
|
||||
cfg.Identity.PeerID = pid.Pretty()
|
||||
cfg.Identity.PrivKey = base64.StdEncoding.EncodeToString(privkeyb)
|
||||
|
||||
// Repo structure that encapsulate the config and datastore for dependency injection.
|
||||
buildRepo := &repo.Mock{
|
||||
D: dsync.MutexWrap(d),
|
||||
C: *cfg,
|
||||
}
|
||||
repoOption := fx.Provide(func(lc fx.Lifecycle) repo.Repo {
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(ctx context.Context) error {
|
||||
return buildRepo.Close()
|
||||
},
|
||||
})
|
||||
return buildRepo
|
||||
})
|
||||
|
||||
// Enable metrics in the node.
|
||||
metricsCtx := fx.Provide(func() helpers.MetricsCtx {
|
||||
return helpers.MetricsCtx(ctx)
|
||||
})
|
||||
|
||||
// Use DefaultHostOptions
|
||||
hostOption := fx.Provide(func() libp2p.HostOption {
|
||||
return libp2p.DefaultHostOption
|
||||
})
|
||||
|
||||
// Use libp2p.DHTOption. Could also use DHTClientOption.
|
||||
routingOption := fx.Provide(func() libp2p.RoutingOption {
|
||||
// return libp2p.DHTClientOption
|
||||
return libp2p.DHTOption
|
||||
})
|
||||
|
||||
// Uncomment if you want to set Graphsync as exchange interface.
|
||||
// gsExchange := func(mctx helpers.MetricsCtx, lc fx.Lifecycle,
|
||||
// host host.Host, rt routing.Routing, bs blockstore.GCBlockstore) exchange.Interface {
|
||||
|
||||
// // TODO: Graphsync currently doesn't follow exchange.Interface. Is missing Close()
|
||||
// ctx := helpers.LifecycleCtx(mctx, lc)
|
||||
// network := network.NewFromLibp2pHost(host)
|
||||
// ipldBridge := ipldbridge.NewIPLDBridge()
|
||||
// gsExch := gsimpl.New(ctx,
|
||||
// network, ipldBridge,
|
||||
// storeutil.LoaderForBlockstore(bs),
|
||||
// storeutil.StorerForBlockstore(bs),
|
||||
// )
|
||||
|
||||
// lc.Append(fx.Hook{
|
||||
// OnStop: func(ctx context.Context) error {
|
||||
// if exch.Close != nil {
|
||||
// return exch.Close()
|
||||
// }
|
||||
// return nil
|
||||
|
||||
// },
|
||||
// })
|
||||
// return exch
|
||||
// }
|
||||
|
||||
// Initializing bitswap exchange
|
||||
bsExchange := func(mctx helpers.MetricsCtx, lc fx.Lifecycle,
|
||||
host host.Host, rt routing.Routing, bs blockstore.GCBlockstore) exchange.Interface {
|
||||
bitswapNetwork := network.NewFromIpfsHost(host, rt)
|
||||
exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, bs)
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(ctx context.Context) error {
|
||||
return exch.Close()
|
||||
},
|
||||
})
|
||||
return exch
|
||||
}
|
||||
|
||||
// Return repo datastore
|
||||
repoDS := func(repo repo.Repo) datastore.Datastore {
|
||||
return d
|
||||
}
|
||||
|
||||
// Assign some defualt values.
|
||||
var repubPeriod, recordLifetime time.Duration
|
||||
ipnsCacheSize := cfg.Ipns.ResolveCacheSize
|
||||
enableRelay := cfg.Swarm.Transports.Network.Relay.WithDefault(!cfg.Swarm.DisableRelay) //nolint
|
||||
|
||||
// Inject all dependencies for the node.
|
||||
// Many of the default dependencies being used. If you want to manually set any of them
|
||||
// follow: https://github.com/ipfs/go-ipfs/blob/master/core/node/groups.go
|
||||
return fx.Options(
|
||||
// RepoConfigurations
|
||||
repoOption,
|
||||
hostOption,
|
||||
routingOption,
|
||||
metricsCtx,
|
||||
|
||||
// Setting baseProcess
|
||||
fx.Provide(baseProcess),
|
||||
|
||||
// Storage configuration
|
||||
fx.Provide(repoDS),
|
||||
fx.Provide(node.BaseBlockstoreCtor(blockstore.DefaultCacheOpts(),
|
||||
false, cfg.Datastore.HashOnRead)),
|
||||
fx.Provide(node.GcBlockstoreCtor),
|
||||
|
||||
// Identity dependencies
|
||||
node.Identity(cfg),
|
||||
|
||||
//IPNS dependencies
|
||||
node.IPNS,
|
||||
|
||||
// Network dependencies
|
||||
// fx.Provide(gsExchange), // Uncomment to set graphsync exchange
|
||||
fx.Provide(bsExchange),
|
||||
fx.Provide(node.Namesys(ipnsCacheSize)),
|
||||
fx.Provide(node.Peering),
|
||||
node.PeerWith(cfg.Peering.Peers...),
|
||||
|
||||
fx.Invoke(node.IpnsRepublisher(repubPeriod, recordLifetime)),
|
||||
|
||||
fx.Provide(p2p.New),
|
||||
|
||||
// Libp2p dependencies
|
||||
node.BaseLibP2P,
|
||||
fx.Provide(libp2p.AddrFilters(cfg.Swarm.AddrFilters)),
|
||||
fx.Provide(libp2p.AddrsFactory(cfg.Addresses.Announce, cfg.Addresses.NoAnnounce)),
|
||||
fx.Provide(libp2p.SmuxTransport(cfg.Swarm.Transports)),
|
||||
fx.Provide(libp2p.Relay(enableRelay, cfg.Swarm.EnableRelayHop)),
|
||||
fx.Provide(libp2p.Transports(cfg.Swarm.Transports)),
|
||||
fx.Invoke(libp2p.StartListening(cfg.Addresses.Swarm)),
|
||||
fx.Invoke(libp2p.SetupDiscovery(cfg.Discovery.MDNS.Enabled, cfg.Discovery.MDNS.Interval)),
|
||||
fx.Provide(libp2p.Routing),
|
||||
fx.Provide(libp2p.BaseRouting),
|
||||
|
||||
// Here you can see some more of the libp2p dependencies you could set.
|
||||
// fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Swarm.Transports)),
|
||||
// maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")),
|
||||
// maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics),
|
||||
// maybeProvide(libp2p.NatPortMap, !cfg.Swarm.DisableNatPortMap),
|
||||
// maybeProvide(libp2p.AutoRelay, cfg.Swarm.EnableAutoRelay),
|
||||
// autonat, // Sets autonat
|
||||
// connmgr, // Set connection manager
|
||||
// ps, // Sets pubsub router
|
||||
// disc, // Sets discovery service
|
||||
node.OnlineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
|
||||
|
||||
// Core configuration
|
||||
node.Core,
|
||||
)
|
||||
}
|
||||
|
||||
// NewNode constructs and returns an IpfsNode using the given cfg.
|
||||
func NewNode(ctx context.Context) (*core.IpfsNode, func() error, error) {
|
||||
// save this context as the "lifetime" ctx.
|
||||
lctx := ctx
|
||||
|
||||
// derive a new context that ignores cancellations from the lifetime ctx.
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
// add a metrics scope.
|
||||
ctx = metrics.CtxScope(ctx, "ipfs")
|
||||
|
||||
n := &core.IpfsNode{}
|
||||
|
||||
app := fx.New(
|
||||
// Inject dependencies in the node.
|
||||
setConfig(ctx),
|
||||
|
||||
fx.NopLogger,
|
||||
fx.Extract(n),
|
||||
)
|
||||
|
||||
var once sync.Once
|
||||
var stopErr error
|
||||
stopNode := func() error {
|
||||
once.Do(func() {
|
||||
stopErr = app.Stop(context.Background())
|
||||
if stopErr != nil {
|
||||
log.Error("failure on stop: ", stopErr)
|
||||
}
|
||||
// Cancel the context _after_ the app has stopped.
|
||||
cancel()
|
||||
})
|
||||
return stopErr
|
||||
}
|
||||
// Set node to Online mode.
|
||||
n.IsOnline = true
|
||||
|
||||
go func() {
|
||||
// Shut down the application if the lifetime context is canceled.
|
||||
// NOTE: we _should_ stop the application by calling `Close()`
|
||||
// on the process. But we currently manage everything with contexts.
|
||||
select {
|
||||
case <-lctx.Done():
|
||||
err := stopNode()
|
||||
if err != nil {
|
||||
log.Error("failure on stop: ", err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}()
|
||||
|
||||
if app.Err() != nil {
|
||||
return nil, nil, app.Err()
|
||||
}
|
||||
|
||||
if err := app.Start(ctx); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return n, stopNode, n.Bootstrap(bootstrap.DefaultBootstrapConfig)
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
||||
// FileSize to be generated randomly for the execution.
|
||||
FileSize := 1324643
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Spawn two different nodes with the same configuration.
|
||||
// Each will be started in a different port.
|
||||
n1, n1Close, err := NewNode(ctx)
|
||||
fmt.Println("[*] Spawned first node listening at: ", n1.PeerHost.Addrs())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
n2, n2Close, err := NewNode(ctx)
|
||||
fmt.Println("[*] Spawned first node listening at: ", n2.PeerHost.Addrs())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Configuring core APIs
|
||||
api1, err := coreapi.NewCoreAPI(n1)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
api2, err := coreapi.NewCoreAPI(n2)
|
||||
|
||||
// Connecting from n1 to n2
|
||||
err = api1.Swarm().Connect(ctx, *host.InfoFromHost(n2.PeerHost))
|
||||
if err != nil {
|
||||
fmt.Printf("failed to connect: %s\n", err)
|
||||
}
|
||||
fmt.Println("[*] Connected fron node1 to node2")
|
||||
|
||||
// Randomly generate the file.
|
||||
randomFile := files.NewReaderFile(RandReader(FileSize))
|
||||
// Add the file to the network.
|
||||
cidRandom, err := api1.Unixfs().Add(ctx, randomFile)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("Could not add random: %s", err))
|
||||
}
|
||||
fmt.Println("[*] Added a test file to the network:", cidRandom)
|
||||
|
||||
// Retrieve the DAG structure from the other node.
|
||||
fmt.Printf("[*] Searching for %v from node 2\n", cidRandom)
|
||||
f, err := api2.Unixfs().Get(ctx, cidRandom)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("Could find file in IPFS: %s", err))
|
||||
}
|
||||
// Traverse the full graph and write the file in /tmp/
|
||||
// If we don't write the file we only get the DagReader in f.
|
||||
err = files.WriteTo(f, "/tmp/"+time.Now().String())
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("Could not write retrieved file: %s", err))
|
||||
}
|
||||
// Size of the file.
|
||||
s, _ := f.Size()
|
||||
fmt.Println("[*] Retrieved file with size: ", s)
|
||||
// Close both nodes.
|
||||
n1Close()
|
||||
fmt.Println("[*] Gracefully closed node 1")
|
||||
n2Close()
|
||||
fmt.Println("[*] Gracefully closed node 2")
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user