Extract /mfs

The /mfs module has been extracted to github.com/ipfs/go-mfs. All history has been retained in the new repository. README, LICENSE, Makefiles and CI integration have been added to the new location.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>

This commit is contained in:
parent afda4ca04f
commit 39a87a0fb0
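For code inside go-ipfs the change is an import-path swap only: the package API is unchanged and only its location moves, from the in-tree module to the gx-vendored go-mfs package. A minimal before/after sketch in Go (both paths are taken verbatim from the hunks below):

import (
	// Before: mfs lived inside the go-ipfs tree.
	// mfs "github.com/ipfs/go-ipfs/mfs"

	// After: mfs is vendored through gx as an external package.
	mfs "gx/ipfs/QmQeLRo7dHKpmREWkAUXRAri4Bro3BqrDVJJHjHHmKSHMc/go-mfs"
)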
@@ -10,7 +10,6 @@ import (
	core "github.com/ipfs/go-ipfs/core"
	"github.com/ipfs/go-ipfs/core/coreunix"
	filestore "github.com/ipfs/go-ipfs/filestore"
	mfs "github.com/ipfs/go-ipfs/mfs"
	dag "gx/ipfs/QmQzSpSjkdGHW6WFBhUG6P3t9K8yv7iucucT1cQaqJ6tgd/go-merkledag"
	dagtest "gx/ipfs/QmQzSpSjkdGHW6WFBhUG6P3t9K8yv7iucucT1cQaqJ6tgd/go-merkledag/test"
	blockservice "gx/ipfs/QmTZZrpd9o4vpYr9TEADW2EoJ9fzUtAgpXqjxZHbKR2T15/go-blockservice"
@@ -20,6 +19,7 @@ import (
	files "gx/ipfs/QmPVqQHEfLpqK7JLCsUkyam7rhuV3MAeZ9gueQQCrBwCta/go-ipfs-cmdkit/files"
	mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash"
	pb "gx/ipfs/QmPtj12fdwuAqj9sBSTNUxBNu8kCGNp8b3o8yUzMm5GHpq/pb"
	mfs "gx/ipfs/QmQeLRo7dHKpmREWkAUXRAri4Bro3BqrDVJJHjHHmKSHMc/go-mfs"
	cmds "gx/ipfs/QmUQb3xtNzkQCgTj2NjaqcJZNv2nfSSub2QAdy9DtQMRBT/go-ipfs-cmds"
	offline "gx/ipfs/QmVozMmsgK2PYyaHQsrcWLBYigb1m6mW8YhCBG2Cb4Uxq9/go-ipfs-exchange-offline"
	bstore "gx/ipfs/QmYBEfMSquSGnuxBthUoBJNs3F6p4VAPPvAgxq6XXGvTPh/go-ipfs-blockstore"
@@ -15,7 +15,6 @@ import (
	lgc "github.com/ipfs/go-ipfs/commands/legacy"
	core "github.com/ipfs/go-ipfs/core"
	e "github.com/ipfs/go-ipfs/core/commands/e"
	mfs "github.com/ipfs/go-ipfs/mfs"
	dag "gx/ipfs/QmQzSpSjkdGHW6WFBhUG6P3t9K8yv7iucucT1cQaqJ6tgd/go-merkledag"
	bservice "gx/ipfs/QmTZZrpd9o4vpYr9TEADW2EoJ9fzUtAgpXqjxZHbKR2T15/go-blockservice"
	path "gx/ipfs/QmWMcvZbNvk5codeqbm7L89C9kqSwka4KaHnDb8HRnxsSL/go-path"
@@ -26,6 +25,7 @@ import (
	humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize"
	cmdkit "gx/ipfs/QmPVqQHEfLpqK7JLCsUkyam7rhuV3MAeZ9gueQQCrBwCta/go-ipfs-cmdkit"
	mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash"
	mfs "gx/ipfs/QmQeLRo7dHKpmREWkAUXRAri4Bro3BqrDVJJHjHHmKSHMc/go-mfs"
	logging "gx/ipfs/QmRREK2CAZ5Re2Bd9zZFG6FeYDppUWt5cMgsoUEp3ktgSr/go-log"
	cmds "gx/ipfs/QmUQb3xtNzkQCgTj2NjaqcJZNv2nfSSub2QAdy9DtQMRBT/go-ipfs-cmds"
	offline "gx/ipfs/QmVozMmsgK2PYyaHQsrcWLBYigb1m6mW8YhCBG2Cb4Uxq9/go-ipfs-exchange-offline"
@@ -24,7 +24,6 @@ import (
	rp "github.com/ipfs/go-ipfs/exchange/reprovide"
	filestore "github.com/ipfs/go-ipfs/filestore"
	mount "github.com/ipfs/go-ipfs/fuse/mount"
	mfs "github.com/ipfs/go-ipfs/mfs"
	namesys "github.com/ipfs/go-ipfs/namesys"
	ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"
	p2p "github.com/ipfs/go-ipfs/p2p"
@@ -36,6 +35,7 @@ import (
	ic "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto"
	p2phost "gx/ipfs/QmQ1hwb95uSSZR8jSPJysnfHxBDQAykSXsmz5TwTzxjq2Z/go-libp2p-host"
	config "gx/ipfs/QmQSG7YCizeUH2bWatzp6uK9Vm3m7LA5jpxGa9QqgpNKw4/go-ipfs-config"
	mfs "gx/ipfs/QmQeLRo7dHKpmREWkAUXRAri4Bro3BqrDVJJHjHHmKSHMc/go-mfs"
	bitswap "gx/ipfs/QmQk1Rqy5XSBzXykMSsgiXfnhivCSnFpykx4M2j6DD1nBH/go-bitswap"
	bsnet "gx/ipfs/QmQk1Rqy5XSBzXykMSsgiXfnhivCSnFpykx4M2j6DD1nBH/go-bitswap/network"
	merkledag "gx/ipfs/QmQzSpSjkdGHW6WFBhUG6P3t9K8yv7iucucT1cQaqJ6tgd/go-merkledag"
@@ -7,11 +7,11 @@ import (
	"time"

	"github.com/ipfs/go-ipfs/core"
	mfs "github.com/ipfs/go-ipfs/mfs"
	gc "github.com/ipfs/go-ipfs/pin/gc"
	repo "github.com/ipfs/go-ipfs/repo"

	humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize"
	mfs "gx/ipfs/QmQeLRo7dHKpmREWkAUXRAri4Bro3BqrDVJJHjHHmKSHMc/go-mfs"
	logging "gx/ipfs/QmRREK2CAZ5Re2Bd9zZFG6FeYDppUWt5cMgsoUEp3ktgSr/go-log"
	cid "gx/ipfs/QmYjnkEL7i731PirfVH1sis89evN7jt4otSHw5D2xXXwUV/go-cid"
)
@@ -11,7 +11,6 @@ import (
	"strconv"

	core "github.com/ipfs/go-ipfs/core"
	mfs "github.com/ipfs/go-ipfs/mfs"
	"github.com/ipfs/go-ipfs/pin"
	dag "gx/ipfs/QmQzSpSjkdGHW6WFBhUG6P3t9K8yv7iucucT1cQaqJ6tgd/go-merkledag"
	unixfs "gx/ipfs/QmWv8MYwgPK4zXYv1et1snWJ6FWGqaL6xY2y9X1bRSKBxk/go-unixfs"
@@ -20,6 +19,7 @@ import (
	trickle "gx/ipfs/QmWv8MYwgPK4zXYv1et1snWJ6FWGqaL6xY2y9X1bRSKBxk/go-unixfs/importer/trickle"

	files "gx/ipfs/QmPVqQHEfLpqK7JLCsUkyam7rhuV3MAeZ9gueQQCrBwCta/go-ipfs-cmdkit/files"
	mfs "gx/ipfs/QmQeLRo7dHKpmREWkAUXRAri4Bro3BqrDVJJHjHHmKSHMc/go-mfs"
	logging "gx/ipfs/QmRREK2CAZ5Re2Bd9zZFG6FeYDppUWt5cMgsoUEp3ktgSr/go-log"
	chunker "gx/ipfs/QmWbCAB5f3LDumj4ncz1UCHSiyXrXxkMxZB6Wv35xi4P8z/go-ipfs-chunker"
	bstore "gx/ipfs/QmYBEfMSquSGnuxBthUoBJNs3F6p4VAPPvAgxq6XXGvTPh/go-ipfs-blockstore"
@@ -12,13 +12,13 @@ import (
	"os"

	core "github.com/ipfs/go-ipfs/core"
	mfs "github.com/ipfs/go-ipfs/mfs"
	namesys "github.com/ipfs/go-ipfs/namesys"
	dag "gx/ipfs/QmQzSpSjkdGHW6WFBhUG6P3t9K8yv7iucucT1cQaqJ6tgd/go-merkledag"
	path "gx/ipfs/QmWMcvZbNvk5codeqbm7L89C9kqSwka4KaHnDb8HRnxsSL/go-path"
	ft "gx/ipfs/QmWv8MYwgPK4zXYv1et1snWJ6FWGqaL6xY2y9X1bRSKBxk/go-unixfs"

	ci "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto"
	mfs "gx/ipfs/QmQeLRo7dHKpmREWkAUXRAri4Bro3BqrDVJJHjHHmKSHMc/go-mfs"
	logging "gx/ipfs/QmRREK2CAZ5Re2Bd9zZFG6FeYDppUWt5cMgsoUEp3ktgSr/go-log"
	fuse "gx/ipfs/QmSJBsmLP1XMjv8hxYg2rUMdPDB7YUpyBo9idjrJ6Cmq6F/fuse"
	fs "gx/ipfs/QmSJBsmLP1XMjv8hxYg2rUMdPDB7YUpyBo9idjrJ6Cmq6F/fuse/fs"
462 mfs/dir.go
@@ -1,462 +0,0 @@
|
||||
package mfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
dag "gx/ipfs/QmQzSpSjkdGHW6WFBhUG6P3t9K8yv7iucucT1cQaqJ6tgd/go-merkledag"
|
||||
ft "gx/ipfs/QmWv8MYwgPK4zXYv1et1snWJ6FWGqaL6xY2y9X1bRSKBxk/go-unixfs"
|
||||
uio "gx/ipfs/QmWv8MYwgPK4zXYv1et1snWJ6FWGqaL6xY2y9X1bRSKBxk/go-unixfs/io"
|
||||
ufspb "gx/ipfs/QmWv8MYwgPK4zXYv1et1snWJ6FWGqaL6xY2y9X1bRSKBxk/go-unixfs/pb"
|
||||
|
||||
cid "gx/ipfs/QmYjnkEL7i731PirfVH1sis89evN7jt4otSHw5D2xXXwUV/go-cid"
|
||||
ipld "gx/ipfs/QmaA8GkXUYinkkndvg7T6Tx7gYXemhxjaxLisEPes7Rf1P/go-ipld-format"
|
||||
)
|
||||
|
||||
var ErrNotYetImplemented = errors.New("not yet implemented")
|
||||
var ErrInvalidChild = errors.New("invalid child node")
|
||||
var ErrDirExists = errors.New("directory already has entry by that name")
|
||||
|
||||
type Directory struct {
|
||||
dserv ipld.DAGService
|
||||
parent childCloser
|
||||
|
||||
childDirs map[string]*Directory
|
||||
files map[string]*File
|
||||
|
||||
lock sync.Mutex
|
||||
ctx context.Context
|
||||
|
||||
// UnixFS directory implementation used for creating,
|
||||
// reading and editing directories.
|
||||
unixfsDir uio.Directory
|
||||
|
||||
modTime time.Time
|
||||
|
||||
name string
|
||||
}
|
||||
|
||||
// NewDirectory constructs a new MFS directory.
|
||||
//
|
||||
// You probably don't want to call this directly. Instead, construct a new root
|
||||
// using NewRoot.
|
||||
func NewDirectory(ctx context.Context, name string, node ipld.Node, parent childCloser, dserv ipld.DAGService) (*Directory, error) {
|
||||
db, err := uio.NewDirectoryFromNode(dserv, node)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Directory{
|
||||
dserv: dserv,
|
||||
ctx: ctx,
|
||||
name: name,
|
||||
unixfsDir: db,
|
||||
parent: parent,
|
||||
childDirs: make(map[string]*Directory),
|
||||
files: make(map[string]*File),
|
||||
modTime: time.Now(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetCidBuilder gets the CID builder of the root node
|
||||
func (d *Directory) GetCidBuilder() cid.Builder {
|
||||
return d.unixfsDir.GetCidBuilder()
|
||||
}
|
||||
|
||||
// SetCidBuilder sets the CID builder
|
||||
func (d *Directory) SetCidBuilder(b cid.Builder) {
|
||||
d.unixfsDir.SetCidBuilder(b)
|
||||
}
|
||||
|
||||
// closeChild updates the child by the given name to the dag node 'nd'
|
||||
// and changes its own dag node
|
||||
func (d *Directory) closeChild(name string, nd ipld.Node, sync bool) error {
|
||||
mynd, err := d.closeChildUpdate(name, nd, sync)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if sync {
|
||||
return d.parent.closeChild(d.name, mynd, true)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// closeChildUpdate is the portion of closeChild that needs to be locked around
|
||||
func (d *Directory) closeChildUpdate(name string, nd ipld.Node, sync bool) (*dag.ProtoNode, error) {
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
|
||||
err := d.updateChild(name, nd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if sync {
|
||||
return d.flushCurrentNode()
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (d *Directory) flushCurrentNode() (*dag.ProtoNode, error) {
|
||||
nd, err := d.unixfsDir.GetNode()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = d.dserv.Add(d.ctx, nd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pbnd, ok := nd.(*dag.ProtoNode)
|
||||
if !ok {
|
||||
return nil, dag.ErrNotProtobuf
|
||||
}
|
||||
|
||||
return pbnd.Copy().(*dag.ProtoNode), nil
|
||||
}
|
||||
|
||||
func (d *Directory) updateChild(name string, nd ipld.Node) error {
|
||||
err := d.AddUnixFSChild(name, nd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
d.modTime = time.Now()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Directory) Type() NodeType {
|
||||
return TDir
|
||||
}
|
||||
|
||||
// childNode returns a FSNode under this directory by the given name if it exists.
|
||||
// it does *not* check the cached dirs and files
|
||||
func (d *Directory) childNode(name string) (FSNode, error) {
|
||||
nd, err := d.childFromDag(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return d.cacheNode(name, nd)
|
||||
}
|
||||
|
||||
// cacheNode caches a node into d.childDirs or d.files and returns the FSNode.
|
||||
func (d *Directory) cacheNode(name string, nd ipld.Node) (FSNode, error) {
|
||||
switch nd := nd.(type) {
|
||||
case *dag.ProtoNode:
|
||||
i, err := ft.FromBytes(nd.Data())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch i.GetType() {
|
||||
case ufspb.Data_Directory, ufspb.Data_HAMTShard:
|
||||
ndir, err := NewDirectory(d.ctx, name, nd, d, d.dserv)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
d.childDirs[name] = ndir
|
||||
return ndir, nil
|
||||
case ufspb.Data_File, ufspb.Data_Raw, ufspb.Data_Symlink:
|
||||
nfi, err := NewFile(name, nd, d, d.dserv)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
d.files[name] = nfi
|
||||
return nfi, nil
|
||||
case ufspb.Data_Metadata:
|
||||
return nil, ErrNotYetImplemented
|
||||
default:
|
||||
return nil, ErrInvalidChild
|
||||
}
|
||||
case *dag.RawNode:
|
||||
nfi, err := NewFile(name, nd, d, d.dserv)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
d.files[name] = nfi
|
||||
return nfi, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unrecognized node type in cache node")
|
||||
}
|
||||
}
|
||||
|
||||
// Child returns the child of this directory by the given name
|
||||
func (d *Directory) Child(name string) (FSNode, error) {
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
return d.childUnsync(name)
|
||||
}
|
||||
|
||||
func (d *Directory) Uncache(name string) {
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
delete(d.files, name)
|
||||
delete(d.childDirs, name)
|
||||
}
|
||||
|
||||
// childFromDag searches through this directories dag node for a child link
|
||||
// with the given name
|
||||
func (d *Directory) childFromDag(name string) (ipld.Node, error) {
|
||||
return d.unixfsDir.Find(d.ctx, name)
|
||||
}
|
||||
|
||||
// childUnsync returns the child under this directory by the given name
|
||||
// without locking, useful for operations which already hold a lock
|
||||
func (d *Directory) childUnsync(name string) (FSNode, error) {
|
||||
cdir, ok := d.childDirs[name]
|
||||
if ok {
|
||||
return cdir, nil
|
||||
}
|
||||
|
||||
cfile, ok := d.files[name]
|
||||
if ok {
|
||||
return cfile, nil
|
||||
}
|
||||
|
||||
return d.childNode(name)
|
||||
}
|
||||
|
||||
type NodeListing struct {
|
||||
Name string
|
||||
Type int
|
||||
Size int64
|
||||
Hash string
|
||||
}
|
||||
|
||||
func (d *Directory) ListNames(ctx context.Context) ([]string, error) {
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
|
||||
var out []string
|
||||
err := d.unixfsDir.ForEachLink(ctx, func(l *ipld.Link) error {
|
||||
out = append(out, l.Name)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (d *Directory) List(ctx context.Context) ([]NodeListing, error) {
|
||||
var out []NodeListing
|
||||
err := d.ForEachEntry(ctx, func(nl NodeListing) error {
|
||||
out = append(out, nl)
|
||||
return nil
|
||||
})
|
||||
return out, err
|
||||
}
|
||||
|
||||
func (d *Directory) ForEachEntry(ctx context.Context, f func(NodeListing) error) error {
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
return d.unixfsDir.ForEachLink(ctx, func(l *ipld.Link) error {
|
||||
c, err := d.childUnsync(l.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nd, err := c.GetNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
child := NodeListing{
|
||||
Name: l.Name,
|
||||
Type: int(c.Type()),
|
||||
Hash: nd.Cid().String(),
|
||||
}
|
||||
|
||||
if c, ok := c.(*File); ok {
|
||||
size, err := c.Size()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
child.Size = size
|
||||
}
|
||||
|
||||
return f(child)
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Directory) Mkdir(name string) (*Directory, error) {
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
|
||||
fsn, err := d.childUnsync(name)
|
||||
if err == nil {
|
||||
switch fsn := fsn.(type) {
|
||||
case *Directory:
|
||||
return fsn, os.ErrExist
|
||||
case *File:
|
||||
return nil, os.ErrExist
|
||||
default:
|
||||
return nil, fmt.Errorf("unrecognized type: %#v", fsn)
|
||||
}
|
||||
}
|
||||
|
||||
ndir := ft.EmptyDirNode()
|
||||
ndir.SetCidBuilder(d.GetCidBuilder())
|
||||
|
||||
err = d.dserv.Add(d.ctx, ndir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = d.AddUnixFSChild(name, ndir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dirobj, err := NewDirectory(d.ctx, name, ndir, d, d.dserv)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
d.childDirs[name] = dirobj
|
||||
return dirobj, nil
|
||||
}
|
||||
|
||||
func (d *Directory) Unlink(name string) error {
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
|
||||
delete(d.childDirs, name)
|
||||
delete(d.files, name)
|
||||
|
||||
return d.unixfsDir.RemoveChild(d.ctx, name)
|
||||
}
|
||||
|
||||
func (d *Directory) Flush() error {
|
||||
nd, err := d.GetNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return d.parent.closeChild(d.name, nd, true)
|
||||
}
|
||||
|
||||
// AddChild adds the node 'nd' under this directory giving it the name 'name'
|
||||
func (d *Directory) AddChild(name string, nd ipld.Node) error {
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
|
||||
_, err := d.childUnsync(name)
|
||||
if err == nil {
|
||||
return ErrDirExists
|
||||
}
|
||||
|
||||
err = d.dserv.Add(d.ctx, nd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = d.AddUnixFSChild(name, nd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
d.modTime = time.Now()
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddUnixFSChild adds a child to the inner UnixFS directory
|
||||
// and transitions to a HAMT implementation if needed.
|
||||
func (d *Directory) AddUnixFSChild(name string, node ipld.Node) error {
|
||||
if uio.UseHAMTSharding {
|
||||
// If the directory HAMT implementation is being used and this
|
||||
// directory is actually a basic implementation switch it to HAMT.
|
||||
if basicDir, ok := d.unixfsDir.(*uio.BasicDirectory); ok {
|
||||
hamtDir, err := basicDir.SwitchToSharding(d.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.unixfsDir = hamtDir
|
||||
}
|
||||
}
|
||||
|
||||
err := d.unixfsDir.AddChild(d.ctx, name, node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Directory) sync() error {
|
||||
for name, dir := range d.childDirs {
|
||||
nd, err := dir.GetNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = d.updateChild(name, nd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for name, file := range d.files {
|
||||
nd, err := file.GetNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = d.updateChild(name, nd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Directory) Path() string {
|
||||
cur := d
|
||||
var out string
|
||||
for cur != nil {
|
||||
switch parent := cur.parent.(type) {
|
||||
case *Directory:
|
||||
out = path.Join(cur.name, out)
|
||||
cur = parent
|
||||
case *Root:
|
||||
return "/" + out
|
||||
default:
|
||||
panic("directory parent neither a directory nor a root")
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (d *Directory) GetNode() (ipld.Node, error) {
|
||||
d.lock.Lock()
|
||||
defer d.lock.Unlock()
|
||||
|
||||
err := d.sync()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nd, err := d.unixfsDir.GetNode()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = d.dserv.Add(d.ctx, nd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nd.Copy(), err
|
||||
}
151 mfs/fd.go
@@ -1,151 +0,0 @@
|
||||
package mfs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
mod "gx/ipfs/QmWv8MYwgPK4zXYv1et1snWJ6FWGqaL6xY2y9X1bRSKBxk/go-unixfs/mod"
|
||||
|
||||
context "context"
|
||||
)
|
||||
|
||||
type FileDescriptor interface {
|
||||
io.Reader
|
||||
CtxReadFull(context.Context, []byte) (int, error)
|
||||
|
||||
io.Writer
|
||||
io.WriterAt
|
||||
|
||||
io.Closer
|
||||
io.Seeker
|
||||
|
||||
Truncate(int64) error
|
||||
Size() (int64, error)
|
||||
Sync() error
|
||||
Flush() error
|
||||
}
|
||||
|
||||
type fileDescriptor struct {
|
||||
inode *File
|
||||
mod *mod.DagModifier
|
||||
perms int
|
||||
sync bool
|
||||
hasChanges bool
|
||||
|
||||
closed bool
|
||||
}
|
||||
|
||||
// Size returns the size of the file referred to by this descriptor
|
||||
func (fi *fileDescriptor) Size() (int64, error) {
|
||||
return fi.mod.Size()
|
||||
}
|
||||
|
||||
// Truncate truncates the file to size
|
||||
func (fi *fileDescriptor) Truncate(size int64) error {
|
||||
if fi.perms == OpenReadOnly {
|
||||
return fmt.Errorf("cannot call truncate on readonly file descriptor")
|
||||
}
|
||||
fi.hasChanges = true
|
||||
return fi.mod.Truncate(size)
|
||||
}
|
||||
|
||||
// Write writes the given data to the file at its current offset
|
||||
func (fi *fileDescriptor) Write(b []byte) (int, error) {
|
||||
if fi.perms == OpenReadOnly {
|
||||
return 0, fmt.Errorf("cannot write on not writeable descriptor")
|
||||
}
|
||||
fi.hasChanges = true
|
||||
return fi.mod.Write(b)
|
||||
}
|
||||
|
||||
// Read reads into the given buffer from the current offset
|
||||
func (fi *fileDescriptor) Read(b []byte) (int, error) {
|
||||
if fi.perms == OpenWriteOnly {
|
||||
return 0, fmt.Errorf("cannot read on write-only descriptor")
|
||||
}
|
||||
return fi.mod.Read(b)
|
||||
}
|
||||
|
||||
// Read reads into the given buffer from the current offset
|
||||
func (fi *fileDescriptor) CtxReadFull(ctx context.Context, b []byte) (int, error) {
|
||||
if fi.perms == OpenWriteOnly {
|
||||
return 0, fmt.Errorf("cannot read on write-only descriptor")
|
||||
}
|
||||
return fi.mod.CtxReadFull(ctx, b)
|
||||
}
|
||||
|
||||
// Close flushes, then propogates the modified dag node up the directory structure
|
||||
// and signals a republish to occur
|
||||
func (fi *fileDescriptor) Close() error {
|
||||
defer func() {
|
||||
switch fi.perms {
|
||||
case OpenReadOnly:
|
||||
fi.inode.desclock.RUnlock()
|
||||
case OpenWriteOnly, OpenReadWrite:
|
||||
fi.inode.desclock.Unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
if fi.closed {
|
||||
panic("attempted to close file descriptor twice!")
|
||||
}
|
||||
|
||||
if fi.hasChanges {
|
||||
err := fi.mod.Sync()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fi.hasChanges = false
|
||||
|
||||
// explicitly stay locked for flushUp call,
|
||||
// it will manage the lock for us
|
||||
return fi.flushUp(fi.sync)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fi *fileDescriptor) Sync() error {
|
||||
return fi.flushUp(false)
|
||||
}
|
||||
|
||||
func (fi *fileDescriptor) Flush() error {
|
||||
return fi.flushUp(true)
|
||||
}
|
||||
|
||||
// flushUp syncs the file and adds it to the dagservice
|
||||
// it *must* be called with the File's lock taken
|
||||
func (fi *fileDescriptor) flushUp(fullsync bool) error {
|
||||
nd, err := fi.mod.GetNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = fi.inode.dserv.Add(context.TODO(), nd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fi.inode.nodelk.Lock()
|
||||
fi.inode.node = nd
|
||||
name := fi.inode.name
|
||||
parent := fi.inode.parent
|
||||
fi.inode.nodelk.Unlock()
|
||||
|
||||
return parent.closeChild(name, nd, fullsync)
|
||||
}
|
||||
|
||||
// Seek implements io.Seeker
|
||||
func (fi *fileDescriptor) Seek(offset int64, whence int) (int64, error) {
|
||||
return fi.mod.Seek(offset, whence)
|
||||
}
|
||||
|
||||
// Write At writes the given bytes at the offset 'at'
|
||||
func (fi *fileDescriptor) WriteAt(b []byte, at int64) (int, error) {
|
||||
if fi.perms == OpenReadOnly {
|
||||
return 0, fmt.Errorf("cannot write on not writeable descriptor")
|
||||
}
|
||||
fi.hasChanges = true
|
||||
return fi.mod.WriteAt(b, at)
|
||||
}
146 mfs/file.go
@@ -1,146 +0,0 @@
|
||||
package mfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
dag "gx/ipfs/QmQzSpSjkdGHW6WFBhUG6P3t9K8yv7iucucT1cQaqJ6tgd/go-merkledag"
|
||||
ft "gx/ipfs/QmWv8MYwgPK4zXYv1et1snWJ6FWGqaL6xY2y9X1bRSKBxk/go-unixfs"
|
||||
mod "gx/ipfs/QmWv8MYwgPK4zXYv1et1snWJ6FWGqaL6xY2y9X1bRSKBxk/go-unixfs/mod"
|
||||
|
||||
chunker "gx/ipfs/QmWbCAB5f3LDumj4ncz1UCHSiyXrXxkMxZB6Wv35xi4P8z/go-ipfs-chunker"
|
||||
ipld "gx/ipfs/QmaA8GkXUYinkkndvg7T6Tx7gYXemhxjaxLisEPes7Rf1P/go-ipld-format"
|
||||
)
|
||||
|
||||
type File struct {
|
||||
parent childCloser
|
||||
|
||||
name string
|
||||
|
||||
desclock sync.RWMutex
|
||||
|
||||
dserv ipld.DAGService
|
||||
node ipld.Node
|
||||
nodelk sync.Mutex
|
||||
|
||||
RawLeaves bool
|
||||
}
|
||||
|
||||
// NewFile returns a NewFile object with the given parameters. If the
|
||||
// Cid version is non-zero RawLeaves will be enabled.
|
||||
func NewFile(name string, node ipld.Node, parent childCloser, dserv ipld.DAGService) (*File, error) {
|
||||
fi := &File{
|
||||
dserv: dserv,
|
||||
parent: parent,
|
||||
name: name,
|
||||
node: node,
|
||||
}
|
||||
if node.Cid().Prefix().Version > 0 {
|
||||
fi.RawLeaves = true
|
||||
}
|
||||
return fi, nil
|
||||
}
|
||||
|
||||
const (
|
||||
OpenReadOnly = iota
|
||||
OpenWriteOnly
|
||||
OpenReadWrite
|
||||
)
|
||||
|
||||
func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) {
|
||||
fi.nodelk.Lock()
|
||||
node := fi.node
|
||||
fi.nodelk.Unlock()
|
||||
|
||||
switch node := node.(type) {
|
||||
case *dag.ProtoNode:
|
||||
fsn, err := ft.FSNodeFromBytes(node.Data())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch fsn.Type() {
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported fsnode type for 'file'")
|
||||
case ft.TSymlink:
|
||||
return nil, fmt.Errorf("symlinks not yet supported")
|
||||
case ft.TFile, ft.TRaw:
|
||||
// OK case
|
||||
}
|
||||
case *dag.RawNode:
|
||||
// Ok as well.
|
||||
}
|
||||
|
||||
switch flags {
|
||||
case OpenReadOnly:
|
||||
fi.desclock.RLock()
|
||||
case OpenWriteOnly, OpenReadWrite:
|
||||
fi.desclock.Lock()
|
||||
default:
|
||||
// TODO: support other modes
|
||||
return nil, fmt.Errorf("mode not supported")
|
||||
}
|
||||
|
||||
dmod, err := mod.NewDagModifier(context.TODO(), node, fi.dserv, chunker.DefaultSplitter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dmod.RawLeaves = fi.RawLeaves
|
||||
|
||||
return &fileDescriptor{
|
||||
inode: fi,
|
||||
perms: flags,
|
||||
sync: sync,
|
||||
mod: dmod,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Size returns the size of this file
|
||||
func (fi *File) Size() (int64, error) {
|
||||
fi.nodelk.Lock()
|
||||
defer fi.nodelk.Unlock()
|
||||
switch nd := fi.node.(type) {
|
||||
case *dag.ProtoNode:
|
||||
pbd, err := ft.FromBytes(nd.Data())
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return int64(pbd.GetFilesize()), nil
|
||||
case *dag.RawNode:
|
||||
return int64(len(nd.RawData())), nil
|
||||
default:
|
||||
return 0, fmt.Errorf("unrecognized node type in mfs/file.Size()")
|
||||
}
|
||||
}
|
||||
|
||||
// GetNode returns the dag node associated with this file
|
||||
func (fi *File) GetNode() (ipld.Node, error) {
|
||||
fi.nodelk.Lock()
|
||||
defer fi.nodelk.Unlock()
|
||||
return fi.node, nil
|
||||
}
|
||||
|
||||
func (fi *File) Flush() error {
|
||||
// open the file in fullsync mode
|
||||
fd, err := fi.Open(OpenWriteOnly, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer fd.Close()
|
||||
|
||||
return fd.Flush()
|
||||
}
|
||||
|
||||
func (fi *File) Sync() error {
|
||||
// just being able to take the writelock means the descriptor is synced
|
||||
fi.desclock.Lock()
|
||||
fi.desclock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Type returns the type FSNode this is
|
||||
func (fi *File) Type() NodeType {
|
||||
return TFile
|
||||
}
1184 mfs/mfs_test.go
(File diff suppressed because it is too large.)
223 mfs/ops.go
@@ -1,223 +0,0 @@
|
||||
package mfs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
gopath "path"
|
||||
"strings"
|
||||
|
||||
path "gx/ipfs/QmWMcvZbNvk5codeqbm7L89C9kqSwka4KaHnDb8HRnxsSL/go-path"
|
||||
|
||||
cid "gx/ipfs/QmYjnkEL7i731PirfVH1sis89evN7jt4otSHw5D2xXXwUV/go-cid"
|
||||
ipld "gx/ipfs/QmaA8GkXUYinkkndvg7T6Tx7gYXemhxjaxLisEPes7Rf1P/go-ipld-format"
|
||||
)
|
||||
|
||||
// Mv moves the file or directory at 'src' to 'dst'
|
||||
func Mv(r *Root, src, dst string) error {
|
||||
srcDir, srcFname := gopath.Split(src)
|
||||
|
||||
var dstDirStr string
|
||||
var filename string
|
||||
if dst[len(dst)-1] == '/' {
|
||||
dstDirStr = dst
|
||||
filename = srcFname
|
||||
} else {
|
||||
dstDirStr, filename = gopath.Split(dst)
|
||||
}
|
||||
|
||||
// get parent directories of both src and dest first
|
||||
dstDir, err := lookupDir(r, dstDirStr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
srcDirObj, err := lookupDir(r, srcDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
srcObj, err := srcDirObj.Child(srcFname)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nd, err := srcObj.GetNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fsn, err := dstDir.Child(filename)
|
||||
if err == nil {
|
||||
switch n := fsn.(type) {
|
||||
case *File:
|
||||
_ = dstDir.Unlink(filename)
|
||||
case *Directory:
|
||||
dstDir = n
|
||||
default:
|
||||
return fmt.Errorf("unexpected type at path: %s", dst)
|
||||
}
|
||||
} else if err != os.ErrNotExist {
|
||||
return err
|
||||
}
|
||||
|
||||
err = dstDir.AddChild(filename, nd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return srcDirObj.Unlink(srcFname)
|
||||
}
|
||||
|
||||
func lookupDir(r *Root, path string) (*Directory, error) {
|
||||
di, err := Lookup(r, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
d, ok := di.(*Directory)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%s is not a directory", path)
|
||||
}
|
||||
|
||||
return d, nil
|
||||
}
|
||||
|
||||
// PutNode inserts 'nd' at 'path' in the given mfs
|
||||
func PutNode(r *Root, path string, nd ipld.Node) error {
|
||||
dirp, filename := gopath.Split(path)
|
||||
if filename == "" {
|
||||
return fmt.Errorf("cannot create file with empty name")
|
||||
}
|
||||
|
||||
pdir, err := lookupDir(r, dirp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return pdir.AddChild(filename, nd)
|
||||
}
|
||||
|
||||
// MkdirOpts is used by Mkdir
|
||||
type MkdirOpts struct {
|
||||
Mkparents bool
|
||||
Flush bool
|
||||
CidBuilder cid.Builder
|
||||
}
|
||||
|
||||
// Mkdir creates a directory at 'path' under the directory 'd', creating
|
||||
// intermediary directories as needed if 'mkparents' is set to true
|
||||
func Mkdir(r *Root, pth string, opts MkdirOpts) error {
|
||||
if pth == "" {
|
||||
return fmt.Errorf("no path given to Mkdir")
|
||||
}
|
||||
parts := path.SplitList(pth)
|
||||
if parts[0] == "" {
|
||||
parts = parts[1:]
|
||||
}
|
||||
|
||||
// allow 'mkdir /a/b/c/' to create c
|
||||
if parts[len(parts)-1] == "" {
|
||||
parts = parts[:len(parts)-1]
|
||||
}
|
||||
|
||||
if len(parts) == 0 {
|
||||
// this will only happen on 'mkdir /'
|
||||
if opts.Mkparents {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("cannot create directory '/': Already exists")
|
||||
}
|
||||
|
||||
cur := r.GetDirectory()
|
||||
for i, d := range parts[:len(parts)-1] {
|
||||
fsn, err := cur.Child(d)
|
||||
if err == os.ErrNotExist && opts.Mkparents {
|
||||
mkd, err := cur.Mkdir(d)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if opts.CidBuilder != nil {
|
||||
mkd.SetCidBuilder(opts.CidBuilder)
|
||||
}
|
||||
fsn = mkd
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
next, ok := fsn.(*Directory)
|
||||
if !ok {
|
||||
return fmt.Errorf("%s was not a directory", path.Join(parts[:i]))
|
||||
}
|
||||
cur = next
|
||||
}
|
||||
|
||||
final, err := cur.Mkdir(parts[len(parts)-1])
|
||||
if err != nil {
|
||||
if !opts.Mkparents || err != os.ErrExist || final == nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if opts.CidBuilder != nil {
|
||||
final.SetCidBuilder(opts.CidBuilder)
|
||||
}
|
||||
|
||||
if opts.Flush {
|
||||
err := final.Flush()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Lookup extracts the root directory and performs a lookup under it.
|
||||
// TODO: Now that the root is always a directory, can this function
|
||||
// be collapsed with `DirLookup`? Or at least be made a method of `Root`?
|
||||
func Lookup(r *Root, path string) (FSNode, error) {
|
||||
dir := r.GetDirectory()
|
||||
|
||||
return DirLookup(dir, path)
|
||||
}
|
||||
|
||||
// DirLookup will look up a file or directory at the given path
|
||||
// under the directory 'd'
|
||||
func DirLookup(d *Directory, pth string) (FSNode, error) {
|
||||
pth = strings.Trim(pth, "/")
|
||||
parts := path.SplitList(pth)
|
||||
if len(parts) == 1 && parts[0] == "" {
|
||||
return d, nil
|
||||
}
|
||||
|
||||
var cur FSNode
|
||||
cur = d
|
||||
for i, p := range parts {
|
||||
chdir, ok := cur.(*Directory)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("cannot access %s: Not a directory", path.Join(parts[:i+1]))
|
||||
}
|
||||
|
||||
child, err := chdir.Child(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cur = child
|
||||
}
|
||||
return cur, nil
|
||||
}
|
||||
|
||||
func FlushPath(rt *Root, pth string) error {
|
||||
nd, err := Lookup(rt, pth)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = nd.Flush()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rt.repub.WaitPub()
|
||||
return nil
|
||||
}
@@ -1,77 +0,0 @@
|
||||
package mfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
ci "gx/ipfs/QmXG74iiKQnDstVQq9fPFQEB6JTNSWBbAWE1qsq6L4E5sR/go-testutil/ci"
|
||||
cid "gx/ipfs/QmYjnkEL7i731PirfVH1sis89evN7jt4otSHw5D2xXXwUV/go-cid"
|
||||
)
|
||||
|
||||
func TestRepublisher(t *testing.T) {
|
||||
if ci.IsRunning() {
|
||||
t.Skip("dont run timing tests in CI")
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
|
||||
pub := make(chan struct{})
|
||||
|
||||
pf := func(ctx context.Context, c *cid.Cid) error {
|
||||
pub <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
tshort := time.Millisecond * 50
|
||||
tlong := time.Second / 2
|
||||
|
||||
rp := NewRepublisher(ctx, pf, tshort, tlong)
|
||||
go rp.Run()
|
||||
|
||||
rp.Update(nil)
|
||||
|
||||
// should hit short timeout
|
||||
select {
|
||||
case <-time.After(tshort * 2):
|
||||
t.Fatal("publish didnt happen in time")
|
||||
case <-pub:
|
||||
}
|
||||
|
||||
cctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
go func() {
|
||||
for {
|
||||
rp.Update(nil)
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
select {
|
||||
case <-cctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-pub:
|
||||
t.Fatal("shouldnt have received publish yet!")
|
||||
case <-time.After((tlong * 9) / 10):
|
||||
}
|
||||
select {
|
||||
case <-pub:
|
||||
case <-time.After(tlong / 2):
|
||||
t.Fatal("waited too long for pub!")
|
||||
}
|
||||
|
||||
cancel()
|
||||
|
||||
go func() {
|
||||
err := rp.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
// final pub from closing
|
||||
<-pub
|
||||
}
292 mfs/system.go
@@ -1,292 +0,0 @@
|
||||
// package mfs implements an in memory model of a mutable IPFS filesystem.
|
||||
//
|
||||
// It consists of four main structs:
|
||||
// 1) The Filesystem
|
||||
// The filesystem serves as a container and entry point for various mfs filesystems
|
||||
// 2) Root
|
||||
// Root represents an individual filesystem mounted within the mfs system as a whole
|
||||
// 3) Directories
|
||||
// 4) Files
|
||||
package mfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
dag "gx/ipfs/QmQzSpSjkdGHW6WFBhUG6P3t9K8yv7iucucT1cQaqJ6tgd/go-merkledag"
|
||||
ft "gx/ipfs/QmWv8MYwgPK4zXYv1et1snWJ6FWGqaL6xY2y9X1bRSKBxk/go-unixfs"
|
||||
|
||||
logging "gx/ipfs/QmRREK2CAZ5Re2Bd9zZFG6FeYDppUWt5cMgsoUEp3ktgSr/go-log"
|
||||
cid "gx/ipfs/QmYjnkEL7i731PirfVH1sis89evN7jt4otSHw5D2xXXwUV/go-cid"
|
||||
ipld "gx/ipfs/QmaA8GkXUYinkkndvg7T6Tx7gYXemhxjaxLisEPes7Rf1P/go-ipld-format"
|
||||
)
|
||||
|
||||
var ErrNotExist = errors.New("no such rootfs")
|
||||
|
||||
var log = logging.Logger("mfs")
|
||||
|
||||
var ErrIsDirectory = errors.New("error: is a directory")
|
||||
|
||||
type childCloser interface {
|
||||
closeChild(string, ipld.Node, bool) error
|
||||
}
|
||||
|
||||
type NodeType int
|
||||
|
||||
const (
|
||||
TFile NodeType = iota
|
||||
TDir
|
||||
)
|
||||
|
||||
// FSNode represents any node (directory, root, or file) in the mfs filesystem.
|
||||
type FSNode interface {
|
||||
GetNode() (ipld.Node, error)
|
||||
Flush() error
|
||||
Type() NodeType
|
||||
}
|
||||
|
||||
// Root represents the root of a filesystem tree.
|
||||
type Root struct {
|
||||
|
||||
// Root directory of the MFS layout.
|
||||
dir *Directory
|
||||
|
||||
repub *Republisher
|
||||
}
|
||||
|
||||
// PubFunc is the function used by the `publish()` method.
|
||||
type PubFunc func(context.Context, *cid.Cid) error
|
||||
|
||||
// NewRoot creates a new Root and starts up a republisher routine for it.
|
||||
func NewRoot(parent context.Context, ds ipld.DAGService, node *dag.ProtoNode, pf PubFunc) (*Root, error) {
|
||||
|
||||
var repub *Republisher
|
||||
if pf != nil {
|
||||
repub = NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3)
|
||||
repub.setVal(node.Cid())
|
||||
go repub.Run()
|
||||
}
|
||||
|
||||
root := &Root{
|
||||
repub: repub,
|
||||
}
|
||||
|
||||
pbn, err := ft.FromBytes(node.Data())
|
||||
if err != nil {
|
||||
log.Error("IPNS pointer was not unixfs node")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch pbn.GetType() {
|
||||
case ft.TDirectory, ft.THAMTShard:
|
||||
newDir, err := NewDirectory(parent, node.String(), node, root, ds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
root.dir = newDir
|
||||
case ft.TFile, ft.TMetadata, ft.TRaw:
|
||||
return nil, fmt.Errorf("root can't be a file (unixfs type: %s)", pbn.GetType())
|
||||
default:
|
||||
return nil, fmt.Errorf("unrecognized unixfs type: %s", pbn.GetType())
|
||||
}
|
||||
return root, nil
|
||||
}
|
||||
|
||||
// GetDirectory returns the root directory.
|
||||
func (kr *Root) GetDirectory() *Directory {
|
||||
return kr.dir
|
||||
}
|
||||
|
||||
// Flush signals that an update has occurred since the last publish,
|
||||
// and updates the Root republisher.
|
||||
func (kr *Root) Flush() error {
|
||||
nd, err := kr.GetDirectory().GetNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if kr.repub != nil {
|
||||
kr.repub.Update(nd.Cid())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// FlushMemFree flushes the root directory and then uncaches all of its links.
|
||||
// This has the effect of clearing out potentially stale references and allows
|
||||
// them to be garbage collected.
|
||||
// CAUTION: Take care not to ever call this while holding a reference to any
|
||||
// child directories. Those directories will be bad references and using them
|
||||
// may have unintended racy side effects.
|
||||
// A better implemented mfs system (one that does smarter internal caching and
|
||||
// refcounting) shouldnt need this method.
|
||||
func (kr *Root) FlushMemFree(ctx context.Context) error {
|
||||
dir := kr.GetDirectory()
|
||||
|
||||
if err := dir.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dir.lock.Lock()
|
||||
defer dir.lock.Unlock()
|
||||
for name := range dir.files {
|
||||
delete(dir.files, name)
|
||||
}
|
||||
for name := range dir.childDirs {
|
||||
delete(dir.childDirs, name)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// closeChild implements the childCloser interface, and signals to the publisher that
|
||||
// there are changes ready to be published.
|
||||
func (kr *Root) closeChild(name string, nd ipld.Node, sync bool) error {
|
||||
err := kr.GetDirectory().dserv.Add(context.TODO(), nd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if kr.repub != nil {
|
||||
kr.repub.Update(nd.Cid())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kr *Root) Close() error {
|
||||
nd, err := kr.GetDirectory().GetNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if kr.repub != nil {
|
||||
kr.repub.Update(nd.Cid())
|
||||
return kr.repub.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Republisher manages when to publish a given entry.
|
||||
type Republisher struct {
|
||||
TimeoutLong time.Duration
|
||||
TimeoutShort time.Duration
|
||||
Publish chan struct{}
|
||||
pubfunc PubFunc
|
||||
pubnowch chan chan struct{}
|
||||
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
|
||||
lk sync.Mutex
|
||||
val *cid.Cid
|
||||
lastpub *cid.Cid
|
||||
}
|
||||
|
||||
// NewRepublisher creates a new Republisher object to republish the given root
|
||||
// using the given short and long time intervals.
|
||||
func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
return &Republisher{
|
||||
TimeoutShort: tshort,
|
||||
TimeoutLong: tlong,
|
||||
Publish: make(chan struct{}, 1),
|
||||
pubfunc: pf,
|
||||
pubnowch: make(chan chan struct{}),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Republisher) setVal(c *cid.Cid) {
|
||||
p.lk.Lock()
|
||||
defer p.lk.Unlock()
|
||||
p.val = c
|
||||
}
|
||||
|
||||
// WaitPub Returns immediately if `lastpub` value is consistent with the
|
||||
// current value `val`, else will block until `val` has been published.
|
||||
func (p *Republisher) WaitPub() {
|
||||
p.lk.Lock()
|
||||
consistent := p.lastpub == p.val
|
||||
p.lk.Unlock()
|
||||
if consistent {
|
||||
return
|
||||
}
|
||||
|
||||
wait := make(chan struct{})
|
||||
p.pubnowch <- wait
|
||||
<-wait
|
||||
}
|
||||
|
||||
func (p *Republisher) Close() error {
|
||||
err := p.publish(p.ctx)
|
||||
p.cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
// Touch signals that an update has occurred since the last publish.
|
||||
// Multiple consecutive touches may extend the time period before
|
||||
// the next Publish occurs in order to more efficiently batch updates.
|
||||
func (np *Republisher) Update(c *cid.Cid) {
|
||||
np.setVal(c)
|
||||
select {
|
||||
case np.Publish <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Run is the main republisher loop.
|
||||
func (np *Republisher) Run() {
|
||||
for {
|
||||
select {
|
||||
case <-np.Publish:
|
||||
quick := time.After(np.TimeoutShort)
|
||||
longer := time.After(np.TimeoutLong)
|
||||
|
||||
wait:
|
||||
var pubnowresp chan struct{}
|
||||
|
||||
select {
|
||||
case <-np.ctx.Done():
|
||||
return
|
||||
case <-np.Publish:
|
||||
quick = time.After(np.TimeoutShort)
|
||||
goto wait
|
||||
case <-quick:
|
||||
case <-longer:
|
||||
case pubnowresp = <-np.pubnowch:
|
||||
}
|
||||
|
||||
err := np.publish(np.ctx)
|
||||
if pubnowresp != nil {
|
||||
pubnowresp <- struct{}{}
|
||||
}
|
||||
if err != nil {
|
||||
log.Errorf("republishRoot error: %s", err)
|
||||
}
|
||||
|
||||
case <-np.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// publish calls the `PubFunc`.
|
||||
func (np *Republisher) publish(ctx context.Context) error {
|
||||
np.lk.Lock()
|
||||
topub := np.val
|
||||
np.lk.Unlock()
|
||||
|
||||
err := np.pubfunc(ctx, topub)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
np.lk.Lock()
|
||||
np.lastpub = topub
|
||||
np.lk.Unlock()
|
||||
return nil
|
||||
}
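The deleted files above are the full source of the package that now lives in go-mfs; its public surface (Root, Directory, File, and the Mkdir/PutNode/Lookup helpers) is unchanged by the move. A minimal usage sketch against that API, assuming the gx paths pinned by this commit and an ipld.DAGService supplied by the caller; buildTree is a hypothetical helper for illustration only, not part of the commit:

package example

import (
	"context"
	"fmt"

	mfs "gx/ipfs/QmQeLRo7dHKpmREWkAUXRAri4Bro3BqrDVJJHjHHmKSHMc/go-mfs"
	dag "gx/ipfs/QmQzSpSjkdGHW6WFBhUG6P3t9K8yv7iucucT1cQaqJ6tgd/go-merkledag"
	ft "gx/ipfs/QmWv8MYwgPK4zXYv1et1snWJ6FWGqaL6xY2y9X1bRSKBxk/go-unixfs"
	cid "gx/ipfs/QmYjnkEL7i731PirfVH1sis89evN7jt4otSHw5D2xXXwUV/go-cid"
	ipld "gx/ipfs/QmaA8GkXUYinkkndvg7T6Tx7gYXemhxjaxLisEPes7Rf1P/go-ipld-format"
)

// buildTree shows the extracted API end to end: create a root, make
// directories, add a file node, look it back up, and close the root.
func buildTree(ctx context.Context, dserv ipld.DAGService) error {
	// Start from an empty unixfs directory as the MFS root node.
	rootNode := ft.EmptyDirNode()

	// PubFunc is called whenever the root CID should be republished; a no-op here.
	pf := func(ctx context.Context, c *cid.Cid) error { return nil }

	root, err := mfs.NewRoot(ctx, dserv, rootNode, pf)
	if err != nil {
		return err
	}

	// Create /a/b, making intermediate directories and flushing the result.
	if err := mfs.Mkdir(root, "/a/b", mfs.MkdirOpts{Mkparents: true, Flush: true}); err != nil {
		return err
	}

	// Insert a small unixfs file node at /a/b/hello.
	data := []byte("hello")
	nd := dag.NodeWithData(ft.FilePBData(data, uint64(len(data))))
	if err := mfs.PutNode(root, "/a/b/hello", nd); err != nil {
		return err
	}

	// Look the file back up and print its type (TFile).
	fsn, err := mfs.Lookup(root, "/a/b/hello")
	if err != nil {
		return err
	}
	fmt.Println("type:", fsn.Type())

	// Close flushes the root and stops the republisher.
	return root.Close()
}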
@@ -527,6 +527,12 @@
      "hash": "QmfMirfpEKQFctVpBYTvETxxLoU5q4ZJWsAMrtwSSE2bkn",
      "name": "go-verifcid",
      "version": "0.0.3"
    },
    {
      "author": "hsanjuan",
      "hash": "QmQeLRo7dHKpmREWkAUXRAri4Bro3BqrDVJJHjHHmKSHMc",
      "name": "go-mfs",
      "version": "0.0.1"
    }
  ],
  "gxVersion": "0.10.0",
@@ -535,3 +541,4 @@
  "name": "go-ipfs",
  "version": "0.4.18-dev"
}