diff --git a/cmd/ipfs/kubo/daemon.go b/cmd/ipfs/kubo/daemon.go index c6fb6b8cb..e52e68906 100644 --- a/cmd/ipfs/kubo/daemon.go +++ b/cmd/ipfs/kubo/daemon.go @@ -35,7 +35,6 @@ import ( fsrepo "github.com/ipfs/kubo/repo/fsrepo" "github.com/ipfs/kubo/repo/fsrepo/migrations" "github.com/ipfs/kubo/repo/fsrepo/migrations/ipfsfetcher" - goprocess "github.com/jbenet/goprocess" p2pcrypto "github.com/libp2p/go-libp2p/core/crypto" pnet "github.com/libp2p/go-libp2p/core/pnet" "github.com/libp2p/go-libp2p/core/protocol" @@ -537,10 +536,19 @@ take effect. if err != nil { return err } + + pluginErrc := make(chan error, 1) select { - case <-node.Process.Closing(): + case <-node.Context().Done(): + close(pluginErrc) default: - node.Process.AddChild(goprocess.WithTeardown(cctx.Plugins.Close)) + context.AfterFunc(node.Context(), func() { + err := cctx.Plugins.Close() + if err != nil { + pluginErrc <- fmt.Errorf("closing plugins: %w", err) + } + close(pluginErrc) + }) } // construct api endpoint - every time @@ -558,6 +566,11 @@ take effect. if err := mountFuse(req, cctx); err != nil { return err } + defer func() { + if _err != nil { + nodeMount.Unmount(node) + } + }() } // repo blockstore GC - if --enable-gc flag is present @@ -703,10 +716,17 @@ take effect. log.Fatal("Support for IPFS_REUSEPORT was removed. Use LIBP2P_TCP_REUSEPORT instead.") } + unmountErrc := make(chan error) + context.AfterFunc(node.Context(), func() { + <-node.Context().Done() + nodeMount.Unmount(node) + close(unmountErrc) + }) + // collect long-running errors and block for shutdown // TODO(cryptix): our fuse currently doesn't follow this pattern for graceful shutdown var errs error - for err := range merge(apiErrc, gwErrc, gcErrc, p2pGwErrc) { + for err := range merge(apiErrc, gwErrc, gcErrc, p2pGwErrc, pluginErrc, unmountErrc) { if err != nil { errs = multierr.Append(errs, err) } @@ -1053,14 +1073,13 @@ func serveTrustlessGatewayOverLibp2p(cctx *oldcmds.Context) (<-chan error, error errc := make(chan error, 1) go func() { - defer close(errc) errc <- h.Serve() + close(errc) }() - go func() { - <-node.Process.Closing() + context.AfterFunc(node.Context(), func() { h.Close() - }() + }) return errc, nil } @@ -1145,14 +1164,14 @@ func maybeRunGC(req *cmds.Request, node *core.IpfsNode) (<-chan error, error) { return errc, nil } -// merge does fan-in of multiple read-only error channels -// taken from http://blog.golang.org/pipelines +// merge does fan-in of multiple read-only error channels. func merge(cs ...<-chan error) <-chan error { var wg sync.WaitGroup out := make(chan error) - // Start an output goroutine for each input channel in cs. output - // copies values from c to out until c is closed, then calls wg.Done. + // Start a goroutine for each input channel in cs, that copies values from + // the input channel to the output channel until the input channel is + // closed. output := func(c <-chan error) { for n := range c { out <- n @@ -1166,8 +1185,8 @@ func merge(cs ...<-chan error) <-chan error { } } - // Start a goroutine to close out once all the output goroutines are - // done. This must start after the wg.Add call. + // Start a goroutine to close out once all the output goroutines, and other + // things to wait on, are done. go func() { wg.Wait() close(out) @@ -1238,8 +1257,6 @@ Visit https://github.com/ipfs/kubo/releases or https://dist.ipfs.tech/#kubo and select { case <-ctx.Done(): return - case <-nd.Process.Closing(): - return case <-ticker.C: continue } diff --git a/cmd/ipfswatch/main.go b/cmd/ipfswatch/main.go index 6850f6423..3178cf564 100644 --- a/cmd/ipfswatch/main.go +++ b/cmd/ipfswatch/main.go @@ -21,7 +21,6 @@ import ( fsnotify "github.com/fsnotify/fsnotify" "github.com/ipfs/boxo/files" - process "github.com/jbenet/goprocess" ) var ( @@ -54,7 +53,6 @@ func main() { } func run(ipfsPath, watchPath string) error { - proc := process.WithParent(process.Background()) log.Printf("running IPFSWatch on '%s' using repo at '%s'...", watchPath, ipfsPath) ipfsPath, err := fsutil.ExpandHome(ipfsPath) @@ -99,11 +97,11 @@ func run(ipfsPath, watchPath string) error { corehttp.WebUIOption, corehttp.CommandsOption(cmdCtx(node, ipfsPath)), } - proc.Go(func(p process.Process) { + go func() { if err := corehttp.ListenAndServe(node, addr, opts...); err != nil { return } - }) + }() } interrupts := make(chan os.Signal, 1) @@ -137,7 +135,7 @@ func run(ipfsPath, watchPath string) error { } } } - proc.Go(func(p process.Process) { + go func() { file, err := os.Open(e.Name) if err != nil { log.Println(err) @@ -162,7 +160,7 @@ func run(ipfsPath, watchPath string) error { log.Println(err) } log.Printf("added %s... key: %s", e.Name, k) - }) + }() } case err := <-watcher.Errors: log.Println(err) diff --git a/core/core.go b/core/core.go index 085ca2c5f..c693600f7 100644 --- a/core/core.go +++ b/core/core.go @@ -29,7 +29,6 @@ import ( provider "github.com/ipfs/boxo/provider" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" - goprocess "github.com/jbenet/goprocess" ddht "github.com/libp2p/go-libp2p-kad-dht/dual" pubsub "github.com/libp2p/go-libp2p-pubsub" psrouter "github.com/libp2p/go-libp2p-pubsub-router" @@ -119,8 +118,7 @@ type IpfsNode struct { P2P *p2p.P2P `optional:"true"` - Process goprocess.Process - ctx context.Context + ctx context.Context stop func() error diff --git a/core/corehttp/corehttp.go b/core/corehttp/corehttp.go index 595a0aa5f..344991923 100644 --- a/core/corehttp/corehttp.go +++ b/core/corehttp/corehttp.go @@ -13,8 +13,6 @@ import ( logging "github.com/ipfs/go-log/v2" core "github.com/ipfs/kubo/core" - "github.com/jbenet/goprocess" - periodicproc "github.com/jbenet/goprocess/periodic" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" ) @@ -97,7 +95,7 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error } select { - case <-node.Process.Closing(): + case <-node.Context().Done(): return fmt.Errorf("failed to start server, process closing") default: } @@ -107,20 +105,31 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error } var serverError error - serverProc := node.Process.Go(func(p goprocess.Process) { + serverClosed := make(chan struct{}) + go func() { serverError = server.Serve(lis) - }) + close(serverClosed) + }() // wait for server to exit. select { - case <-serverProc.Closed(): + case <-serverClosed: // if node being closed before server exits, close server - case <-node.Process.Closing(): + case <-node.Context().Done(): log.Infof("server at %s terminating...", addr) - warnProc := periodicproc.Tick(5*time.Second, func(_ goprocess.Process) { - log.Infof("waiting for server at %s to terminate...", addr) - }) + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + log.Infof("waiting for server at %s to terminate...", addr) + case <-serverClosed: + return + } + } + }() // This timeout shouldn't be necessary if all of our commands // are obeying their contexts but we should have *some* timeout. @@ -130,10 +139,8 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error // Should have already closed but we still need to wait for it // to set the error. - <-serverProc.Closed() + <-serverClosed serverError = err - - warnProc.Close() } log.Infof("server at %s terminated", addr) diff --git a/core/node/groups.go b/core/node/groups.go index 9925d3bd6..5b16b5527 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -445,8 +445,6 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option { return fx.Options( bcfgOpts, - fx.Provide(baseProcess), - Storage(bcfg, cfg), Identity(cfg), IPNS, diff --git a/core/node/helpers.go b/core/node/helpers.go index 491d627bf..05cccfd01 100644 --- a/core/node/helpers.go +++ b/core/node/helpers.go @@ -4,7 +4,6 @@ import ( "context" "errors" - "github.com/jbenet/goprocess" "go.uber.org/fx" ) @@ -55,14 +54,3 @@ func maybeInvoke(opt interface{}, enable bool) fx.Option { } return fx.Options() } - -// baseProcess creates a goprocess which is closed when the lifecycle signals it to stop -func baseProcess(lc fx.Lifecycle) goprocess.Process { - p := goprocess.WithParent(goprocess.Background()) - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { - return p.Close() - }, - }) - return p -} diff --git a/docs/changelogs/v0.37.md b/docs/changelogs/v0.37.md index 77406b8c9..72e15d2da 100644 --- a/docs/changelogs/v0.37.md +++ b/docs/changelogs/v0.37.md @@ -11,7 +11,7 @@ This release was brought to you by the [Interplanetary Shipyard](https://ipship - [Overview](#overview) - [๐Ÿ”ฆ Highlights](#-highlights) - [Clear provide queue when reprovide strategy changes](#clear-provide-queue-when-reprovide-strategy-changes) - - [Remove unnecessary packages from thirdparty](#remove-unnecessary-packages-from-thirdparty) + - [Removed unnecessary dependencies](#removed-unnecessary-dependencies) - [๐Ÿ“ฆ๏ธ Important dependency updates](#-important-dependency-updates) - [๐Ÿ“ Changelog](#-changelog) - [๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ Contributors](#-contributors) @@ -31,13 +31,16 @@ A new `ipfs provide clear` command also allows manual queue clearing for debuggi > [!NOTE] > Upgrading to Kubo 0.37 will automatically clear any preexisting provide queue. The next time `Reprovider.Interval` hits, `Reprovider.Strategy` will be executed on a clean slate, ensuring consistent behavior with your current configuration. -#### Remove unnecessary packages from thirdparty +#### Removed unnecessary dependencies -Removed unnecessary packages from the `thirdparty` area of kubo repositroy. +Kubo has been cleaned up by removing unnecessary dependencies and packages: - Removed `thirdparty/assert` (replaced by `github.com/stretchr/testify/require`) -- Removed `thirdparty/dir` (replaced by `misc/fsutil)` +- Removed `thirdparty/dir` (replaced by `misc/fsutil`) - Removed `thirdparty/notifier` (unused) +- Removed `goprocess` dependency (replaced with native Go `context` patterns) + +These changes reduce the dependency footprint while improving code maintainability and following Go best practices. #### ๐Ÿ“ฆ๏ธ Important dependency updates diff --git a/docs/examples/kubo-as-a-library/go.mod b/docs/examples/kubo-as-a-library/go.mod index 5c01ebefe..55ef762a4 100644 --- a/docs/examples/kubo-as-a-library/go.mod +++ b/docs/examples/kubo-as-a-library/go.mod @@ -102,7 +102,6 @@ require ( github.com/ipshipyard/p2p-forge v0.6.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect - github.com/jbenet/goprocess v0.1.4 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/koron/go-ssdp v0.0.6 // indirect diff --git a/docs/examples/kubo-as-a-library/go.sum b/docs/examples/kubo-as-a-library/go.sum index 40692a807..47158688c 100644 --- a/docs/examples/kubo-as-a-library/go.sum +++ b/docs/examples/kubo-as-a-library/go.sum @@ -377,8 +377,6 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= -github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= -github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= diff --git a/fuse/ipns/mount_unix.go b/fuse/ipns/mount_unix.go index 34a8eef51..8c8ea8afe 100644 --- a/fuse/ipns/mount_unix.go +++ b/fuse/ipns/mount_unix.go @@ -29,5 +29,5 @@ func Mount(ipfs *core.IpfsNode, ipnsmp, ipfsmp string) (mount.Mount, error) { return nil, err } - return mount.NewMount(ipfs.Process, fsys, ipnsmp, allowOther) + return mount.NewMount(fsys, ipnsmp, allowOther) } diff --git a/fuse/mfs/mount_unix.go b/fuse/mfs/mount_unix.go index 7fe72e8df..bd7021e28 100644 --- a/fuse/mfs/mount_unix.go +++ b/fuse/mfs/mount_unix.go @@ -17,5 +17,5 @@ func Mount(ipfs *core.IpfsNode, mountpoint string) (mount.Mount, error) { } allowOther := cfg.Mounts.FuseAllowOther fsys := NewFileSystem(ipfs) - return mount.NewMount(ipfs.Process, fsys, mountpoint, allowOther) + return mount.NewMount(fsys, mountpoint, allowOther) } diff --git a/fuse/mount/fuse.go b/fuse/mount/fuse.go index 02d733b89..e18c0b4a9 100644 --- a/fuse/mount/fuse.go +++ b/fuse/mount/fuse.go @@ -11,7 +11,6 @@ import ( "bazil.org/fuse" "bazil.org/fuse/fs" - "github.com/jbenet/goprocess" ) var ErrNotMounted = errors.New("not mounted") @@ -25,12 +24,12 @@ type mount struct { active bool activeLock *sync.RWMutex - proc goprocess.Process + unmountOnce sync.Once } // Mount mounts a fuse fs.FS at a given location, and returns a Mount instance. -// parent is a ContextGroup to bind the mount's ContextGroup to. -func NewMount(p goprocess.Process, fsys fs.FS, mountpoint string, allowOther bool) (Mount, error) { +// ctx is parent is a ContextGroup to bind the mount's ContextGroup to. +func NewMount(fsys fs.FS, mountpoint string, allowOther bool) (Mount, error) { var conn *fuse.Conn var err error @@ -54,12 +53,10 @@ func NewMount(p goprocess.Process, fsys fs.FS, mountpoint string, allowOther boo filesys: fsys, active: false, activeLock: &sync.RWMutex{}, - proc: goprocess.WithParent(p), // link it to parent. } - m.proc.SetTeardown(m.unmount) // launch the mounting process. - if err := m.mount(); err != nil { + if err = m.mount(); err != nil { _ = m.Unmount() // just in case. return nil, err } @@ -135,10 +132,6 @@ func (m *mount) unmount() error { return nil } -func (m *mount) Process() goprocess.Process { - return m.proc -} - func (m *mount) MountPoint() string { return m.mpoint } @@ -148,8 +141,12 @@ func (m *mount) Unmount() error { return ErrNotMounted } - // call Process Close(), which calls unmount() exactly once. - return m.proc.Close() + var err error + m.unmountOnce.Do(func() { + err = m.unmount() + }) + + return err } func (m *mount) IsActive() bool { diff --git a/fuse/mount/mount.go b/fuse/mount/mount.go index b9008bc46..ca10405fe 100644 --- a/fuse/mount/mount.go +++ b/fuse/mount/mount.go @@ -9,7 +9,6 @@ import ( "time" logging "github.com/ipfs/go-log/v2" - goprocess "github.com/jbenet/goprocess" ) var log = logging.Logger("mount") @@ -26,10 +25,6 @@ type Mount interface { // Checks if the mount is still active. IsActive() bool - - // Process returns the mount's Process to be able to link it - // to other processes. Unmount upon closing. - Process() goprocess.Process } // ForceUnmount attempts to forcibly unmount a given mount. diff --git a/fuse/node/mount_nofuse.go b/fuse/node/mount_nofuse.go index 7423cb24d..6d4e102e2 100644 --- a/fuse/node/mount_nofuse.go +++ b/fuse/node/mount_nofuse.go @@ -12,3 +12,7 @@ import ( func Mount(node *core.IpfsNode, fsdir, nsdir, mfsdir string) error { return errors.New("not compiled in") } + +func Unmount(node *core.IpfsNode) { + return +} diff --git a/fuse/node/mount_notsupp.go b/fuse/node/mount_notsupp.go index 79ac0e791..15f98c40e 100644 --- a/fuse/node/mount_notsupp.go +++ b/fuse/node/mount_notsupp.go @@ -12,3 +12,7 @@ import ( func Mount(node *core.IpfsNode, fsdir, nsdir, mfsdir string) error { return errors.New("FUSE not supported on OpenBSD or NetBSD. See #5334 (https://github.com/ipfs/kubo/issues/5334).") } + +func Unmount(node *core.IpfsNode) { + return +} diff --git a/fuse/node/mount_unix.go b/fuse/node/mount_unix.go index c628a85f4..6c63f6e50 100644 --- a/fuse/node/mount_unix.go +++ b/fuse/node/mount_unix.go @@ -36,18 +36,7 @@ func Mount(node *core.IpfsNode, fsdir, nsdir, mfsdir string) error { // check if we already have live mounts. // if the user said "Mount", then there must be something wrong. // so, close them and try again. - if node.Mounts.Ipfs != nil && node.Mounts.Ipfs.IsActive() { - // best effort - _ = node.Mounts.Ipfs.Unmount() - } - if node.Mounts.Ipns != nil && node.Mounts.Ipns.IsActive() { - // best effort - _ = node.Mounts.Ipns.Unmount() - } - if node.Mounts.Mfs != nil && node.Mounts.Mfs.IsActive() { - // best effort - _ = node.Mounts.Mfs.Unmount() - } + Unmount(node) if err := platformFuseChecks(node); err != nil { return err @@ -56,6 +45,27 @@ func Mount(node *core.IpfsNode, fsdir, nsdir, mfsdir string) error { return doMount(node, fsdir, nsdir, mfsdir) } +func Unmount(node *core.IpfsNode) { + if node.Mounts.Ipfs != nil && node.Mounts.Ipfs.IsActive() { + // best effort + if err := node.Mounts.Ipfs.Unmount(); err != nil { + log.Errorf("error unmounting IPFS: %s", err) + } + } + if node.Mounts.Ipns != nil && node.Mounts.Ipns.IsActive() { + // best effort + if err := node.Mounts.Ipns.Unmount(); err != nil { + log.Errorf("error unmounting IPNS: %s", err) + } + } + if node.Mounts.Mfs != nil && node.Mounts.Mfs.IsActive() { + // best effort + if err := node.Mounts.Mfs.Unmount(); err != nil { + log.Errorf("error unmounting MFS: %s", err) + } + } +} + func doMount(node *core.IpfsNode, fsdir, nsdir, mfsdir string) error { fmtFuseErr := func(err error, mountpoint string) error { s := err.Error() diff --git a/fuse/node/mount_windows.go b/fuse/node/mount_windows.go index 42e6bc10b..9f22fe59e 100644 --- a/fuse/node/mount_windows.go +++ b/fuse/node/mount_windows.go @@ -9,3 +9,9 @@ func Mount(node *core.IpfsNode, fsdir, nsdir, mfsdir string) error { // currently a no-op, but we don't want to return an error return nil } + +func Unmount(node *core.IpfsNode) { + // TODO + // currently a no-op + return +} diff --git a/fuse/readonly/mount_unix.go b/fuse/readonly/mount_unix.go index 19be37abe..0ee198409 100644 --- a/fuse/readonly/mount_unix.go +++ b/fuse/readonly/mount_unix.go @@ -17,5 +17,5 @@ func Mount(ipfs *core.IpfsNode, mountpoint string) (mount.Mount, error) { } allowOther := cfg.Mounts.FuseAllowOther fsys := NewFileSystem(ipfs) - return mount.NewMount(ipfs.Process, fsys, mountpoint, allowOther) + return mount.NewMount(fsys, mountpoint, allowOther) } diff --git a/go.mod b/go.mod index 79b803861..c7a89e9ef 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,6 @@ require ( github.com/ipld/go-ipld-prime v0.21.0 github.com/ipshipyard/p2p-forge v0.6.1 github.com/jbenet/go-temp-err-catcher v0.1.0 - github.com/jbenet/goprocess v0.1.4 github.com/julienschmidt/httprouter v1.3.0 github.com/libp2p/go-doh-resolver v0.5.0 github.com/libp2p/go-libp2p v0.42.1 diff --git a/go.sum b/go.sum index 1e225d83a..c8ab14151 100644 --- a/go.sum +++ b/go.sum @@ -443,14 +443,11 @@ github.com/ipshipyard/p2p-forge v0.6.1 h1:987/hUC1YxI56CcMX6iTB+9BLjFV0d2SJnig9Z github.com/ipshipyard/p2p-forge v0.6.1/go.mod h1:pj8Zcs+ex5OMq5a1bFLHqW0oL3qYO0v5eGLZmit0l7U= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= -github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk= github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= -github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= -github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= diff --git a/test/cli/daemon_test.go b/test/cli/daemon_test.go index 7a8c583a2..f87a21651 100644 --- a/test/cli/daemon_test.go +++ b/test/cli/daemon_test.go @@ -1,10 +1,20 @@ package cli import ( + "bytes" + "crypto/rand" + "fmt" + "io" + "net/http" "os/exec" "testing" + "time" + "github.com/ipfs/kubo/config" "github.com/ipfs/kubo/test/cli/harness" + "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" + "github.com/stretchr/testify/require" ) func TestDaemon(t *testing.T) { @@ -22,4 +32,125 @@ func TestDaemon(t *testing.T) { node.StopDaemon() }) + + t.Run("daemon shuts down gracefully with active operations", func(t *testing.T) { + t.Parallel() + + // Start daemon with multiple components active via config + node := harness.NewT(t).NewNode().Init() + + // Enable experimental features and pubsub via config + node.UpdateConfig(func(cfg *config.Config) { + cfg.Pubsub.Enabled = config.True // Instead of --enable-pubsub-experiment + cfg.Experimental.P2pHttpProxy = true // Enable P2P HTTP proxy + cfg.Experimental.GatewayOverLibp2p = true // Enable gateway over libp2p + }) + + node.StartDaemon("--enable-gc") + + // Start background operations to simulate real daemon workload: + // 1. "ipfs add" simulates content onboarding/ingestion work + // 2. Gateway request simulates content retrieval and gateway processing work + + // Background operation 1: Continuous add of random data to simulate onboarding + addDone := make(chan struct{}) + go func() { + defer close(addDone) + + // Start the add command asynchronously + res := node.Runner.Run(harness.RunRequest{ + Path: node.IPFSBin, + Args: []string{"add", "--progress=false", "-"}, + RunFunc: (*exec.Cmd).Start, + CmdOpts: []harness.CmdOpt{ + harness.RunWithStdin(&infiniteReader{}), + }, + }) + + // Wait for command to finish (when daemon stops) + if res.Cmd != nil { + _ = res.Cmd.Wait() // Ignore error, expect command to be killed during shutdown + } + }() + + // Background operation 2: Gateway CAR request to simulate retrieval work + gatewayDone := make(chan struct{}) + go func() { + defer close(gatewayDone) + + // First add a file sized to ensure gateway request takes ~1 minute + largeData := make([]byte, 512*1024) // 512KB of data + _, _ = rand.Read(largeData) // Always succeeds for crypto/rand + testCID := node.IPFSAdd(bytes.NewReader(largeData)) + + // Get gateway address from config + cfg := node.ReadConfig() + gatewayMaddr, err := multiaddr.NewMultiaddr(cfg.Addresses.Gateway[0]) + if err != nil { + return + } + gatewayAddr, err := manet.ToNetAddr(gatewayMaddr) + if err != nil { + return + } + + // Request CAR but slow reading to simulate heavy gateway load + gatewayURL := fmt.Sprintf("http://%s/ipfs/%s?format=car", gatewayAddr, testCID) + + client := &http.Client{Timeout: 90 * time.Second} + resp, err := client.Get(gatewayURL) + if err == nil { + defer resp.Body.Close() + // Read response slowly: 512KB รท 1KB ร— 125ms = ~64 seconds (1+ minute) total + // This ensures operation is still active when we shutdown at 2 seconds + buf := make([]byte, 1024) // 1KB buffer + for { + if _, err := io.ReadFull(resp.Body, buf); err != nil { + return + } + time.Sleep(125 * time.Millisecond) // 125ms delay = ~64s total for 512KB + } + } + }() + + // Let operations run for 2 seconds to ensure they're active + time.Sleep(2 * time.Second) + + // Trigger graceful shutdown + shutdownStart := time.Now() + node.StopDaemon() + shutdownDuration := time.Since(shutdownStart) + + // Verify clean shutdown: + // - Daemon should stop within reasonable time (not hang) + require.Less(t, shutdownDuration, 10*time.Second, "daemon should shut down within 10 seconds") + + // Wait for background operations to complete (with timeout) + select { + case <-addDone: + // Good, add operation terminated + case <-time.After(5 * time.Second): + t.Error("add operation did not terminate within 5 seconds after daemon shutdown") + } + + select { + case <-gatewayDone: + // Good, gateway operation terminated + case <-time.After(5 * time.Second): + t.Error("gateway operation did not terminate within 5 seconds after daemon shutdown") + } + + // Verify we can restart with same repo (no lock issues) + node.StartDaemon() + node.StopDaemon() + }) +} + +// infiniteReader provides an infinite stream of random data +type infiniteReader struct{} + +func (r *infiniteReader) Read(p []byte) (n int, err error) { + _, _ = rand.Read(p) // Always succeeds for crypto/rand + time.Sleep(50 * time.Millisecond) // Rate limit to simulate steady stream + return len(p), nil } diff --git a/test/cli/fuse_test.go b/test/cli/fuse_test.go new file mode 100644 index 000000000..6182a069a --- /dev/null +++ b/test/cli/fuse_test.go @@ -0,0 +1,166 @@ +package cli + +import ( + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + "testing" + + "github.com/ipfs/kubo/test/cli/harness" + "github.com/ipfs/kubo/test/cli/testutils" + "github.com/stretchr/testify/require" +) + +func TestFUSE(t *testing.T) { + testutils.RequiresFUSE(t) + t.Parallel() + + t.Run("mount and unmount work correctly", func(t *testing.T) { + t.Parallel() + + // Create a node and start daemon + node := harness.NewT(t).NewNode().Init() + node.StartDaemon() + + // Create mount directories in the node's working directory + nodeDir := node.Dir + ipfsMount := filepath.Join(nodeDir, "ipfs") + ipnsMount := filepath.Join(nodeDir, "ipns") + mfsMount := filepath.Join(nodeDir, "mfs") + + err := os.MkdirAll(ipfsMount, 0755) + require.NoError(t, err) + err = os.MkdirAll(ipnsMount, 0755) + require.NoError(t, err) + err = os.MkdirAll(mfsMount, 0755) + require.NoError(t, err) + + // Ensure any existing mounts are cleaned up first + failOnError := false // mount points might not exist from previous runs + doUnmount(t, ipfsMount, failOnError) + doUnmount(t, ipnsMount, failOnError) + doUnmount(t, mfsMount, failOnError) + + // Test mount operation + result := node.IPFS("mount", "-f", ipfsMount, "-n", ipnsMount, "-m", mfsMount) + + // Verify mount output + expectedOutput := "IPFS mounted at: " + ipfsMount + "\n" + + "IPNS mounted at: " + ipnsMount + "\n" + + "MFS mounted at: " + mfsMount + "\n" + require.Equal(t, expectedOutput, result.Stdout.String()) + + // Test basic MFS functionality via FUSE mount + testFile := filepath.Join(mfsMount, "testfile") + testContent := "hello fuse world" + + // Create file via FUSE mount + err = os.WriteFile(testFile, []byte(testContent), 0644) + require.NoError(t, err) + + // Verify file appears in MFS via IPFS commands + result = node.IPFS("files", "ls", "/") + require.Contains(t, result.Stdout.String(), "testfile") + + // Read content back via MFS FUSE mount + readContent, err := os.ReadFile(testFile) + require.NoError(t, err) + require.Equal(t, testContent, string(readContent)) + + // Get the CID of the MFS file + result = node.IPFS("files", "stat", "/testfile", "--format=") + fileCID := strings.TrimSpace(result.Stdout.String()) + require.NotEmpty(t, fileCID, "should have a CID for the MFS file") + + // Read the same content via IPFS FUSE mount using the CID + ipfsFile := filepath.Join(ipfsMount, fileCID) + ipfsContent, err := os.ReadFile(ipfsFile) + require.NoError(t, err) + require.Equal(t, testContent, string(ipfsContent), "content should match between MFS and IPFS mounts") + + // Verify both FUSE mounts return identical data + require.Equal(t, readContent, ipfsContent, "MFS and IPFS FUSE mounts should return identical data") + + // Test that mount directories cannot be removed while mounted + err = os.Remove(ipfsMount) + require.Error(t, err, "should not be able to remove mounted directory") + + // Stop daemon - this should trigger automatic unmount via context cancellation + node.StopDaemon() + + // Daemon shutdown should handle unmount synchronously via context.AfterFunc + + // Verify directories can now be removed (indicating successful unmount) + err = os.Remove(ipfsMount) + require.NoError(t, err, "should be able to remove directory after unmount") + err = os.Remove(ipnsMount) + require.NoError(t, err, "should be able to remove directory after unmount") + err = os.Remove(mfsMount) + require.NoError(t, err, "should be able to remove directory after unmount") + }) + + t.Run("explicit unmount works", func(t *testing.T) { + t.Parallel() + + node := harness.NewT(t).NewNode().Init() + node.StartDaemon() + + // Create mount directories + nodeDir := node.Dir + ipfsMount := filepath.Join(nodeDir, "ipfs") + ipnsMount := filepath.Join(nodeDir, "ipns") + mfsMount := filepath.Join(nodeDir, "mfs") + + err := os.MkdirAll(ipfsMount, 0755) + require.NoError(t, err) + err = os.MkdirAll(ipnsMount, 0755) + require.NoError(t, err) + err = os.MkdirAll(mfsMount, 0755) + require.NoError(t, err) + + // Clean up any existing mounts + failOnError := false // mount points might not exist from previous runs + doUnmount(t, ipfsMount, failOnError) + doUnmount(t, ipnsMount, failOnError) + doUnmount(t, mfsMount, failOnError) + + // Mount + node.IPFS("mount", "-f", ipfsMount, "-n", ipnsMount, "-m", mfsMount) + + // Explicit unmount via platform-specific command + failOnError = true // test that explicit unmount works correctly + doUnmount(t, ipfsMount, failOnError) + doUnmount(t, ipnsMount, failOnError) + doUnmount(t, mfsMount, failOnError) + + // Verify directories can be removed after explicit unmount + err = os.Remove(ipfsMount) + require.NoError(t, err) + err = os.Remove(ipnsMount) + require.NoError(t, err) + err = os.Remove(mfsMount) + require.NoError(t, err) + + node.StopDaemon() + }) +} + +// doUnmount performs platform-specific unmount, similar to sharness do_umount +// failOnError: if true, unmount errors cause test failure; if false, errors are ignored (useful for cleanup) +func doUnmount(t *testing.T, mountPoint string, failOnError bool) { + t.Helper() + var cmd *exec.Cmd + if runtime.GOOS == "linux" { + // fusermount -u: unmount filesystem (strict - fails if busy) + cmd = exec.Command("fusermount", "-u", mountPoint) + } else { + cmd = exec.Command("umount", mountPoint) + } + + err := cmd.Run() + if err != nil && failOnError { + t.Fatalf("failed to unmount %s: %v", mountPoint, err) + } +} diff --git a/test/cli/testutils/requires.go b/test/cli/testutils/requires.go index 1462b7fee..b0070e441 100644 --- a/test/cli/testutils/requires.go +++ b/test/cli/testutils/requires.go @@ -2,6 +2,7 @@ package testutils import ( "os" + "os/exec" "runtime" "testing" ) @@ -13,9 +14,48 @@ func RequiresDocker(t *testing.T) { } func RequiresFUSE(t *testing.T) { - if os.Getenv("TEST_FUSE") != "1" { - t.SkipNow() + // Skip if FUSE tests are explicitly disabled + if os.Getenv("TEST_FUSE") == "0" { + t.Skip("FUSE tests disabled via TEST_FUSE=0") } + + // If TEST_FUSE=1 is set, always run (for backwards compatibility) + if os.Getenv("TEST_FUSE") == "1" { + return + } + + // Auto-detect FUSE availability based on platform and tools + if !isFUSEAvailable(t) { + t.Skip("FUSE not available (no fusermount/umount found or unsupported platform)") + } +} + +// isFUSEAvailable checks if FUSE is available on the current system +func isFUSEAvailable(t *testing.T) bool { + t.Helper() + + // Check platform support + switch runtime.GOOS { + case "linux", "darwin", "freebsd", "openbsd", "netbsd": + // These platforms potentially support FUSE + case "windows": + // Windows has limited FUSE support via WinFsp, but skip for now + return false + default: + // Unknown platform, assume no FUSE support + return false + } + + // Check for required unmount tools + var unmountCmd string + if runtime.GOOS == "linux" { + unmountCmd = "fusermount" + } else { + unmountCmd = "umount" + } + + _, err := exec.LookPath(unmountCmd) + return err == nil } func RequiresExpensive(t *testing.T) {