refactor: remove goprocess (#10872)

* refactor: remove goprocess

The `goprocess` package is no longer needed. It can be replaces by modern `context` and `context.AfterFunc`.

* mod tidy

* log unmount errors on shutdown

* Do not log non-mounted errors on shutdown

* Use WaitGroup associated with IPFS node to wait for services to whutdown

* Prefer explicit Close to context.ArterFunc

* Do not use node-level WaitGroup

* Unmount for non-supported platforms

* fix return values

* test: daemon shuts down gracefully

make sure ongoing operations dont block shutdown

* test(cli): add TestFUSE

* test: smarter RequiresFUSE

opportunistically run FUSE tests if env has fusermount
and TEST_FUSE was not explicitly set

* docs: changelog

---------

Co-authored-by: gammazero <gammazero@users.noreply.github.com>
Co-authored-by: Marcin Rataj <lidel@lidel.org>
This commit is contained in:
Andrew Gillis 2025-08-05 15:33:45 -07:00 committed by GitHub
parent 47b31fe0c3
commit 90b73d2ad2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 453 additions and 98 deletions

View File

@ -35,7 +35,6 @@ import (
fsrepo "github.com/ipfs/kubo/repo/fsrepo"
"github.com/ipfs/kubo/repo/fsrepo/migrations"
"github.com/ipfs/kubo/repo/fsrepo/migrations/ipfsfetcher"
goprocess "github.com/jbenet/goprocess"
p2pcrypto "github.com/libp2p/go-libp2p/core/crypto"
pnet "github.com/libp2p/go-libp2p/core/pnet"
"github.com/libp2p/go-libp2p/core/protocol"
@ -537,10 +536,19 @@ take effect.
if err != nil {
return err
}
pluginErrc := make(chan error, 1)
select {
case <-node.Process.Closing():
case <-node.Context().Done():
close(pluginErrc)
default:
node.Process.AddChild(goprocess.WithTeardown(cctx.Plugins.Close))
context.AfterFunc(node.Context(), func() {
err := cctx.Plugins.Close()
if err != nil {
pluginErrc <- fmt.Errorf("closing plugins: %w", err)
}
close(pluginErrc)
})
}
// construct api endpoint - every time
@ -558,6 +566,11 @@ take effect.
if err := mountFuse(req, cctx); err != nil {
return err
}
defer func() {
if _err != nil {
nodeMount.Unmount(node)
}
}()
}
// repo blockstore GC - if --enable-gc flag is present
@ -703,10 +716,17 @@ take effect.
log.Fatal("Support for IPFS_REUSEPORT was removed. Use LIBP2P_TCP_REUSEPORT instead.")
}
unmountErrc := make(chan error)
context.AfterFunc(node.Context(), func() {
<-node.Context().Done()
nodeMount.Unmount(node)
close(unmountErrc)
})
// collect long-running errors and block for shutdown
// TODO(cryptix): our fuse currently doesn't follow this pattern for graceful shutdown
var errs error
for err := range merge(apiErrc, gwErrc, gcErrc, p2pGwErrc) {
for err := range merge(apiErrc, gwErrc, gcErrc, p2pGwErrc, pluginErrc, unmountErrc) {
if err != nil {
errs = multierr.Append(errs, err)
}
@ -1053,14 +1073,13 @@ func serveTrustlessGatewayOverLibp2p(cctx *oldcmds.Context) (<-chan error, error
errc := make(chan error, 1)
go func() {
defer close(errc)
errc <- h.Serve()
close(errc)
}()
go func() {
<-node.Process.Closing()
context.AfterFunc(node.Context(), func() {
h.Close()
}()
})
return errc, nil
}
@ -1145,14 +1164,14 @@ func maybeRunGC(req *cmds.Request, node *core.IpfsNode) (<-chan error, error) {
return errc, nil
}
// merge does fan-in of multiple read-only error channels
// taken from http://blog.golang.org/pipelines
// merge does fan-in of multiple read-only error channels.
func merge(cs ...<-chan error) <-chan error {
var wg sync.WaitGroup
out := make(chan error)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
// Start a goroutine for each input channel in cs, that copies values from
// the input channel to the output channel until the input channel is
// closed.
output := func(c <-chan error) {
for n := range c {
out <- n
@ -1166,8 +1185,8 @@ func merge(cs ...<-chan error) <-chan error {
}
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
// Start a goroutine to close out once all the output goroutines, and other
// things to wait on, are done.
go func() {
wg.Wait()
close(out)
@ -1238,8 +1257,6 @@ Visit https://github.com/ipfs/kubo/releases or https://dist.ipfs.tech/#kubo and
select {
case <-ctx.Done():
return
case <-nd.Process.Closing():
return
case <-ticker.C:
continue
}

View File

@ -21,7 +21,6 @@ import (
fsnotify "github.com/fsnotify/fsnotify"
"github.com/ipfs/boxo/files"
process "github.com/jbenet/goprocess"
)
var (
@ -54,7 +53,6 @@ func main() {
}
func run(ipfsPath, watchPath string) error {
proc := process.WithParent(process.Background())
log.Printf("running IPFSWatch on '%s' using repo at '%s'...", watchPath, ipfsPath)
ipfsPath, err := fsutil.ExpandHome(ipfsPath)
@ -99,11 +97,11 @@ func run(ipfsPath, watchPath string) error {
corehttp.WebUIOption,
corehttp.CommandsOption(cmdCtx(node, ipfsPath)),
}
proc.Go(func(p process.Process) {
go func() {
if err := corehttp.ListenAndServe(node, addr, opts...); err != nil {
return
}
})
}()
}
interrupts := make(chan os.Signal, 1)
@ -137,7 +135,7 @@ func run(ipfsPath, watchPath string) error {
}
}
}
proc.Go(func(p process.Process) {
go func() {
file, err := os.Open(e.Name)
if err != nil {
log.Println(err)
@ -162,7 +160,7 @@ func run(ipfsPath, watchPath string) error {
log.Println(err)
}
log.Printf("added %s... key: %s", e.Name, k)
})
}()
}
case err := <-watcher.Errors:
log.Println(err)

View File

@ -29,7 +29,6 @@ import (
provider "github.com/ipfs/boxo/provider"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
goprocess "github.com/jbenet/goprocess"
ddht "github.com/libp2p/go-libp2p-kad-dht/dual"
pubsub "github.com/libp2p/go-libp2p-pubsub"
psrouter "github.com/libp2p/go-libp2p-pubsub-router"
@ -119,8 +118,7 @@ type IpfsNode struct {
P2P *p2p.P2P `optional:"true"`
Process goprocess.Process
ctx context.Context
ctx context.Context
stop func() error

View File

@ -13,8 +13,6 @@ import (
logging "github.com/ipfs/go-log/v2"
core "github.com/ipfs/kubo/core"
"github.com/jbenet/goprocess"
periodicproc "github.com/jbenet/goprocess/periodic"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
@ -97,7 +95,7 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error
}
select {
case <-node.Process.Closing():
case <-node.Context().Done():
return fmt.Errorf("failed to start server, process closing")
default:
}
@ -107,20 +105,31 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error
}
var serverError error
serverProc := node.Process.Go(func(p goprocess.Process) {
serverClosed := make(chan struct{})
go func() {
serverError = server.Serve(lis)
})
close(serverClosed)
}()
// wait for server to exit.
select {
case <-serverProc.Closed():
case <-serverClosed:
// if node being closed before server exits, close server
case <-node.Process.Closing():
case <-node.Context().Done():
log.Infof("server at %s terminating...", addr)
warnProc := periodicproc.Tick(5*time.Second, func(_ goprocess.Process) {
log.Infof("waiting for server at %s to terminate...", addr)
})
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Infof("waiting for server at %s to terminate...", addr)
case <-serverClosed:
return
}
}
}()
// This timeout shouldn't be necessary if all of our commands
// are obeying their contexts but we should have *some* timeout.
@ -130,10 +139,8 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error
// Should have already closed but we still need to wait for it
// to set the error.
<-serverProc.Closed()
<-serverClosed
serverError = err
warnProc.Close()
}
log.Infof("server at %s terminated", addr)

View File

@ -445,8 +445,6 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option {
return fx.Options(
bcfgOpts,
fx.Provide(baseProcess),
Storage(bcfg, cfg),
Identity(cfg),
IPNS,

View File

@ -4,7 +4,6 @@ import (
"context"
"errors"
"github.com/jbenet/goprocess"
"go.uber.org/fx"
)
@ -55,14 +54,3 @@ func maybeInvoke(opt interface{}, enable bool) fx.Option {
}
return fx.Options()
}
// baseProcess creates a goprocess which is closed when the lifecycle signals it to stop
func baseProcess(lc fx.Lifecycle) goprocess.Process {
p := goprocess.WithParent(goprocess.Background())
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return p.Close()
},
})
return p
}

View File

@ -11,7 +11,7 @@ This release was brought to you by the [Interplanetary Shipyard](https://ipship
- [Overview](#overview)
- [🔦 Highlights](#-highlights)
- [Clear provide queue when reprovide strategy changes](#clear-provide-queue-when-reprovide-strategy-changes)
- [Remove unnecessary packages from thirdparty](#remove-unnecessary-packages-from-thirdparty)
- [Removed unnecessary dependencies](#removed-unnecessary-dependencies)
- [📦️ Important dependency updates](#-important-dependency-updates)
- [📝 Changelog](#-changelog)
- [👨‍👩‍👧‍👦 Contributors](#-contributors)
@ -31,13 +31,16 @@ A new `ipfs provide clear` command also allows manual queue clearing for debuggi
> [!NOTE]
> Upgrading to Kubo 0.37 will automatically clear any preexisting provide queue. The next time `Reprovider.Interval` hits, `Reprovider.Strategy` will be executed on a clean slate, ensuring consistent behavior with your current configuration.
#### Remove unnecessary packages from thirdparty
#### Removed unnecessary dependencies
Removed unnecessary packages from the `thirdparty` area of kubo repositroy.
Kubo has been cleaned up by removing unnecessary dependencies and packages:
- Removed `thirdparty/assert` (replaced by `github.com/stretchr/testify/require`)
- Removed `thirdparty/dir` (replaced by `misc/fsutil)`
- Removed `thirdparty/dir` (replaced by `misc/fsutil`)
- Removed `thirdparty/notifier` (unused)
- Removed `goprocess` dependency (replaced with native Go `context` patterns)
These changes reduce the dependency footprint while improving code maintainability and following Go best practices.
#### 📦️ Important dependency updates

View File

@ -102,7 +102,6 @@ require (
github.com/ipshipyard/p2p-forge v0.6.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
github.com/koron/go-ssdp v0.0.6 // indirect

View File

@ -377,8 +377,6 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=

View File

@ -29,5 +29,5 @@ func Mount(ipfs *core.IpfsNode, ipnsmp, ipfsmp string) (mount.Mount, error) {
return nil, err
}
return mount.NewMount(ipfs.Process, fsys, ipnsmp, allowOther)
return mount.NewMount(fsys, ipnsmp, allowOther)
}

View File

@ -17,5 +17,5 @@ func Mount(ipfs *core.IpfsNode, mountpoint string) (mount.Mount, error) {
}
allowOther := cfg.Mounts.FuseAllowOther
fsys := NewFileSystem(ipfs)
return mount.NewMount(ipfs.Process, fsys, mountpoint, allowOther)
return mount.NewMount(fsys, mountpoint, allowOther)
}

View File

@ -11,7 +11,6 @@ import (
"bazil.org/fuse"
"bazil.org/fuse/fs"
"github.com/jbenet/goprocess"
)
var ErrNotMounted = errors.New("not mounted")
@ -25,12 +24,12 @@ type mount struct {
active bool
activeLock *sync.RWMutex
proc goprocess.Process
unmountOnce sync.Once
}
// Mount mounts a fuse fs.FS at a given location, and returns a Mount instance.
// parent is a ContextGroup to bind the mount's ContextGroup to.
func NewMount(p goprocess.Process, fsys fs.FS, mountpoint string, allowOther bool) (Mount, error) {
// ctx is parent is a ContextGroup to bind the mount's ContextGroup to.
func NewMount(fsys fs.FS, mountpoint string, allowOther bool) (Mount, error) {
var conn *fuse.Conn
var err error
@ -54,12 +53,10 @@ func NewMount(p goprocess.Process, fsys fs.FS, mountpoint string, allowOther boo
filesys: fsys,
active: false,
activeLock: &sync.RWMutex{},
proc: goprocess.WithParent(p), // link it to parent.
}
m.proc.SetTeardown(m.unmount)
// launch the mounting process.
if err := m.mount(); err != nil {
if err = m.mount(); err != nil {
_ = m.Unmount() // just in case.
return nil, err
}
@ -135,10 +132,6 @@ func (m *mount) unmount() error {
return nil
}
func (m *mount) Process() goprocess.Process {
return m.proc
}
func (m *mount) MountPoint() string {
return m.mpoint
}
@ -148,8 +141,12 @@ func (m *mount) Unmount() error {
return ErrNotMounted
}
// call Process Close(), which calls unmount() exactly once.
return m.proc.Close()
var err error
m.unmountOnce.Do(func() {
err = m.unmount()
})
return err
}
func (m *mount) IsActive() bool {

View File

@ -9,7 +9,6 @@ import (
"time"
logging "github.com/ipfs/go-log/v2"
goprocess "github.com/jbenet/goprocess"
)
var log = logging.Logger("mount")
@ -26,10 +25,6 @@ type Mount interface {
// Checks if the mount is still active.
IsActive() bool
// Process returns the mount's Process to be able to link it
// to other processes. Unmount upon closing.
Process() goprocess.Process
}
// ForceUnmount attempts to forcibly unmount a given mount.

View File

@ -12,3 +12,7 @@ import (
func Mount(node *core.IpfsNode, fsdir, nsdir, mfsdir string) error {
return errors.New("not compiled in")
}
func Unmount(node *core.IpfsNode) {
return
}

View File

@ -12,3 +12,7 @@ import (
func Mount(node *core.IpfsNode, fsdir, nsdir, mfsdir string) error {
return errors.New("FUSE not supported on OpenBSD or NetBSD. See #5334 (https://github.com/ipfs/kubo/issues/5334).")
}
func Unmount(node *core.IpfsNode) {
return
}

View File

@ -36,18 +36,7 @@ func Mount(node *core.IpfsNode, fsdir, nsdir, mfsdir string) error {
// check if we already have live mounts.
// if the user said "Mount", then there must be something wrong.
// so, close them and try again.
if node.Mounts.Ipfs != nil && node.Mounts.Ipfs.IsActive() {
// best effort
_ = node.Mounts.Ipfs.Unmount()
}
if node.Mounts.Ipns != nil && node.Mounts.Ipns.IsActive() {
// best effort
_ = node.Mounts.Ipns.Unmount()
}
if node.Mounts.Mfs != nil && node.Mounts.Mfs.IsActive() {
// best effort
_ = node.Mounts.Mfs.Unmount()
}
Unmount(node)
if err := platformFuseChecks(node); err != nil {
return err
@ -56,6 +45,27 @@ func Mount(node *core.IpfsNode, fsdir, nsdir, mfsdir string) error {
return doMount(node, fsdir, nsdir, mfsdir)
}
func Unmount(node *core.IpfsNode) {
if node.Mounts.Ipfs != nil && node.Mounts.Ipfs.IsActive() {
// best effort
if err := node.Mounts.Ipfs.Unmount(); err != nil {
log.Errorf("error unmounting IPFS: %s", err)
}
}
if node.Mounts.Ipns != nil && node.Mounts.Ipns.IsActive() {
// best effort
if err := node.Mounts.Ipns.Unmount(); err != nil {
log.Errorf("error unmounting IPNS: %s", err)
}
}
if node.Mounts.Mfs != nil && node.Mounts.Mfs.IsActive() {
// best effort
if err := node.Mounts.Mfs.Unmount(); err != nil {
log.Errorf("error unmounting MFS: %s", err)
}
}
}
func doMount(node *core.IpfsNode, fsdir, nsdir, mfsdir string) error {
fmtFuseErr := func(err error, mountpoint string) error {
s := err.Error()

View File

@ -9,3 +9,9 @@ func Mount(node *core.IpfsNode, fsdir, nsdir, mfsdir string) error {
// currently a no-op, but we don't want to return an error
return nil
}
func Unmount(node *core.IpfsNode) {
// TODO
// currently a no-op
return
}

View File

@ -17,5 +17,5 @@ func Mount(ipfs *core.IpfsNode, mountpoint string) (mount.Mount, error) {
}
allowOther := cfg.Mounts.FuseAllowOther
fsys := NewFileSystem(ipfs)
return mount.NewMount(ipfs.Process, fsys, mountpoint, allowOther)
return mount.NewMount(fsys, mountpoint, allowOther)
}

1
go.mod
View File

@ -49,7 +49,6 @@ require (
github.com/ipld/go-ipld-prime v0.21.0
github.com/ipshipyard/p2p-forge v0.6.1
github.com/jbenet/go-temp-err-catcher v0.1.0
github.com/jbenet/goprocess v0.1.4
github.com/julienschmidt/httprouter v1.3.0
github.com/libp2p/go-doh-resolver v0.5.0
github.com/libp2p/go-libp2p v0.42.1

3
go.sum
View File

@ -443,14 +443,11 @@ github.com/ipshipyard/p2p-forge v0.6.1 h1:987/hUC1YxI56CcMX6iTB+9BLjFV0d2SJnig9Z
github.com/ipshipyard/p2p-forge v0.6.1/go.mod h1:pj8Zcs+ex5OMq5a1bFLHqW0oL3qYO0v5eGLZmit0l7U=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk=
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=

View File

@ -1,10 +1,20 @@
package cli
import (
"bytes"
"crypto/rand"
"fmt"
"io"
"net/http"
"os/exec"
"testing"
"time"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/stretchr/testify/require"
)
func TestDaemon(t *testing.T) {
@ -22,4 +32,125 @@ func TestDaemon(t *testing.T) {
node.StopDaemon()
})
t.Run("daemon shuts down gracefully with active operations", func(t *testing.T) {
t.Parallel()
// Start daemon with multiple components active via config
node := harness.NewT(t).NewNode().Init()
// Enable experimental features and pubsub via config
node.UpdateConfig(func(cfg *config.Config) {
cfg.Pubsub.Enabled = config.True // Instead of --enable-pubsub-experiment
cfg.Experimental.P2pHttpProxy = true // Enable P2P HTTP proxy
cfg.Experimental.GatewayOverLibp2p = true // Enable gateway over libp2p
})
node.StartDaemon("--enable-gc")
// Start background operations to simulate real daemon workload:
// 1. "ipfs add" simulates content onboarding/ingestion work
// 2. Gateway request simulates content retrieval and gateway processing work
// Background operation 1: Continuous add of random data to simulate onboarding
addDone := make(chan struct{})
go func() {
defer close(addDone)
// Start the add command asynchronously
res := node.Runner.Run(harness.RunRequest{
Path: node.IPFSBin,
Args: []string{"add", "--progress=false", "-"},
RunFunc: (*exec.Cmd).Start,
CmdOpts: []harness.CmdOpt{
harness.RunWithStdin(&infiniteReader{}),
},
})
// Wait for command to finish (when daemon stops)
if res.Cmd != nil {
_ = res.Cmd.Wait() // Ignore error, expect command to be killed during shutdown
}
}()
// Background operation 2: Gateway CAR request to simulate retrieval work
gatewayDone := make(chan struct{})
go func() {
defer close(gatewayDone)
// First add a file sized to ensure gateway request takes ~1 minute
largeData := make([]byte, 512*1024) // 512KB of data
_, _ = rand.Read(largeData) // Always succeeds for crypto/rand
testCID := node.IPFSAdd(bytes.NewReader(largeData))
// Get gateway address from config
cfg := node.ReadConfig()
gatewayMaddr, err := multiaddr.NewMultiaddr(cfg.Addresses.Gateway[0])
if err != nil {
return
}
gatewayAddr, err := manet.ToNetAddr(gatewayMaddr)
if err != nil {
return
}
// Request CAR but slow reading to simulate heavy gateway load
gatewayURL := fmt.Sprintf("http://%s/ipfs/%s?format=car", gatewayAddr, testCID)
client := &http.Client{Timeout: 90 * time.Second}
resp, err := client.Get(gatewayURL)
if err == nil {
defer resp.Body.Close()
// Read response slowly: 512KB ÷ 1KB × 125ms = ~64 seconds (1+ minute) total
// This ensures operation is still active when we shutdown at 2 seconds
buf := make([]byte, 1024) // 1KB buffer
for {
if _, err := io.ReadFull(resp.Body, buf); err != nil {
return
}
time.Sleep(125 * time.Millisecond) // 125ms delay = ~64s total for 512KB
}
}
}()
// Let operations run for 2 seconds to ensure they're active
time.Sleep(2 * time.Second)
// Trigger graceful shutdown
shutdownStart := time.Now()
node.StopDaemon()
shutdownDuration := time.Since(shutdownStart)
// Verify clean shutdown:
// - Daemon should stop within reasonable time (not hang)
require.Less(t, shutdownDuration, 10*time.Second, "daemon should shut down within 10 seconds")
// Wait for background operations to complete (with timeout)
select {
case <-addDone:
// Good, add operation terminated
case <-time.After(5 * time.Second):
t.Error("add operation did not terminate within 5 seconds after daemon shutdown")
}
select {
case <-gatewayDone:
// Good, gateway operation terminated
case <-time.After(5 * time.Second):
t.Error("gateway operation did not terminate within 5 seconds after daemon shutdown")
}
// Verify we can restart with same repo (no lock issues)
node.StartDaemon()
node.StopDaemon()
})
}
// infiniteReader provides an infinite stream of random data
type infiniteReader struct{}
func (r *infiniteReader) Read(p []byte) (n int, err error) {
_, _ = rand.Read(p) // Always succeeds for crypto/rand
time.Sleep(50 * time.Millisecond) // Rate limit to simulate steady stream
return len(p), nil
}

166
test/cli/fuse_test.go Normal file
View File

@ -0,0 +1,166 @@
package cli
import (
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"testing"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/ipfs/kubo/test/cli/testutils"
"github.com/stretchr/testify/require"
)
func TestFUSE(t *testing.T) {
testutils.RequiresFUSE(t)
t.Parallel()
t.Run("mount and unmount work correctly", func(t *testing.T) {
t.Parallel()
// Create a node and start daemon
node := harness.NewT(t).NewNode().Init()
node.StartDaemon()
// Create mount directories in the node's working directory
nodeDir := node.Dir
ipfsMount := filepath.Join(nodeDir, "ipfs")
ipnsMount := filepath.Join(nodeDir, "ipns")
mfsMount := filepath.Join(nodeDir, "mfs")
err := os.MkdirAll(ipfsMount, 0755)
require.NoError(t, err)
err = os.MkdirAll(ipnsMount, 0755)
require.NoError(t, err)
err = os.MkdirAll(mfsMount, 0755)
require.NoError(t, err)
// Ensure any existing mounts are cleaned up first
failOnError := false // mount points might not exist from previous runs
doUnmount(t, ipfsMount, failOnError)
doUnmount(t, ipnsMount, failOnError)
doUnmount(t, mfsMount, failOnError)
// Test mount operation
result := node.IPFS("mount", "-f", ipfsMount, "-n", ipnsMount, "-m", mfsMount)
// Verify mount output
expectedOutput := "IPFS mounted at: " + ipfsMount + "\n" +
"IPNS mounted at: " + ipnsMount + "\n" +
"MFS mounted at: " + mfsMount + "\n"
require.Equal(t, expectedOutput, result.Stdout.String())
// Test basic MFS functionality via FUSE mount
testFile := filepath.Join(mfsMount, "testfile")
testContent := "hello fuse world"
// Create file via FUSE mount
err = os.WriteFile(testFile, []byte(testContent), 0644)
require.NoError(t, err)
// Verify file appears in MFS via IPFS commands
result = node.IPFS("files", "ls", "/")
require.Contains(t, result.Stdout.String(), "testfile")
// Read content back via MFS FUSE mount
readContent, err := os.ReadFile(testFile)
require.NoError(t, err)
require.Equal(t, testContent, string(readContent))
// Get the CID of the MFS file
result = node.IPFS("files", "stat", "/testfile", "--format=<hash>")
fileCID := strings.TrimSpace(result.Stdout.String())
require.NotEmpty(t, fileCID, "should have a CID for the MFS file")
// Read the same content via IPFS FUSE mount using the CID
ipfsFile := filepath.Join(ipfsMount, fileCID)
ipfsContent, err := os.ReadFile(ipfsFile)
require.NoError(t, err)
require.Equal(t, testContent, string(ipfsContent), "content should match between MFS and IPFS mounts")
// Verify both FUSE mounts return identical data
require.Equal(t, readContent, ipfsContent, "MFS and IPFS FUSE mounts should return identical data")
// Test that mount directories cannot be removed while mounted
err = os.Remove(ipfsMount)
require.Error(t, err, "should not be able to remove mounted directory")
// Stop daemon - this should trigger automatic unmount via context cancellation
node.StopDaemon()
// Daemon shutdown should handle unmount synchronously via context.AfterFunc
// Verify directories can now be removed (indicating successful unmount)
err = os.Remove(ipfsMount)
require.NoError(t, err, "should be able to remove directory after unmount")
err = os.Remove(ipnsMount)
require.NoError(t, err, "should be able to remove directory after unmount")
err = os.Remove(mfsMount)
require.NoError(t, err, "should be able to remove directory after unmount")
})
t.Run("explicit unmount works", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.StartDaemon()
// Create mount directories
nodeDir := node.Dir
ipfsMount := filepath.Join(nodeDir, "ipfs")
ipnsMount := filepath.Join(nodeDir, "ipns")
mfsMount := filepath.Join(nodeDir, "mfs")
err := os.MkdirAll(ipfsMount, 0755)
require.NoError(t, err)
err = os.MkdirAll(ipnsMount, 0755)
require.NoError(t, err)
err = os.MkdirAll(mfsMount, 0755)
require.NoError(t, err)
// Clean up any existing mounts
failOnError := false // mount points might not exist from previous runs
doUnmount(t, ipfsMount, failOnError)
doUnmount(t, ipnsMount, failOnError)
doUnmount(t, mfsMount, failOnError)
// Mount
node.IPFS("mount", "-f", ipfsMount, "-n", ipnsMount, "-m", mfsMount)
// Explicit unmount via platform-specific command
failOnError = true // test that explicit unmount works correctly
doUnmount(t, ipfsMount, failOnError)
doUnmount(t, ipnsMount, failOnError)
doUnmount(t, mfsMount, failOnError)
// Verify directories can be removed after explicit unmount
err = os.Remove(ipfsMount)
require.NoError(t, err)
err = os.Remove(ipnsMount)
require.NoError(t, err)
err = os.Remove(mfsMount)
require.NoError(t, err)
node.StopDaemon()
})
}
// doUnmount performs platform-specific unmount, similar to sharness do_umount
// failOnError: if true, unmount errors cause test failure; if false, errors are ignored (useful for cleanup)
func doUnmount(t *testing.T, mountPoint string, failOnError bool) {
t.Helper()
var cmd *exec.Cmd
if runtime.GOOS == "linux" {
// fusermount -u: unmount filesystem (strict - fails if busy)
cmd = exec.Command("fusermount", "-u", mountPoint)
} else {
cmd = exec.Command("umount", mountPoint)
}
err := cmd.Run()
if err != nil && failOnError {
t.Fatalf("failed to unmount %s: %v", mountPoint, err)
}
}

View File

@ -2,6 +2,7 @@ package testutils
import (
"os"
"os/exec"
"runtime"
"testing"
)
@ -13,9 +14,48 @@ func RequiresDocker(t *testing.T) {
}
func RequiresFUSE(t *testing.T) {
if os.Getenv("TEST_FUSE") != "1" {
t.SkipNow()
// Skip if FUSE tests are explicitly disabled
if os.Getenv("TEST_FUSE") == "0" {
t.Skip("FUSE tests disabled via TEST_FUSE=0")
}
// If TEST_FUSE=1 is set, always run (for backwards compatibility)
if os.Getenv("TEST_FUSE") == "1" {
return
}
// Auto-detect FUSE availability based on platform and tools
if !isFUSEAvailable(t) {
t.Skip("FUSE not available (no fusermount/umount found or unsupported platform)")
}
}
// isFUSEAvailable checks if FUSE is available on the current system
func isFUSEAvailable(t *testing.T) bool {
t.Helper()
// Check platform support
switch runtime.GOOS {
case "linux", "darwin", "freebsd", "openbsd", "netbsd":
// These platforms potentially support FUSE
case "windows":
// Windows has limited FUSE support via WinFsp, but skip for now
return false
default:
// Unknown platform, assume no FUSE support
return false
}
// Check for required unmount tools
var unmountCmd string
if runtime.GOOS == "linux" {
unmountCmd = "fusermount"
} else {
unmountCmd = "umount"
}
_, err := exec.LookPath(unmountCmd)
return err == nil
}
func RequiresExpensive(t *testing.T) {