kubo/test/cli/harness/node.go
Marcin Rataj 6a008fc74c
refactor: apply go fix modernizers from Go 1.26 (#11190)
* chore: apply go fix modernizers from Go 1.26

automated refactoring: interface{} to any, slices.Contains,
and other idiomatic updates.

* feat(ci): add `go fix` check to Go analysis workflow

ensures Go 1.26 modernizers are applied, fails CI if `go fix ./...`
produces any changes (similar to existing `go fmt` enforcement)
2026-02-11 01:01:32 +01:00

758 lines
19 KiB
Go

package harness
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"syscall"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/kubo/config"
serial "github.com/ipfs/kubo/config/serialize"
"github.com/libp2p/go-libp2p/core/peer"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
// log is the package-level logger for the test harness.
var log = logging.Logger("testharness")
// Node is a single Kubo node.
// Each node has its own config and can run its own Kubo daemon.
type Node struct {
	ID  int    // test-local node index; also names the repo directory
	Dir string // node repo root (used as IPFS_PATH)

	// Listen addresses; when nil, Init fills them with loopback addresses
	// using OS-assigned ports.
	APIListenAddr     multiaddr.Multiaddr
	GatewayListenAddr multiaddr.Multiaddr
	SwarmAddr         multiaddr.Multiaddr
	EnableMDNS        bool // written into the config by Init

	IPFSBin string     // path to the ipfs binary used for all commands
	Runner  *Runner    // command runner with IPFS_PATH preset to Dir
	Daemon  *RunResult // handle of the running daemon, nil until started
}
// BuildNode creates the repo directory <baseDir>/<id> for a new node and
// returns a Node whose Runner has IPFS_PATH pointed at that directory.
// It does not run `ipfs init` (see Init) and does not start a daemon.
// Panics if the directory cannot be created, or if ipfsBin is the bare name
// "ipfs" and no such binary can be found in PATH.
func BuildNode(ipfsBin, baseDir string, id int) *Node {
	dir := filepath.Join(baseDir, strconv.Itoa(id))
	if err := os.MkdirAll(dir, 0o755); err != nil {
		panic(err)
	}
	// All commands for this node run with its repo as IPFS_PATH.
	env := environToMap(os.Environ())
	env["IPFS_PATH"] = dir
	// If using "ipfs" binary name, provide helpful binary information
	if ipfsBin == "ipfs" {
		// Check if cmd/ipfs/ipfs exists (simple relative path check)
		localBinary := "cmd/ipfs/ipfs"
		localExists := false
		if _, err := os.Stat(localBinary); err == nil {
			localExists = true
			if abs, err := filepath.Abs(localBinary); err == nil {
				localBinary = abs
			}
		}
		// Check if ipfs is available in PATH
		pathBinary, pathErr := exec.LookPath("ipfs")
		// Handle different scenarios
		if pathErr != nil {
			// No ipfs in PATH
			if localExists {
				fmt.Printf("WARNING: No 'ipfs' found in PATH, but local binary exists at %s\n", localBinary)
				fmt.Printf("Consider adding it to PATH or run: export PATH=\"$(pwd)/cmd/ipfs:$PATH\"\n")
			} else {
				fmt.Printf("ERROR: No 'ipfs' binary found in PATH and no local build at cmd/ipfs/ipfs\n")
				fmt.Printf("Run 'make build' first or install ipfs and add it to PATH\n")
				panic("ipfs binary not available")
			}
		} else {
			// ipfs found in PATH
			if localExists && localBinary != pathBinary {
				fmt.Printf("NOTE: Local binary at %s differs from PATH binary at %s\n", localBinary, pathBinary)
				fmt.Printf("Consider adding the local binary to PATH if you want to use the version built by 'make build'\n")
			}
			// If they match or no local binary, no message needed
		}
	}
	return &Node{
		ID:      id,
		Dir:     dir,
		IPFSBin: ipfsBin,
		Runner: &Runner{
			Env: env,
			Dir: dir,
		},
	}
}
// WriteBytes writes b to the named file under the node's root dir,
// creating or truncating it. Panics on error.
//
// Uses os.WriteFile (0666 pre-umask, matching os.Create) instead of the
// previous Create+Copy+deferred-Close, which silently dropped Close errors.
func (n *Node) WriteBytes(filename string, b []byte) {
	if err := os.WriteFile(filepath.Join(n.Dir, filename), b, 0o666); err != nil {
		panic(err)
	}
}
// ReadFile reads the specified file and returns its contents as a string.
// A relative path is resolved against the node's root dir. Panics on error.
func (n *Node) ReadFile(filename string) string {
	path := filename
	if !filepath.IsAbs(path) {
		path = filepath.Join(n.Dir, path)
	}
	contents, err := os.ReadFile(path)
	if err != nil {
		panic(err)
	}
	return string(contents)
}
// ConfigFile returns the path to this node's Kubo config file.
func (n *Node) ConfigFile() string {
	return filepath.Join(n.Dir, "config")
}
// ReadConfig loads and returns this node's config from disk, panicking on
// error. Uses ConfigFile rather than re-joining the path (consistency with
// the rest of the type).
func (n *Node) ReadConfig() *config.Config {
	cfg, err := serial.Load(n.ConfigFile())
	if err != nil {
		panic(err)
	}
	return cfg
}
// WriteConfig persists c as this node's config, panicking on error.
// Uses ConfigFile rather than re-joining the path (consistency with the
// rest of the type).
func (n *Node) WriteConfig(c *config.Config) {
	if err := serial.WriteConfigFile(n.ConfigFile(), c); err != nil {
		panic(err)
	}
}
// UpdateConfig loads the node's config, applies f to it in place, and
// writes the result back to disk.
func (n *Node) UpdateConfig(f func(cfg *config.Config)) {
	current := n.ReadConfig()
	f(current)
	n.WriteConfig(current)
}
// ReadUserResourceOverrides loads the node's libp2p resource-limit
// overrides file. A missing/uninitialized file yields an empty config;
// any other error panics.
//
// Uses errors.Is instead of a switch comparing errors with ==, so wrapped
// ErrNotInitialized values are also tolerated.
func (n *Node) ReadUserResourceOverrides() *rcmgr.PartialLimitConfig {
	var overrides rcmgr.PartialLimitConfig
	err := serial.ReadConfigFile(filepath.Join(n.Dir, "libp2p-resource-limit-overrides.json"), &overrides)
	if err != nil && !errors.Is(err, serial.ErrNotInitialized) {
		panic(err)
	}
	return &overrides
}
// WriteUserSuppliedResourceOverrides writes the node's libp2p
// resource-limit overrides file, panicking on error.
func (n *Node) WriteUserSuppliedResourceOverrides(c *rcmgr.PartialLimitConfig) {
	path := filepath.Join(n.Dir, "libp2p-resource-limit-overrides.json")
	if err := serial.WriteConfigFile(path, c); err != nil {
		panic(err)
	}
}
// UpdateUserSuppliedResourceManagerOverrides reads the current overrides,
// applies f to them in place, and writes them back.
func (n *Node) UpdateUserSuppliedResourceManagerOverrides(f func(overrides *rcmgr.PartialLimitConfig)) {
	current := n.ReadUserResourceOverrides()
	f(current)
	n.WriteUserSuppliedResourceOverrides(current)
}
// IPFS runs an ipfs CLI command against this node and asserts that it
// succeeded, returning the run result.
func (n *Node) IPFS(args ...string) *RunResult {
	result := n.RunIPFS(args...)
	n.Runner.AssertNoError(result)
	return result
}
// PipeStrToIPFS feeds s to an ipfs command's stdin, asserting success.
func (n *Node) PipeStrToIPFS(s string, args ...string) *RunResult {
	return n.PipeToIPFS(strings.NewReader(s), args...)
}
// PipeToIPFS feeds reader to an ipfs command's stdin and asserts that the
// command succeeded, returning the run result.
func (n *Node) PipeToIPFS(reader io.Reader, args ...string) *RunResult {
	result := n.RunPipeToIPFS(reader, args...)
	n.Runner.AssertNoError(result)
	return result
}
// RunPipeToIPFS runs an ipfs command with reader as stdin. Unlike
// PipeToIPFS it does not assert success; callers inspect the result.
func (n *Node) RunPipeToIPFS(reader io.Reader, args ...string) *RunResult {
	req := RunRequest{
		Path:    n.IPFSBin,
		Args:    args,
		CmdOpts: []CmdOpt{RunWithStdin(reader)},
	}
	return n.Runner.Run(req)
}
// RunIPFS runs an ipfs command without asserting success. Callers inspect
// the returned result for exit code and output.
func (n *Node) RunIPFS(args ...string) *RunResult {
	req := RunRequest{
		Path: n.IPFSBin,
		Args: args,
	}
	return n.Runner.Run(req)
}
// Init initializes and configures the IPFS node, after which it is ready to run.
//
// Any unset listen address defaults to loopback with an OS-assigned port.
// The bootstrap list is emptied and telemetry is disabled so tests run
// isolated and quiet.
func (n *Node) Init(ipfsArgs ...string) *Node {
	n.Runner.MustRun(RunRequest{
		Path: n.IPFSBin,
		Args: append([]string{"init"}, ipfsArgs...),
	})

	// Single helper replaces the previously-triplicated construction of the
	// default ephemeral loopback listen address.
	defaultListenAddr := func() multiaddr.Multiaddr {
		ma, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
		if err != nil {
			panic(err)
		}
		return ma
	}
	if n.SwarmAddr == nil {
		n.SwarmAddr = defaultListenAddr()
	}
	if n.APIListenAddr == nil {
		n.APIListenAddr = defaultListenAddr()
	}
	if n.GatewayListenAddr == nil {
		n.GatewayListenAddr = defaultListenAddr()
	}

	n.UpdateConfig(func(cfg *config.Config) {
		cfg.Bootstrap = []string{}
		cfg.Addresses.Swarm = []string{n.SwarmAddr.String()}
		cfg.Addresses.API = []string{n.APIListenAddr.String()}
		cfg.Addresses.Gateway = []string{n.GatewayListenAddr.String()}
		cfg.Swarm.DisableNatPortMap = true
		cfg.Discovery.MDNS.Enabled = n.EnableMDNS
		cfg.Routing.LoopbackAddressesOnLanDHT = config.True
		// Telemetry disabled by default in tests.
		cfg.Plugins = config.Plugins{
			Plugins: map[string]config.Plugin{
				// gofmt -s: element type is implied by the map literal.
				"telemetry": {Disabled: true},
			},
		}
	})
	return n
}
// StartDaemonWithReq runs a Kubo daemon with the given request.
// This overwrites the request Path with the Kubo bin path.
//
// For example, if you want to run the daemon and see stderr and stdout to debug:
//
//	node.StartDaemonWithReq(harness.RunRequest{
//		CmdOpts: []harness.CmdOpt{
//			harness.RunWithStderr(os.Stdout),
//			harness.RunWithStdout(os.Stdout),
//		},
//	})
//
// It panics if a daemon for this node is already running, and blocks until
// the daemon's HTTP API responds (see WaitOnAPI). authorization, when
// non-empty, is used as the Authorization header for the API check.
func (n *Node) StartDaemonWithReq(req RunRequest, authorization string) *Node {
	alive := n.IsAlive()
	if alive {
		log.Panicf("node %d is already running", n.ID)
	}
	// Copy the request so the caller's value is not mutated, then force the
	// binary path and prepend the "daemon" subcommand.
	newReq := req
	newReq.Path = n.IPFSBin
	newReq.Args = append([]string{"daemon"}, req.Args...)
	// Start (not wait): the daemon keeps running in the background.
	newReq.RunFunc = (*exec.Cmd).Start
	log.Debugf("starting node %d", n.ID)
	res := n.Runner.MustRun(newReq)
	n.Daemon = res
	log.Debugf("node %d started, checking API", n.ID)
	n.WaitOnAPI(authorization)
	return n
}
// StartDaemon starts the node's daemon with the given extra CLI args and
// waits for its API to come online.
func (n *Node) StartDaemon(ipfsArgs ...string) *Node {
	req := RunRequest{Args: ipfsArgs}
	return n.StartDaemonWithReq(req, "")
}
// StartDaemonWithAuthorization starts the daemon and uses secret as the
// Authorization header when probing the API for readiness.
func (n *Node) StartDaemonWithAuthorization(secret string, ipfsArgs ...string) *Node {
	req := RunRequest{Args: ipfsArgs}
	return n.StartDaemonWithReq(req, secret)
}
// signalAndWait delivers signal to the daemon process and then waits up to t
// for watch (fed by the process waiter in StopDaemon) to report that the
// process exited. Returns true if the process exited — or had already
// finished — within t, false on timeout. Panics on any other signal error.
func (n *Node) signalAndWait(watch <-chan struct{}, signal os.Signal, t time.Duration) bool {
	err := n.Daemon.Cmd.Process.Signal(signal)
	if err != nil {
		// The process may have exited between calls; that counts as stopped.
		if errors.Is(err, os.ErrProcessDone) {
			log.Debugf("process for node %d has already finished", n.ID)
			return true
		}
		log.Panicf("error killing daemon for node %d with peer ID %s: %s", n.ID, n.PeerID(), err.Error())
	}
	timer := time.NewTimer(t)
	defer timer.Stop()
	select {
	case <-watch:
		return true
	case <-timer.C:
		return false
	}
}
// StopDaemon stops this node's daemon, if one was started, escalating from
// SIGTERM through SIGQUIT to SIGKILL with growing grace periods. On Windows,
// where interrupt signals are unsupported, it goes straight to SIGKILL.
// Panics if the process still has not exited after the final SIGKILL window.
func (n *Node) StopDaemon() *Node {
	log.Debugf("stopping node %d", n.ID)
	if n.Daemon == nil {
		log.Debugf("didn't stop node %d since no daemon present", n.ID)
		return n
	}
	// Reap the process in the background; watch fires once it has exited.
	watch := make(chan struct{}, 1)
	go func() {
		_, _ = n.Daemon.Cmd.Process.Wait()
		watch <- struct{}{}
	}()
	// os.Interrupt does not support interrupts on Windows https://github.com/golang/go/issues/46345
	if runtime.GOOS == "windows" {
		if n.signalAndWait(watch, syscall.SIGKILL, 5*time.Second) {
			return n
		}
		log.Panicf("timed out stopping node %d with peer ID %s", n.ID, n.PeerID())
	}
	log.Debugf("signaling node %d with SIGTERM", n.ID)
	if n.signalAndWait(watch, syscall.SIGTERM, 1*time.Second) {
		return n
	}
	// Second SIGTERM with a longer grace period before escalating further.
	log.Debugf("signaling node %d with SIGTERM", n.ID)
	if n.signalAndWait(watch, syscall.SIGTERM, 2*time.Second) {
		return n
	}
	log.Debugf("signaling node %d with SIGQUIT", n.ID)
	if n.signalAndWait(watch, syscall.SIGQUIT, 5*time.Second) {
		return n
	}
	log.Debugf("signaling node %d with SIGKILL", n.ID)
	if n.signalAndWait(watch, syscall.SIGKILL, 5*time.Second) {
		return n
	}
	log.Panicf("timed out stopping node %d with peer ID %s", n.ID, n.PeerID())
	return n
}
// APIAddr returns the node's API multiaddr, panicking if it cannot be read.
func (n *Node) APIAddr() multiaddr.Multiaddr {
	addr, err := n.TryAPIAddr()
	if err != nil {
		panic(err)
	}
	return addr
}
// APIURL returns the base HTTP URL of this node's RPC API, panicking if the
// API multiaddr cannot be read or converted to a net address.
func (n *Node) APIURL() string {
	netAddr, err := manet.ToNetAddr(n.APIAddr())
	if err != nil {
		panic(err)
	}
	return "http://" + netAddr.String()
}
// TryAPIAddr reads and parses the node's "api" file. It returns an error if
// the file does not exist yet (daemon not ready) or cannot be parsed.
func (n *Node) TryAPIAddr() (multiaddr.Multiaddr, error) {
	raw, err := os.ReadFile(filepath.Join(n.Dir, "api"))
	if err != nil {
		return nil, err
	}
	return multiaddr.NewMultiaddr(string(raw))
}
// checkAPI makes a single probe of the node's HTTP RPC API and reports
// whether the daemon is up and serving this node's expected peer ID.
// authorization, when non-empty, is sent as the Authorization header.
//
// Fixes: the unix-socket guard used len(parts) < 1, which can never be true
// for SplitN (it always returns ≥1 element) and allowed an out-of-range
// parts[1] access; and the connection-error log line was missing the n.ID
// argument for its %d verb.
func (n *Node) checkAPI(authorization string) bool {
	apiAddr, err := n.TryAPIAddr()
	if err != nil {
		log.Debugf("node %d API addr not available yet: %s", n.ID, err.Error())
		return false
	}
	// For a unix-socket API, existence of the socket file is the check.
	if unixAddr, err := apiAddr.ValueForProtocol(multiaddr.P_UNIX); err == nil {
		parts := strings.SplitN(unixAddr, "/", 2)
		// SplitN always yields at least one element; we need the segment
		// after the leading slash, so require two.
		if len(parts) < 2 {
			panic("malformed unix socket address")
		}
		fileName := "/" + parts[1]
		_, err := os.Stat(fileName)
		return !errors.Is(err, fs.ErrNotExist)
	}
	ip, err := apiAddr.ValueForProtocol(multiaddr.P_IP4)
	if err != nil {
		panic(err)
	}
	port, err := apiAddr.ValueForProtocol(multiaddr.P_TCP)
	if err != nil {
		panic(err)
	}
	url := fmt.Sprintf("http://%s:%s/api/v0/id", ip, port)
	log.Debugf("checking API for node %d at %s", n.ID, url)
	req, err := http.NewRequest(http.MethodPost, url, nil)
	if err != nil {
		panic(err)
	}
	if authorization != "" {
		req.Header.Set("Authorization", authorization)
	}
	httpResp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Debugf("node %d API check error: %s", n.ID, err.Error())
		return false
	}
	defer httpResp.Body.Close()
	resp := struct {
		ID string
	}{}
	respBytes, err := io.ReadAll(httpResp.Body)
	if err != nil {
		log.Debugf("error reading API check response for node %d: %s", n.ID, err.Error())
		return false
	}
	log.Debugf("got API check response for node %d: %s", n.ID, string(respBytes))
	err = json.Unmarshal(respBytes, &resp)
	if err != nil {
		log.Debugf("error decoding API check response for node %d: %s", n.ID, err.Error())
		return false
	}
	if resp.ID == "" {
		log.Debugf("API check response for node %d did not contain a Peer ID", n.ID)
		return false
	}
	respPeerID, err := peer.Decode(resp.ID)
	if err != nil {
		panic(err)
	}
	// A mismatched peer ID means we are talking to the wrong daemon — a
	// harness bug, so panic rather than retry.
	peerID := n.PeerID()
	if respPeerID != peerID {
		log.Panicf("expected peer ID %s but got %s", peerID, resp.ID)
	}
	log.Debugf("API check for node %d successful", n.ID)
	return true
}
// PeerID returns this node's peer ID as recorded in its config on disk,
// panicking if the stored ID is malformed.
func (n *Node) PeerID() peer.ID {
	id, err := peer.Decode(n.ReadConfig().Identity.PeerID)
	if err != nil {
		panic(err)
	}
	return id
}
// WaitOnAPI polls the node's HTTP API until it responds with the expected
// peer ID, retrying up to 50 times at 400ms intervals (~20s total). Panics
// with the daemon's stderr/stdout if the node fails to come online.
func (n *Node) WaitOnAPI(authorization string) *Node {
	log.Debugf("waiting on API for node %d", n.ID)
	for range 50 {
		if n.checkAPI(authorization) {
			log.Debugf("daemon API found, daemon stdout: %s", n.Daemon.Stdout.String())
			return n
		}
		time.Sleep(400 * time.Millisecond)
	}
	log.Panicf("node %d with peer ID %s failed to come online: \n%s\n\n%s", n.ID, n.PeerID(), n.Daemon.Stderr.String(), n.Daemon.Stdout.String())
	return n
}
// IsAlive reports whether this node's daemon process is currently running,
// using signal 0 as a liveness probe.
//
// Fixes: the "not alive" log line was missing the n.ID argument for its %d
// verb (only err.Error() was passed).
func (n *Node) IsAlive() bool {
	if n.Daemon == nil || n.Daemon.Cmd == nil || n.Daemon.Cmd.Process == nil {
		return false
	}
	log.Debugf("signaling node %d daemon process for liveness check", n.ID)
	err := n.Daemon.Cmd.Process.Signal(syscall.Signal(0))
	if err == nil {
		log.Debugf("node %d daemon is alive", n.ID)
		return true
	}
	log.Debugf("node %d daemon not alive: %s", n.ID, err.Error())
	return false
}
// SwarmAddrs returns the node's local swarm listen addresses as reported by
// `ipfs swarm addrs local`. Returns an empty slice (never panics) when the
// command fails — e.g. the daemon is not online — or reports no addresses;
// panics only if the daemon emits an unparsable multiaddr.
func (n *Node) SwarmAddrs() []multiaddr.Multiaddr {
	res := n.Runner.Run(RunRequest{
		Path: n.IPFSBin,
		Args: []string{"swarm", "addrs", "local"},
	})
	if res.ExitCode() != 0 {
		// If swarm command fails (e.g., daemon not online), return empty slice
		log.Debugf("Node %d: swarm addrs local failed (exit %d): %s", n.ID, res.ExitCode(), res.Stderr.String())
		return []multiaddr.Multiaddr{}
	}
	out := strings.TrimSpace(res.Stdout.String())
	if out == "" {
		log.Debugf("Node %d: swarm addrs local returned empty output", n.ID)
		return []multiaddr.Multiaddr{}
	}
	log.Debugf("Node %d: swarm addrs local output: %s", n.ID, out)
	outLines := strings.Split(out, "\n")
	var addrs []multiaddr.Multiaddr
	for _, addrStr := range outLines {
		addrStr = strings.TrimSpace(addrStr)
		if addrStr == "" {
			continue
		}
		ma, err := multiaddr.NewMultiaddr(addrStr)
		if err != nil {
			panic(err)
		}
		addrs = append(addrs, ma)
	}
	log.Debugf("Node %d: parsed %d swarm addresses", n.ID, len(addrs))
	return addrs
}
// SwarmAddrsWithTimeout polls SwarmAddrs until at least one address is
// reported or the timeout elapses, returning an empty slice on timeout.
func (n *Node) SwarmAddrsWithTimeout(timeout time.Duration) []multiaddr.Multiaddr {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if addrs := n.SwarmAddrs(); len(addrs) > 0 {
			return addrs
		}
		time.Sleep(100 * time.Millisecond)
	}
	return []multiaddr.Multiaddr{}
}
// SwarmAddrsWithPeerIDs returns the node's swarm addresses, each carrying
// its peer ID, waiting up to the default 5s for addresses to appear.
func (n *Node) SwarmAddrsWithPeerIDs() []multiaddr.Multiaddr {
	const defaultWait = 5 * time.Second
	return n.SwarmAddrsWithPeerIDsTimeout(defaultWait)
}
// SwarmAddrsWithPeerIDsTimeout returns this node's swarm addresses (waiting
// up to timeout for them to appear), ensuring each one carries an /ipfs
// (p2p) component with this node's peer ID.
func (n *Node) SwarmAddrsWithPeerIDsTimeout(timeout time.Duration) []multiaddr.Multiaddr {
	ipfsProtocol := multiaddr.ProtocolWithCode(multiaddr.P_IPFS).Name
	peerID := n.PeerID()
	var addrs []multiaddr.Multiaddr
	for _, ma := range n.SwarmAddrsWithTimeout(timeout) {
		// add the peer ID to the multiaddr if it doesn't have it
		_, err := ma.ValueForProtocol(multiaddr.P_IPFS)
		if errors.Is(err, multiaddr.ErrProtocolNotFound) {
			comp, err := multiaddr.NewComponent(ipfsProtocol, peerID.String())
			if err != nil {
				panic(err)
			}
			ma = ma.Encapsulate(comp)
		}
		addrs = append(addrs, ma)
	}
	return addrs
}
// SwarmAddrsWithoutPeerIDs returns this node's swarm addresses with any
// /ipfs (p2p) components stripped out; addresses left empty are dropped.
func (n *Node) SwarmAddrsWithoutPeerIDs() []multiaddr.Multiaddr {
	var addrs []multiaddr.Multiaddr
	for _, ma := range n.SwarmAddrs() {
		// In-place filter of the multiaddr's components, keeping everything
		// except P_IPFS. (Assumes multiaddr.Multiaddr ranges over components
		// as in recent go-multiaddr versions — confirm against the pinned
		// dependency.)
		i := 0
		for _, c := range ma {
			if c.Protocol().Code == multiaddr.P_IPFS {
				continue
			}
			ma[i] = c
			i++
		}
		ma = ma[:i]
		if len(ma) > 0 {
			addrs = append(addrs, ma)
		}
	}
	return addrs
}
// Connect attempts a swarm connect from this node to other, using the first
// advertised address of other. Failures are logged, never fatal.
func (n *Node) Connect(other *Node) *Node {
	// Get the peer addresses to connect to
	addrs := other.SwarmAddrsWithPeerIDs()
	if len(addrs) == 0 {
		// If no addresses available, skip connection
		log.Debugf("No swarm addresses available for connection")
		return n
	}
	// Use Run instead of MustRun to avoid panics on connection failures
	target := addrs[0].String()
	result := n.Runner.Run(RunRequest{
		Path: n.IPFSBin,
		Args: []string{"swarm", "connect", target},
	})
	if result.ExitCode() != 0 {
		log.Debugf("swarm connect failed: %s", result.Stderr.String())
	}
	return n
}
// ConnectAndWait connects to another node and waits for the connection to be
// established. Up to half of timeout is spent waiting for other to advertise
// addresses; the remainder is spent polling this node's peer list for
// other's peer ID. Returns an error (never panics) on any failure or timeout.
func (n *Node) ConnectAndWait(other *Node, timeout time.Duration) error {
	// Get the peer addresses to connect to - wait up to half the timeout for addresses
	addrs := other.SwarmAddrsWithPeerIDsTimeout(timeout / 2)
	if len(addrs) == 0 {
		return fmt.Errorf("no swarm addresses available for node %d after waiting %v", other.ID, timeout/2)
	}
	otherPeerID := other.PeerID()
	// Try to connect
	res := n.Runner.Run(RunRequest{
		Path: n.IPFSBin,
		Args: []string{"swarm", "connect", addrs[0].String()},
	})
	if res.ExitCode() != 0 {
		return fmt.Errorf("swarm connect failed: %s", res.Stderr.String())
	}
	// Wait for connection to be established
	start := time.Now()
	for time.Since(start) < timeout {
		peers := n.Peers()
		for _, peerAddr := range peers {
			if peerID, err := peerAddr.ValueForProtocol(multiaddr.P_P2P); err == nil {
				if peerID == otherPeerID.String() {
					return nil // Connection established
				}
			}
		}
		time.Sleep(100 * time.Millisecond)
	}
	return fmt.Errorf("timeout waiting for connection to node %d (peer %s)", other.ID, otherPeerID)
}
// Peers returns the multiaddrs of peers this node is currently connected to.
// Returns an empty slice when the daemon is not reachable; panics only on an
// unparsable multiaddr in the command output.
func (n *Node) Peers() []multiaddr.Multiaddr {
	// Wait for daemon to be ready if it's supposed to be running
	if n.Daemon != nil && n.Daemon.Cmd != nil && n.Daemon.Cmd.Process != nil {
		// Give daemon a short time to become ready
		for range 10 {
			if n.IsAlive() {
				break
			}
			time.Sleep(100 * time.Millisecond)
		}
	}
	result := n.Runner.Run(RunRequest{
		Path: n.IPFSBin,
		Args: []string{"swarm", "peers"},
	})
	if result.ExitCode() != 0 {
		// If swarm peers fails (e.g., daemon not online), return empty slice
		log.Debugf("swarm peers failed: %s", result.Stderr.String())
		return []multiaddr.Multiaddr{}
	}
	var peerAddrs []multiaddr.Multiaddr
	for _, line := range result.Stdout.Lines() {
		parsed, err := multiaddr.NewMultiaddr(line)
		if err != nil {
			panic(err)
		}
		peerAddrs = append(peerAddrs, parsed)
	}
	return peerAddrs
}
// PeerWith adds other to this node's static peering configuration so the
// daemon will maintain a persistent connection to it once running.
func (n *Node) PeerWith(other *Node) {
	n.UpdateConfig(func(cfg *config.Config) {
		var peerAddrs []multiaddr.Multiaddr
		for _, addrStr := range other.ReadConfig().Addresses.Swarm {
			parsed, err := multiaddr.NewMultiaddr(addrStr)
			if err != nil {
				panic(err)
			}
			peerAddrs = append(peerAddrs, parsed)
		}
		info := peer.AddrInfo{
			ID:    other.PeerID(),
			Addrs: peerAddrs,
		}
		cfg.Peering.Peers = append(cfg.Peering.Peers, info)
	})
}
// Disconnect closes this node's swarm connection to other, asserting that
// the disconnect command succeeds.
func (n *Node) Disconnect(other *Node) {
	target := "/p2p/" + other.PeerID().String()
	n.IPFS("swarm", "disconnect", target)
}
// GatewayURL waits for the gateway file and then returns its contents or times out.
func (n *Node) GatewayURL() string {
	timer := time.NewTimer(1 * time.Second)
	defer timer.Stop()
	gatewayFile := filepath.Join(n.Dir, "gateway")
	for {
		select {
		case <-timer.C:
			panic("timeout waiting for gateway file")
		default:
		}
		contents, err := os.ReadFile(gatewayFile)
		switch {
		case err == nil:
			return strings.TrimSpace(string(contents))
		case errors.Is(err, fs.ErrNotExist):
			// Not written yet; back off briefly and retry.
			time.Sleep(1 * time.Millisecond)
		default:
			panic(err)
		}
	}
}
// GatewayClient returns an HTTP client whose requests target this node's
// gateway URL.
func (n *Node) GatewayClient() *HTTPClient {
	client := &HTTPClient{
		Client:  http.DefaultClient,
		BaseURL: n.GatewayURL(),
	}
	return client
}
// APIClient returns an HTTP client whose requests target this node's RPC
// API URL.
func (n *Node) APIClient() *HTTPClient {
	client := &HTTPClient{
		Client:  http.DefaultClient,
		BaseURL: n.APIURL(),
	}
	return client
}
// DatastoreCount returns the count of entries matching the given prefix.
// Requires the daemon to be stopped.
func (n *Node) DatastoreCount(prefix string) int64 {
	res := n.IPFS("diag", "datastore", "count", prefix)
	// Best-effort parse: malformed or empty output yields 0.
	count, _ := strconv.ParseInt(strings.TrimSpace(res.Stdout.String()), 10, 64)
	return count
}
// DatastoreGet retrieves the value at the given key.
// Requires the daemon to be stopped. Returns nil if key not found.
func (n *Node) DatastoreGet(key string) []byte {
	result := n.RunIPFS("diag", "datastore", "get", key)
	if result.Err == nil {
		return result.Stdout.Bytes()
	}
	return nil
}
// DatastoreHasKey checks if a key exists in the datastore.
// Requires the daemon to be stopped.
func (n *Node) DatastoreHasKey(key string) bool {
	result := n.RunIPFS("diag", "datastore", "get", key)
	return result.Err == nil
}