mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 10:27:46 +08:00
fuse mount lifecycle fixes
This commit cleans up how mounting was being done. It now successfully signals when it is properly mounted and listen to close signals correctly.
This commit is contained in:
parent
1fa14335b9
commit
e21b1f662b
@ -134,6 +134,8 @@ func daemonFunc(req cmds.Request) (interface{}, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fmt.Printf("IPFS mounted at: %s\n", fsdir)
|
||||
fmt.Printf("IPNS mounted at: %s\n", nsdir)
|
||||
}
|
||||
|
||||
return nil, listenAndServeAPI(node, req, apiMaddr)
|
||||
|
||||
@ -143,6 +143,33 @@ baz
|
||||
},
|
||||
}
|
||||
|
||||
func Mount(node *core.IpfsNode, fsdir, nsdir string) error {
|
||||
// check if we already have live mounts.
|
||||
// if the user said "Mount", then there must be something wrong.
|
||||
// so, close them and try again.
|
||||
if node.Mounts.Ipfs != nil {
|
||||
node.Mounts.Ipfs.Unmount()
|
||||
}
|
||||
if node.Mounts.Ipns != nil {
|
||||
node.Mounts.Ipns.Unmount()
|
||||
}
|
||||
|
||||
if err := platformFuseChecks(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var err error
|
||||
if err = doMount(node, fsdir, nsdir); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var platformFuseChecks = func() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func doMount(node *core.IpfsNode, fsdir, nsdir string) error {
|
||||
fmtFuseErr := func(err error) error {
|
||||
s := err.Error()
|
||||
@ -176,8 +203,14 @@ func doMount(node *core.IpfsNode, fsdir, nsdir string) error {
|
||||
<-done
|
||||
|
||||
if err1 != nil || err2 != nil {
|
||||
fsmount.Close()
|
||||
nsmount.Close()
|
||||
log.Infof("error mounting: %s %s", err1, err2)
|
||||
if fsmount != nil {
|
||||
fsmount.Unmount()
|
||||
}
|
||||
if nsmount != nil {
|
||||
nsmount.Unmount()
|
||||
}
|
||||
|
||||
if err1 != nil {
|
||||
return fmtFuseErr(err1)
|
||||
} else {
|
||||
@ -190,30 +223,3 @@ func doMount(node *core.IpfsNode, fsdir, nsdir string) error {
|
||||
node.Mounts.Ipns = nsmount
|
||||
return nil
|
||||
}
|
||||
|
||||
var platformFuseChecks = func() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func Mount(node *core.IpfsNode, fsdir, nsdir string) error {
|
||||
// check if we already have live mounts.
|
||||
// if the user said "Mount", then there must be something wrong.
|
||||
// so, close them and try again.
|
||||
if node.Mounts.Ipfs != nil {
|
||||
node.Mounts.Ipfs.Unmount()
|
||||
}
|
||||
if node.Mounts.Ipns != nil {
|
||||
node.Mounts.Ipns.Unmount()
|
||||
}
|
||||
|
||||
if err := platformFuseChecks(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var err error
|
||||
if err = doMount(node, fsdir, nsdir); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -69,7 +69,7 @@ func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.M
|
||||
}
|
||||
}
|
||||
|
||||
fs, err := NewIpns(node, node.PrivateKey, "")
|
||||
fs, err := NewFileSystem(node, node.PrivateKey, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@ -37,7 +37,7 @@ type FileSystem struct {
|
||||
}
|
||||
|
||||
// NewFileSystem constructs new fs using given core.IpfsNode instance.
|
||||
func NewIpns(ipfs *core.IpfsNode, sk ci.PrivKey, ipfspath string) (*FileSystem, error) {
|
||||
func NewFileSystem(ipfs *core.IpfsNode, sk ci.PrivKey, ipfspath string) (*FileSystem, error) {
|
||||
root, err := CreateRoot(ipfs, []ci.PrivKey{sk}, ipfspath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@ -1,100 +1,18 @@
|
||||
// +build linux darwin freebsd
|
||||
|
||||
package ipns
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
fuse "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
|
||||
fs "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
|
||||
|
||||
core "github.com/jbenet/go-ipfs/core"
|
||||
mount "github.com/jbenet/go-ipfs/fuse/mount"
|
||||
)
|
||||
|
||||
// Mount mounts an IpfsNode instance at a particular path. It
|
||||
// serves until the process receives exit signals (to Unmount).
|
||||
func Mount(ipfs *core.IpfsNode, fpath string, ipfspath string) (mount.Mount, error) {
|
||||
log.Infof("Mounting ipns at %s...", fpath)
|
||||
|
||||
// setup the Mount abstraction.
|
||||
m := mount.New(ipfs.Context(), fpath)
|
||||
|
||||
// go serve the mount
|
||||
m.Mount(func(m mount.Mount) error {
|
||||
return internalMount(ipfs, fpath, ipfspath)
|
||||
}, internalUnmount)
|
||||
|
||||
select {
|
||||
case <-m.Closed():
|
||||
return nil, fmt.Errorf("failed to mount")
|
||||
case <-time.After(time.Second):
|
||||
// assume it worked...
|
||||
}
|
||||
|
||||
// bind the mount (ContextGroup) to the node, so that when the node exits
|
||||
// the fsclosers are automatically closed.
|
||||
ipfs.AddChildGroup(m)
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// mount attempts to mount at the provided FUSE mount point
|
||||
func internalMount(ipfs *core.IpfsNode, fpath string, ipfspath string) error {
|
||||
|
||||
c, err := fuse.Mount(fpath)
|
||||
// Mount mounts ipns at a given location, and returns a mount.Mount instance.
|
||||
func Mount(ipfs *core.IpfsNode, ipnsmp, ipfsmp string) (mount.Mount, error) {
|
||||
fsys, err := NewFileSystem(ipfs, ipfs.PrivateKey, ipfsmp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
fsys, err := NewIpns(ipfs, ipfs.PrivateKey, ipfspath)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Infof("Mounted ipns at %s.", fpath)
|
||||
if err := fs.Serve(c, fsys); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// check if the mount process has an error to report
|
||||
<-c.Ready
|
||||
if err := c.MountError; err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// unmount attempts to unmount the provided FUSE mount point, forcibly
|
||||
// if necessary.
|
||||
func internalUnmount(m mount.Mount) error {
|
||||
point := m.MountPoint()
|
||||
log.Infof("Unmounting ipns at %s...", point)
|
||||
|
||||
var cmd *exec.Cmd
|
||||
switch runtime.GOOS {
|
||||
case "darwin":
|
||||
cmd = exec.Command("diskutil", "umount", "force", point)
|
||||
case "linux":
|
||||
cmd = exec.Command("fusermount", "-u", point)
|
||||
default:
|
||||
return fmt.Errorf("unmount: unimplemented")
|
||||
}
|
||||
|
||||
errc := make(chan error, 1)
|
||||
go func() {
|
||||
if err := exec.Command("umount", point).Run(); err == nil {
|
||||
errc <- err
|
||||
}
|
||||
// retry to unmount with the fallback cmd
|
||||
errc <- cmd.Run()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(1 * time.Second):
|
||||
return fmt.Errorf("umount timeout")
|
||||
case err := <-errc:
|
||||
return err
|
||||
}
|
||||
return mount.NewMount(ipfs, fsys, ipnsmp)
|
||||
}
|
||||
|
||||
@ -3,9 +3,12 @@ package mount
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
fuse "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
|
||||
fs "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
|
||||
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
|
||||
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
@ -13,62 +16,119 @@ import (
|
||||
|
||||
var log = u.Logger("mount")
|
||||
|
||||
var MountTimeout = time.Second * 5
|
||||
|
||||
// Mount represents a filesystem mount
|
||||
type Mount interface {
|
||||
|
||||
// MountPoint is the path at which this mount is mounted
|
||||
MountPoint() string
|
||||
|
||||
// Mount function sets up a mount + registers the unmount func
|
||||
Mount(mount MountFunc, unmount UnmountFunc)
|
||||
|
||||
// Unmount calls Close.
|
||||
// Unmounts the mount
|
||||
Unmount() error
|
||||
|
||||
ctxgroup.ContextGroup
|
||||
}
|
||||
|
||||
// UnmountFunc is a function used to Unmount a mount
|
||||
type UnmountFunc func(Mount) error
|
||||
|
||||
// MountFunc is a function used to Mount a mount
|
||||
type MountFunc func(Mount) error
|
||||
|
||||
// New constructs a new Mount instance. ctx is a context to wait upon,
|
||||
// the mountpoint is the directory that the mount was mounted at, and unmount
|
||||
// in an UnmountFunc to perform the unmounting logic.
|
||||
func New(ctx context.Context, mountpoint string) Mount {
|
||||
m := &mount{mpoint: mountpoint}
|
||||
m.ContextGroup = ctxgroup.WithContextAndTeardown(ctx, m.persistentUnmount)
|
||||
return m
|
||||
// CtxGroup returns the mount's CtxGroup to be able to link it
|
||||
// to other processes. Unmount upon closing.
|
||||
CtxGroup() ctxgroup.ContextGroup
|
||||
}
|
||||
|
||||
// mount implements go-ipfs/fuse/mount
|
||||
type mount struct {
|
||||
ctxgroup.ContextGroup
|
||||
mpoint string
|
||||
filesys fs.FS
|
||||
fuseConn *fuse.Conn
|
||||
// closeErr error
|
||||
|
||||
unmount UnmountFunc
|
||||
mpoint string
|
||||
cg ctxgroup.ContextGroup
|
||||
}
|
||||
|
||||
// umount is called after the mount is closed.
|
||||
// TODO this is hacky, make it better.
|
||||
func (m *mount) persistentUnmount() error {
|
||||
// no unmount func.
|
||||
if m.unmount == nil {
|
||||
// Mount mounts a fuse fs.FS at a given location, and returns a Mount instance.
|
||||
// parent is a ContextGroup to bind the mount's ContextGroup to.
|
||||
func NewMount(p ctxgroup.ContextGroup, fsys fs.FS, mountpoint string) (Mount, error) {
|
||||
conn, err := fuse.Mount(mountpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m := &mount{
|
||||
mpoint: mountpoint,
|
||||
fuseConn: conn,
|
||||
filesys: fsys,
|
||||
cg: ctxgroup.WithParent(p), // link it to parent.
|
||||
}
|
||||
m.cg.SetTeardown(m.unmount)
|
||||
|
||||
// launch the mounting process.
|
||||
if err := m.mount(); err != nil {
|
||||
m.Unmount() // just in case.
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (m *mount) mount() error {
|
||||
log.Infof("Mounting %s", m.MountPoint())
|
||||
|
||||
errs := make(chan error, 1)
|
||||
go func() {
|
||||
err := fs.Serve(m.fuseConn, m.filesys)
|
||||
log.Debugf("Mounting %s -- fs.Serve returned (%s)", err)
|
||||
errs <- err
|
||||
close(errs)
|
||||
}()
|
||||
|
||||
// wait for the mount process to be done, or timed out.
|
||||
select {
|
||||
case <-time.After(MountTimeout):
|
||||
return fmt.Errorf("Mounting %s timed out.", m.MountPoint())
|
||||
case err := <-errs:
|
||||
return err
|
||||
case <-m.fuseConn.Ready:
|
||||
}
|
||||
|
||||
// check if the mount process has an error to report
|
||||
if err := m.fuseConn.MountError; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("Mounted %s", m.MountPoint())
|
||||
return nil
|
||||
}
|
||||
|
||||
// umount is called exactly once to unmount this service.
|
||||
// note that closing the connection will not always unmount
|
||||
// properly. If that happens, we bring out the big guns
|
||||
// (mount.ForceUnmountManyTimes, exec unmount).
|
||||
func (m *mount) unmount() error {
|
||||
log.Infof("Unmounting %s", m.MountPoint())
|
||||
|
||||
// try unmounting with fuse lib
|
||||
err := fuse.Unmount(m.MountPoint())
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
log.Error("fuse unmount err: %s", err)
|
||||
|
||||
// ok try to unmount a whole bunch of times...
|
||||
for i := 0; i < 34; i++ {
|
||||
err := m.unmount(m)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(time.Millisecond * 300)
|
||||
// try closing the fuseConn
|
||||
err = m.fuseConn.Close()
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
log.Error("fuse conn error: %s", err)
|
||||
}
|
||||
|
||||
// didnt work.
|
||||
return fmt.Errorf("Unmount %s failed after 10 seconds of trying.")
|
||||
// try mount.ForceUnmountManyTimes
|
||||
if err := ForceUnmountManyTimes(m, 10); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("Seemingly unmounted %s", m.MountPoint())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mount) CtxGroup() ctxgroup.ContextGroup {
|
||||
return m.cg
|
||||
}
|
||||
|
||||
func (m *mount) MountPoint() string {
|
||||
@ -76,17 +136,59 @@ func (m *mount) MountPoint() string {
|
||||
}
|
||||
|
||||
func (m *mount) Unmount() error {
|
||||
return m.Close()
|
||||
// call ContextCloser Close(), which calls unmount() exactly once.
|
||||
return m.cg.Close()
|
||||
}
|
||||
|
||||
func (m *mount) Mount(mount MountFunc, unmount UnmountFunc) {
|
||||
m.unmount = unmount
|
||||
// ForceUnmount attempts to forcibly unmount a given mount.
|
||||
// It does so by calling diskutil or fusermount directly.
|
||||
func ForceUnmount(m Mount) error {
|
||||
point := m.MountPoint()
|
||||
log.Infof("Force-Unmounting %s...", point)
|
||||
|
||||
// go serve the mount
|
||||
m.ContextGroup.AddChildFunc(func(parent ctxgroup.ContextGroup) {
|
||||
if err := mount(m); err != nil {
|
||||
log.Error("%s mount: %s", m.MountPoint(), err)
|
||||
var cmd *exec.Cmd
|
||||
switch runtime.GOOS {
|
||||
case "darwin":
|
||||
cmd = exec.Command("diskutil", "umount", "force", point)
|
||||
case "linux":
|
||||
cmd = exec.Command("fusermount", "-u", point)
|
||||
default:
|
||||
return fmt.Errorf("unmount: unimplemented")
|
||||
}
|
||||
|
||||
errc := make(chan error, 1)
|
||||
go func() {
|
||||
defer close(errc)
|
||||
|
||||
// try vanilla unmount first.
|
||||
if err := exec.Command("umount", point).Run(); err == nil {
|
||||
return
|
||||
}
|
||||
m.Unmount()
|
||||
})
|
||||
|
||||
// retry to unmount with the fallback cmd
|
||||
errc <- cmd.Run()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(2 * time.Second):
|
||||
return fmt.Errorf("umount timeout")
|
||||
case err := <-errc:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// ForceUnmountManyTimes attempts to forcibly unmount a given mount,
|
||||
// many times. It does so by calling diskutil or fusermount directly.
|
||||
// Attempts a given number of times.
|
||||
func ForceUnmountManyTimes(m Mount, attempts int) error {
|
||||
var err error
|
||||
for i := 0; i < attempts; i++ {
|
||||
err = ForceUnmount(m)
|
||||
if err == nil {
|
||||
return err
|
||||
}
|
||||
|
||||
<-time.After(time.Millisecond * 500)
|
||||
}
|
||||
return fmt.Errorf("Unmount %s failed after 10 seconds of trying.", m.MountPoint())
|
||||
}
|
||||
|
||||
14
fuse/readonly/mount_unix.go
Normal file
14
fuse/readonly/mount_unix.go
Normal file
@ -0,0 +1,14 @@
|
||||
// +build linux darwin freebsd
|
||||
|
||||
package readonly
|
||||
|
||||
import (
|
||||
core "github.com/jbenet/go-ipfs/core"
|
||||
mount "github.com/jbenet/go-ipfs/fuse/mount"
|
||||
)
|
||||
|
||||
// Mount mounts ipfs at a given location, and returns a mount.Mount instance.
|
||||
func Mount(ipfs *core.IpfsNode, mountpoint string) (mount.Mount, error) {
|
||||
fsys := NewFileSystem(ipfs)
|
||||
return mount.NewMount(ipfs, fsys, mountpoint)
|
||||
}
|
||||
@ -5,19 +5,14 @@
|
||||
package readonly
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
fuse "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
|
||||
fs "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
|
||||
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||
|
||||
core "github.com/jbenet/go-ipfs/core"
|
||||
mount "github.com/jbenet/go-ipfs/fuse/mount"
|
||||
mdag "github.com/jbenet/go-ipfs/merkledag"
|
||||
uio "github.com/jbenet/go-ipfs/unixfs/io"
|
||||
ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
|
||||
@ -158,86 +153,3 @@ func (s *Node) ReadAll(intr fs.Intr) ([]byte, fuse.Error) {
|
||||
// what if i have a 6TB file? GG RAM.
|
||||
return ioutil.ReadAll(r)
|
||||
}
|
||||
|
||||
// Mount mounts an IpfsNode instance at a particular path. It
|
||||
// serves until the process receives exit signals (to Unmount).
|
||||
func Mount(ipfs *core.IpfsNode, fpath string) (mount.Mount, error) {
|
||||
log.Infof("Mounting ipfs at %s...", fpath)
|
||||
|
||||
// setup the Mount abstraction.
|
||||
m := mount.New(ipfs.Context(), fpath)
|
||||
|
||||
// go serve the mount
|
||||
m.Mount(func(m mount.Mount) error {
|
||||
return internalMount(ipfs, m)
|
||||
}, internalUnmount)
|
||||
|
||||
select {
|
||||
case <-m.Closed():
|
||||
return nil, fmt.Errorf("failed to mount")
|
||||
case <-time.After(time.Second):
|
||||
// assume it worked...
|
||||
}
|
||||
|
||||
// bind the mount (ContextGroup) to the node, so that when the node exits
|
||||
// the fsclosers are automatically closed.
|
||||
ipfs.AddChildGroup(m)
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// mount attempts to mount the provided FUSE mount point
|
||||
func internalMount(ipfs *core.IpfsNode, m mount.Mount) error {
|
||||
c, err := fuse.Mount(m.MountPoint())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
fsys := FileSystem{Ipfs: ipfs}
|
||||
|
||||
log.Infof("Mounted ipfs at %s.", m.MountPoint())
|
||||
if err := fs.Serve(c, fsys); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// check if the mount process has an error to report
|
||||
<-c.Ready
|
||||
if err := c.MountError; err != nil {
|
||||
m.Unmount()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// unmount attempts to unmount the provided FUSE mount point, forcibly
|
||||
// if necessary.
|
||||
func internalUnmount(m mount.Mount) error {
|
||||
point := m.MountPoint()
|
||||
log.Infof("Unmounting ipfs at %s...", point)
|
||||
|
||||
var cmd *exec.Cmd
|
||||
switch runtime.GOOS {
|
||||
case "darwin":
|
||||
cmd = exec.Command("diskutil", "umount", "force", point)
|
||||
case "linux":
|
||||
cmd = exec.Command("fusermount", "-u", point)
|
||||
default:
|
||||
return fmt.Errorf("unmount: unimplemented")
|
||||
}
|
||||
|
||||
errc := make(chan error, 1)
|
||||
go func() {
|
||||
if err := exec.Command("umount", point).Run(); err == nil {
|
||||
errc <- err
|
||||
}
|
||||
// retry to unmount with the fallback cmd
|
||||
errc <- cmd.Run()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(1 * time.Second):
|
||||
return fmt.Errorf("umount timeout")
|
||||
case err := <-errc:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user