implement a HAMT for unixfs directory sharding

License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
This commit is contained in:
Jeromy 2016-08-04 12:45:00 -07:00 committed by Jeromy
parent 21072a5f81
commit bb09ffd756
21 changed files with 1968 additions and 128 deletions

View File

@ -59,17 +59,27 @@ func addAssetList(nd *core.IpfsNode, l []string) (*cid.Cid, error) {
}
fname := filepath.Base(p)
c, err := cid.Decode(s)
if err != nil {
return nil, err
}
if err := dirb.AddChild(nd.Context(), fname, c); err != nil {
node, err := nd.DAG.Get(nd.Context(), c)
if err != nil {
return nil, err
}
if err := dirb.AddChild(nd.Context(), fname, node); err != nil {
return nil, fmt.Errorf("assets: could not add '%s' as a child: %s", fname, err)
}
}
dir := dirb.GetNode()
dir, err := dirb.GetNode()
if err != nil {
return nil, err
}
dcid, err := nd.DAG.Add(dir)
if err != nil {
return nil, fmt.Errorf("assets: DAG.Add(dir) failed: %s", err)

View File

@ -2,6 +2,7 @@ package commands
import (
"bufio"
"context"
"errors"
"fmt"
"io"
@ -10,7 +11,6 @@ import (
"strconv"
"time"
context "context"
"github.com/ipfs/go-ipfs/commands/files"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/repo/config"

View File

@ -265,16 +265,7 @@ func getNodeFromPath(ctx context.Context, node *core.IpfsNode, p string) (node.N
ResolveOnce: uio.ResolveUnixfsOnce,
}
nd, err := core.Resolve(ctx, node.Namesys, resolver, np)
if err != nil {
return nil, err
}
pbnd, ok := nd.(*dag.ProtoNode)
if !ok {
return nil, dag.ErrNotProtobuf
}
return pbnd, nil
return core.Resolve(ctx, node.Namesys, resolver, np)
default:
fsn, err := mfs.Lookup(node.FilesRoot, p)
if err != nil {
@ -357,7 +348,13 @@ Examples:
case *mfs.Directory:
if !long {
var output []mfs.NodeListing
for _, name := range fsn.ListNames() {
names, err := fsn.ListNames()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
for _, name := range names {
output = append(output, mfs.NodeListing{
Name: name,
})

View File

@ -228,7 +228,12 @@ func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
case *mfs.File:
return nil
case *mfs.Directory:
for _, name := range fsn.ListNames() {
names, err := fsn.ListNames()
if err != nil {
return err
}
for _, name := range names {
child, err := fsn.Child(name)
if err != nil {
return err

View File

@ -16,7 +16,6 @@ import (
namesys "github.com/ipfs/go-ipfs/namesys"
path "github.com/ipfs/go-ipfs/path"
ft "github.com/ipfs/go-ipfs/unixfs"
uio "github.com/ipfs/go-ipfs/unixfs/io"
ci "gx/ipfs/QmPGxZ1DP2w45WcogpW1h43BvseXbfke9N91qotpoQcUeS/go-libp2p-crypto"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
@ -100,7 +99,7 @@ func loadRoot(ctx context.Context, rt *keyRoot, ipfs *core.IpfsNode, name string
switch err {
case nil:
case namesys.ErrResolveFailed:
node = uio.NewEmptyDirectory()
node = ft.EmptyDirNode()
default:
log.Errorf("looking up %s: %s", p, err)
return nil, err

View File

@ -12,6 +12,7 @@ import (
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
uio "github.com/ipfs/go-ipfs/unixfs/io"
ufspb "github.com/ipfs/go-ipfs/unixfs/pb"
node "gx/ipfs/QmYDscK7dmdo2GZ9aumS8s5auUUAH5mR1jvj5pYhWusfK7/go-ipld-node"
@ -29,25 +30,31 @@ type Directory struct {
files map[string]*File
lock sync.Mutex
node *dag.ProtoNode
ctx context.Context
dirbuilder *uio.Directory
modTime time.Time
name string
}
func NewDirectory(ctx context.Context, name string, node *dag.ProtoNode, parent childCloser, dserv dag.DAGService) *Directory {
return &Directory{
dserv: dserv,
ctx: ctx,
name: name,
node: node,
parent: parent,
childDirs: make(map[string]*Directory),
files: make(map[string]*File),
modTime: time.Now(),
func NewDirectory(ctx context.Context, name string, node node.Node, parent childCloser, dserv dag.DAGService) (*Directory, error) {
db, err := uio.NewDirectoryFromNode(dserv, node)
if err != nil {
return nil, err
}
return &Directory{
dserv: dserv,
ctx: ctx,
name: name,
dirbuilder: db,
parent: parent,
childDirs: make(map[string]*Directory),
files: make(map[string]*File),
modTime: time.Now(),
}, nil
}
// closeChild updates the child by the given name to the dag node 'nd'
@ -81,21 +88,26 @@ func (d *Directory) closeChildUpdate(name string, nd *dag.ProtoNode, sync bool)
}
func (d *Directory) flushCurrentNode() (*dag.ProtoNode, error) {
_, err := d.dserv.Add(d.node)
nd, err := d.dirbuilder.GetNode()
if err != nil {
return nil, err
}
return d.node.Copy().(*dag.ProtoNode), nil
_, err = d.dserv.Add(nd)
if err != nil {
return nil, err
}
pbnd, ok := nd.(*dag.ProtoNode)
if !ok {
return nil, dag.ErrNotProtobuf
}
return pbnd.Copy().(*dag.ProtoNode), nil
}
func (d *Directory) updateChild(name string, nd node.Node) error {
err := d.node.RemoveNodeLink(name)
if err != nil && err != dag.ErrNotFound {
return err
}
err = d.node.AddNodeLinkClean(name, nd)
err := d.dirbuilder.AddChild(d.ctx, name, nd)
if err != nil {
return err
}
@ -130,8 +142,12 @@ func (d *Directory) cacheNode(name string, nd node.Node) (FSNode, error) {
}
switch i.GetType() {
case ufspb.Data_Directory:
ndir := NewDirectory(d.ctx, name, nd, d, d.dserv)
case ufspb.Data_Directory, ufspb.Data_HAMTShard:
ndir, err := NewDirectory(d.ctx, name, nd, d, d.dserv)
if err != nil {
return nil, err
}
d.childDirs[name] = ndir
return ndir, nil
case ufspb.Data_File, ufspb.Data_Raw, ufspb.Data_Symlink:
@ -175,15 +191,7 @@ func (d *Directory) Uncache(name string) {
// childFromDag searches through this directories dag node for a child link
// with the given name
func (d *Directory) childFromDag(name string) (node.Node, error) {
pbn, err := d.node.GetLinkedNode(d.ctx, d.dserv, name)
switch err {
case nil:
return pbn, nil
case dag.ErrLinkNotFound:
return nil, os.ErrNotExist
default:
return nil, err
}
return d.dirbuilder.Find(d.ctx, name)
}
// childUnsync returns the child under this directory by the given name
@ -209,7 +217,7 @@ type NodeListing struct {
Hash string
}
func (d *Directory) ListNames() []string {
func (d *Directory) ListNames() ([]string, error) {
d.lock.Lock()
defer d.lock.Unlock()
@ -221,7 +229,12 @@ func (d *Directory) ListNames() []string {
names[n] = struct{}{}
}
for _, l := range d.node.Links() {
links, err := d.dirbuilder.Links()
if err != nil {
return nil, err
}
for _, l := range links {
names[l.Name] = struct{}{}
}
@ -231,7 +244,7 @@ func (d *Directory) ListNames() []string {
}
sort.Strings(out)
return out
return out, nil
}
func (d *Directory) List() ([]NodeListing, error) {
@ -239,7 +252,13 @@ func (d *Directory) List() ([]NodeListing, error) {
defer d.lock.Unlock()
var out []NodeListing
for _, l := range d.node.Links() {
links, err := d.dirbuilder.Links()
if err != nil {
return nil, err
}
for _, l := range links {
child := NodeListing{}
child.Name = l.Name
@ -285,20 +304,23 @@ func (d *Directory) Mkdir(name string) (*Directory, error) {
}
}
ndir := new(dag.ProtoNode)
ndir.SetData(ft.FolderPBData())
ndir := ft.EmptyDirNode()
_, err = d.dserv.Add(ndir)
if err != nil {
return nil, err
}
err = d.node.AddNodeLinkClean(name, ndir)
err = d.dirbuilder.AddChild(d.ctx, name, ndir)
if err != nil {
return nil, err
}
dirobj, err := NewDirectory(d.ctx, name, ndir, d, d.dserv)
if err != nil {
return nil, err
}
dirobj := NewDirectory(d.ctx, name, ndir, d, d.dserv)
d.childDirs[name] = dirobj
return dirobj, nil
}
@ -310,12 +332,7 @@ func (d *Directory) Unlink(name string) error {
delete(d.childDirs, name)
delete(d.files, name)
err := d.node.RemoveNodeLink(name)
if err != nil {
return err
}
_, err = d.dserv.Add(d.node)
err := d.dirbuilder.RemoveChild(d.ctx, name)
if err != nil {
return err
}
@ -350,7 +367,7 @@ func (d *Directory) AddChild(name string, nd node.Node) error {
return err
}
err = d.node.AddNodeLinkClean(name, nd)
err = d.dirbuilder.AddChild(d.ctx, name, nd)
if err != nil {
return err
}
@ -406,10 +423,15 @@ func (d *Directory) GetNode() (node.Node, error) {
return nil, err
}
_, err = d.dserv.Add(d.node)
nd, err := d.dirbuilder.GetNode()
if err != nil {
return nil, err
}
return d.node.Copy().(*dag.ProtoNode), nil
_, err = d.dserv.Add(nd)
if err != nil {
return nil, err
}
return nd, err
}

View File

@ -747,6 +747,67 @@ func TestMfsStress(t *testing.T) {
}
}
// TestMfsHugeDir creates 100000 sibling directories under the root to
// exercise directory sharding at scale; it only checks that every Mkdir
// succeeds, not the resulting structure.
func TestMfsHugeDir(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	_, rt := setupRoot(ctx, t)

	for i := 0; i < 100000; i++ {
		err := Mkdir(rt, fmt.Sprintf("/dir%d", i), false, false)
		if err != nil {
			t.Fatal(err)
		}
	}
}
// TestMkdirP checks that Mkdir with mkparents=true creates the whole
// intermediate chain /a/b/c/d/e/f in one call.
func TestMkdirP(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	_, rt := setupRoot(ctx, t)

	err := Mkdir(rt, "/a/b/c/d/e/f", true, true)
	if err != nil {
		t.Fatal(err)
	}
}
// TestConcurrentWriteAndFlush races a writer goroutine (repeatedly
// rewriting one file) against the main goroutine repeatedly flushing the
// root via GetNode, to shake out locking bugs between write and flush
// paths. Run with -race for full effect.
func TestConcurrentWriteAndFlush(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ds, rt := setupRoot(ctx, t)

	d := mkdirP(t, rt.GetValue().(*Directory), "foo/bar/baz")
	fn := fileNodeFromReader(t, ds, bytes.NewBuffer(nil))
	err := d.AddChild("file", fn)
	if err != nil {
		t.Fatal(err)
	}

	nloops := 5000

	wg := new(sync.WaitGroup)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < nloops; i++ {
			err := writeFile(rt, "/foo/bar/baz/file", []byte("STUFF"))
			if err != nil {
				// t.Error (not Fatal): Fatal must not be called off the
				// test goroutine
				t.Error("file write failed: ", err)
				return
			}
		}
	}()

	for i := 0; i < nloops; i++ {
		_, err := rt.GetValue().GetNode()
		if err != nil {
			t.Fatal(err)
		}
	}

	wg.Wait()
}
func TestFlushing(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -892,6 +953,70 @@ func TestConcurrentReads(t *testing.T) {
}
wg.Wait()
}
// writeFile overwrites the mfs file at 'path' under root 'rt' with
// 'data', returning an error if the path does not resolve to a file or
// the write is short.
func writeFile(rt *Root, path string, data []byte) error {
	n, err := Lookup(rt, path)
	if err != nil {
		return err
	}

	fi, ok := n.(*File)
	if !ok {
		return fmt.Errorf("expected to receive a file, but didnt get one")
	}

	fd, err := fi.Open(OpenWriteOnly, true)
	if err != nil {
		return err
	}
	defer fd.Close()

	nw, err := fd.Write(data)
	if err != nil {
		return err
	}

	// Bug fix: the original compared against a hardcoded 10 and then
	// discarded the constructed error instead of returning it.
	if nw != len(data) {
		return fmt.Errorf("wrote incorrect amount: %d != %d", nw, len(data))
	}

	return nil
}
// TestConcurrentWrites has ten goroutines concurrently rewriting the same
// file, each with a distinct byte pattern, to exercise write-path locking.
// Run with -race for full effect.
func TestConcurrentWrites(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ds, rt := setupRoot(ctx, t)

	rootdir := rt.GetValue().(*Directory)

	path := "a/b/c"
	d := mkdirP(t, rootdir, path)

	fi := fileNodeFromReader(t, ds, bytes.NewReader(make([]byte, 0)))
	err := d.AddChild("afile", fi)
	if err != nil {
		t.Fatal(err)
	}

	var wg sync.WaitGroup
	nloops := 100
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(me int) {
			defer wg.Done()
			// each writer uses its own repeated byte so interleavings are
			// distinguishable if something goes wrong
			mybuf := bytes.Repeat([]byte{byte(me)}, 10)
			for j := 0; j < nloops; j++ {
				err := writeFile(rt, "a/b/c/afile", mybuf)
				if err != nil {
					t.Error("writefile failed: ", err)
					return
				}
			}
		}(i)
	}
	wg.Wait()
}
func TestFileDescriptors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

View File

@ -88,7 +88,12 @@ func NewRoot(parent context.Context, ds dag.DAGService, node *dag.ProtoNode, pf
switch pbn.GetType() {
case ft.TDirectory:
root.val = NewDirectory(parent, node.String(), node, root, ds)
rval, err := NewDirectory(parent, node.String(), node, root, ds)
if err != nil {
return nil, err
}
root.val = rval
case ft.TFile, ft.TMetadata, ft.TRaw:
fi, err := NewFile(node.String(), node, root, ds)
if err != nil {

View File

@ -300,6 +300,12 @@
"hash": "QmaFNtBAXX4nVMQWbUqNysXyhevUj1k4B1y5uS45LC7Vw9",
"name": "fuse",
"version": "0.1.3"
},
{
"author": "whyrusleeping",
"hash": "QmfJHywXQu98UeZtGJBQrPAR6AtmDjjbe3qjTo9piXHPnx",
"name": "murmur3",
"version": "0.0.0"
}
],
"gxVersion": "0.10.0",

View File

@ -46,6 +46,33 @@ verify_dir_contents() {
'
}
# test_sharding: exercise HAMT directory sharding through the files API by
# creating enough entries (1100) in /foo to push it over the sharding
# threshold, then verifying listing and reads still work.
test_sharding() {
test_expect_success "make a directory" '
ipfs files mkdir /foo
'

# write 1100 files, building the expected listing (list_exp_raw) as we go
test_expect_success "can make 1100 files in a directory" '
printf "" > list_exp_raw
for i in `seq 1100`
do
echo $i | ipfs files write --create /foo/file$i
echo file$i >> list_exp_raw
done
'

# both sides are sorted so the comparison is order-independent
test_expect_success "listing works" '
ipfs files ls /foo |sort > list_out &&
sort list_exp_raw > list_exp &&
test_cmp list_exp list_out
'

test_expect_success "can read a file from sharded directory" '
ipfs files read /foo/file65 > file_out &&
echo "65" > file_exp &&
test_cmp file_out file_exp
'
}
test_files_api() {
test_expect_success "can mkdir in root" '
ipfs files mkdir /cats
@ -491,5 +518,6 @@ test_launch_ipfs_daemon
ONLINE=1 # set online flag so tests can easily tell
test_files_api
test_sharding
test_kill_ipfs_daemon
test_done

View File

@ -17,6 +17,7 @@ const (
TDirectory = pb.Data_Directory
TMetadata = pb.Data_Metadata
TSymlink = pb.Data_Symlink
THAMTShard = pb.Data_HAMTShard
)
var ErrMalformedFileFormat = errors.New("malformed data in file format")

464
unixfs/hamt/hamt.go Normal file
View File

@ -0,0 +1,464 @@
package hamt
import (
"context"
"fmt"
"math"
"math/big"
"os"
dag "github.com/ipfs/go-ipfs/merkledag"
format "github.com/ipfs/go-ipfs/unixfs"
upb "github.com/ipfs/go-ipfs/unixfs/pb"
node "gx/ipfs/QmYDscK7dmdo2GZ9aumS8s5auUUAH5mR1jvj5pYhWusfK7/go-ipld-node"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
"gx/ipfs/QmfJHywXQu98UeZtGJBQrPAR6AtmDjjbe3qjTo9piXHPnx/murmur3"
)
const (
HashMurmur3 uint64 = 0x22
)
// HamtShard is one node of a HAMT (hash-array-mapped trie) used to shard
// large unixfs directories across multiple dag nodes.
type HamtShard struct {
	nd *dag.ProtoNode // serialized form; its links back unloaded children

	bitfield *big.Int // bit i set => table slot i is occupied

	children []child // cached children, parallel to nd's links; nil => not loaded

	tableSize    int // fanout of each shard node
	tableSizeLg2 int // hash bits consumed per trie level

	hashFunc uint64 // hash function code (HashMurmur3)

	prefixPadStr string // fmt pattern for zero-padded hex link-name prefixes
	maxpadlen    int    // width of that hex prefix

	dserv dag.DAGService
}

// child can either be another shard, or a leaf node value
type child interface {
	Node() (node.Node, error)
	Label() string
}
// NewHamtShard creates a new, empty HAMT shard with the given fanout,
// backed by the given dagservice and using murmur3 hashing.
func NewHamtShard(dserv dag.DAGService, size int) *HamtShard {
	shard := makeHamtShard(dserv, size)
	shard.hashFunc = HashMurmur3
	shard.nd = new(dag.ProtoNode)
	shard.bitfield = big.NewInt(0)
	return shard
}
// makeHamtShard allocates a HamtShard with the size-derived fields filled
// in; callers are responsible for setting bitfield, nd and hashFunc.
func makeHamtShard(ds dag.DAGService, size int) *HamtShard {
	// the widest possible slot index, in hex, determines the prefix width
	maxpadding := fmt.Sprintf("%X", size-1)
	return &HamtShard{
		// NOTE(review): math.Log2 truncates for non-power-of-two sizes —
		// assumes size is a power of two; confirm callers guarantee this.
		tableSizeLg2: int(math.Log2(float64(size))),
		prefixPadStr: fmt.Sprintf("%%0%dX", len(maxpadding)),
		maxpadlen:    len(maxpadding),
		tableSize:    size,
		dserv:        ds,
	}
}
// NewHamtFromDag reconstructs a HamtShard from its serialized dag node.
// The node must be a protobuf unixfs node of type HAMTShard hashed with
// murmur3.
func NewHamtFromDag(dserv dag.DAGService, nd node.Node) (*HamtShard, error) {
	pbnd, ok := nd.(*dag.ProtoNode)
	if !ok {
		// Bug fix: previously returned dag.ErrLinkNotFound here, which
		// misreported a type mismatch as a missing link; use the same
		// sentinel the rest of the package uses for this condition.
		return nil, dag.ErrNotProtobuf
	}

	pbd, err := format.FromBytes(pbnd.Data())
	if err != nil {
		return nil, err
	}

	if pbd.GetType() != upb.Data_HAMTShard {
		return nil, fmt.Errorf("node was not a dir shard")
	}

	if pbd.GetHashType() != HashMurmur3 {
		return nil, fmt.Errorf("only murmur3 supported as hash function")
	}

	ds := makeHamtShard(dserv, int(pbd.GetFanout()))
	ds.nd = pbnd.Copy().(*dag.ProtoNode)
	// children start unloaded (all nil); loadChild fills entries on demand
	ds.children = make([]child, len(pbnd.Links()))
	ds.bitfield = new(big.Int).SetBytes(pbd.GetData())
	ds.hashFunc = pbd.GetHashType()

	return ds, nil
}
// Node serializes the HAMT structure into a merkledag node with unixfs formatting
func (ds *HamtShard) Node() (node.Node, error) {
	out := new(dag.ProtoNode)

	// TODO: optimized 'for each set bit'
	for i := 0; i < ds.tableSize; i++ {
		if ds.bitfield.Bit(i) == 0 {
			continue
		}

		cindex := ds.indexForBitPos(i)

		ch := ds.children[cindex]
		if ch != nil {
			// loaded child: serialize it and link it under the padded hex
			// prefix plus its label
			cnd, err := ch.Node()
			if err != nil {
				return nil, err
			}

			err = out.AddNodeLinkClean(ds.linkNamePrefix(i)+ch.Label(), cnd)
			if err != nil {
				return nil, err
			}
		} else {
			// child unloaded, just copy in link with updated name
			lnk := ds.nd.Links()[cindex]
			label := lnk.Name[ds.maxpadlen:]

			err := out.AddRawLink(ds.linkNamePrefix(i)+label, lnk)
			if err != nil {
				return nil, err
			}
		}
	}

	// unixfs envelope: type HAMTShard, fanout, hash code, and the raw
	// bitfield bytes as the Data payload
	typ := upb.Data_HAMTShard
	data, err := proto.Marshal(&upb.Data{
		Type:     &typ,
		Fanout:   proto.Uint64(uint64(ds.tableSize)),
		HashType: proto.Uint64(HashMurmur3),
		Data:     ds.bitfield.Bytes(),
	})
	if err != nil {
		return nil, err
	}

	out.SetData(data)

	// NOTE(review): Node() also persists the result into the dagservice as
	// a side effect.
	_, err = ds.dserv.Add(out)
	if err != nil {
		return nil, err
	}

	return out, nil
}
// shardValue is a leaf entry in the trie: a single named value node.
type shardValue struct {
	key string    // the entry's full name within the directory
	val node.Node // the linked-to node
}

// Node returns the stored value node; it never errors for a leaf.
func (sv *shardValue) Node() (node.Node, error) {
	return sv.val, nil
}

// Label returns the entry's name; non-empty labels distinguish leaves
// from sub-shards in serialized link names.
func (sv *shardValue) Label() string {
	return sv.key
}
// hash returns the murmur3 64-bit digest of val; its bits drive trie
// traversal.
func hash(val []byte) []byte {
	hasher := murmur3.New64()
	hasher.Write(val)
	return hasher.Sum(nil)
}
// Label for HamtShards is the empty string, this is used to differentiate them from
// value entries
func (ds *HamtShard) Label() string {
	return ""
}
// Set sets 'name' = nd in the HAMT
func (ds *HamtShard) Set(ctx context.Context, name string, nd node.Node) error {
	// traversal path is derived from the hash of the entry name
	hv := &hashBits{b: hash([]byte(name))}
	return ds.modifyValue(ctx, hv, name, nd)
}
// Remove deletes the named entry if it exists, this operation is idempotent.
func (ds *HamtShard) Remove(ctx context.Context, name string) error {
	hv := &hashBits{b: hash([]byte(name))}
	// nil value signals deletion to modifyValue
	return ds.modifyValue(ctx, hv, name, nil)
}
// Find looks up the value named 'name' in the HAMT and returns it, or
// os.ErrNotExist (via getValue) when no such entry is present.
func (ds *HamtShard) Find(ctx context.Context, name string) (node.Node, error) {
	var found node.Node
	hv := &hashBits{b: hash([]byte(name))}
	err := ds.getValue(ctx, hv, name, func(sv *shardValue) error {
		found = sv.val
		return nil
	})
	return found, err
}
// getChild returns the i'th child of this shard. If it is cached in the
// children array, it will return it from there. Otherwise, it loads the child
// node from disk.
func (ds *HamtShard) getChild(ctx context.Context, i int) (child, error) {
	if i >= len(ds.children) || i < 0 {
		return nil, fmt.Errorf("invalid index passed to getChild (likely corrupt bitfield)")
	}

	// invariant: children and the node's links are parallel arrays
	if len(ds.children) != len(ds.nd.Links()) {
		return nil, fmt.Errorf("inconsistent lengths between children array and Links array")
	}

	c := ds.children[i]
	if c != nil {
		return c, nil
	}

	return ds.loadChild(ctx, i)
}
// loadChild reads the i'th child node of this shard from disk and returns it
// as a 'child' interface
func (ds *HamtShard) loadChild(ctx context.Context, i int) (child, error) {
	lnk := ds.nd.Links()[i]
	// every link name carries at least the fixed-width hex prefix
	if len(lnk.Name) < ds.maxpadlen {
		return nil, fmt.Errorf("invalid link name '%s'", lnk.Name)
	}

	nd, err := lnk.GetNode(ctx, ds.dserv)
	if err != nil {
		return nil, err
	}

	var c child
	if len(lnk.Name) == ds.maxpadlen {
		// prefix-only name => the child is a nested shard
		pbnd, ok := nd.(*dag.ProtoNode)
		if !ok {
			return nil, dag.ErrNotProtobuf
		}

		pbd, err := format.FromBytes(pbnd.Data())
		if err != nil {
			return nil, err
		}

		if pbd.GetType() != format.THAMTShard {
			return nil, fmt.Errorf("HAMT entries must have non-zero length name")
		}

		cds, err := NewHamtFromDag(ds.dserv, nd)
		if err != nil {
			return nil, err
		}

		c = cds
	} else {
		// prefix plus a label => a leaf value; the label is the entry name
		c = &shardValue{
			key: lnk.Name[ds.maxpadlen:],
			val: nd,
		}
	}

	// cache for subsequent getChild calls
	ds.children[i] = c
	return c, nil
}
// setChild overwrites the cached child at index i.
func (ds *HamtShard) setChild(i int, c child) {
	ds.children[i] = c
}
// insertChild inserts key/val as a new leaf at bit position idx. A nil
// val signifies a delete, which cannot target an empty slot, hence
// os.ErrNotExist.
func (ds *HamtShard) insertChild(idx int, key string, val node.Node) error {
	if val == nil {
		return os.ErrNotExist
	}

	i := ds.indexForBitPos(idx)
	ds.bitfield.SetBit(ds.bitfield, idx, 1)

	sv := &shardValue{
		key: key,
		val: val,
	}

	// splice the new child (and a nil placeholder link) in at position i so
	// the children array stays parallel to the node's link list
	ds.children = append(ds.children[:i], append([]child{sv}, ds.children[i:]...)...)
	ds.nd.SetLinks(append(ds.nd.Links()[:i], append([]*node.Link{nil}, ds.nd.Links()[i:]...)...))
	return nil
}
// rmChild removes the child at index i from both the cached children
// array and the node's parallel link list.
func (ds *HamtShard) rmChild(i int) error {
	if i < 0 || i >= len(ds.children) || i >= len(ds.nd.Links()) {
		return fmt.Errorf("hamt: attempted to remove child with out of range index")
	}

	ds.children = append(ds.children[:i], ds.children[i+1:]...)
	lnks := ds.nd.Links()
	ds.nd.SetLinks(append(lnks[:i], lnks[i+1:]...))

	return nil
}
// getValue walks the trie following successive chunks of hv's bits and
// invokes cb on the matching leaf. Returns os.ErrNotExist when the path
// terminates without a matching entry.
func (ds *HamtShard) getValue(ctx context.Context, hv *hashBits, key string, cb func(*shardValue) error) error {
	idx := hv.Next(ds.tableSizeLg2)
	if ds.bitfield.Bit(int(idx)) == 1 {
		cindex := ds.indexForBitPos(idx)

		child, err := ds.getChild(ctx, cindex)
		if err != nil {
			return err
		}

		switch child := child.(type) {
		case *HamtShard:
			// descend one level, consuming the next chunk of hash bits
			return child.getValue(ctx, hv, key, cb)
		case *shardValue:
			// leaf found: only a match if the full key agrees (hash
			// prefixes can collide)
			if child.key == key {
				return cb(child)
			}
		}
	}

	return os.ErrNotExist
}
// EnumLinks walks the whole trie and returns one link per stored value,
// named by the entry key (without the hex shard prefix).
func (ds *HamtShard) EnumLinks() ([]*node.Link, error) {
	var links []*node.Link
	err := ds.walkTrie(func(sv *shardValue) error {
		lnk, err := node.MakeLink(sv.val)
		if err != nil {
			return err
		}
		lnk.Name = sv.key
		links = append(links, lnk)
		return nil
	})
	if err != nil {
		return nil, err
	}

	return links, nil
}
// walkTrie invokes cb on every leaf value in the trie, depth first in
// slot order, loading unloaded children as needed.
// NOTE(review): uses context.TODO() for child loads — no caller-supplied
// cancellation; consider plumbing a ctx parameter through.
func (ds *HamtShard) walkTrie(cb func(*shardValue) error) error {
	for i := 0; i < ds.tableSize; i++ {
		if ds.bitfield.Bit(i) == 0 {
			continue
		}

		idx := ds.indexForBitPos(i)
		// NOTE: an optimized version could simply iterate over each
		// element in the 'children' array.
		c, err := ds.getChild(context.TODO(), idx)
		if err != nil {
			return err
		}

		switch c := c.(type) {
		case *shardValue:
			err := cb(c)
			if err != nil {
				return err
			}
		case *HamtShard:
			// recurse into sub-shard
			err := c.walkTrie(cb)
			if err != nil {
				return err
			}
		default:
			return fmt.Errorf("unexpected child type: %#v", c)
		}
	}
	return nil
}
// modifyValue inserts (val != nil) or deletes (val == nil) the entry for
// 'key', consuming hash bits from hv as it descends. On delete it also
// normalizes the trie: empty sub-shards are pruned and single-value
// sub-shards are collapsed back into their parent slot.
func (ds *HamtShard) modifyValue(ctx context.Context, hv *hashBits, key string, val node.Node) error {
	idx := hv.Next(ds.tableSizeLg2)

	// empty slot: plain insert (or ErrNotExist for a delete)
	if ds.bitfield.Bit(idx) != 1 {
		return ds.insertChild(idx, key, val)
	}

	cindex := ds.indexForBitPos(idx)

	child, err := ds.getChild(ctx, cindex)
	if err != nil {
		return err
	}

	switch child := child.(type) {
	case *HamtShard:
		err := child.modifyValue(ctx, hv, key, val)
		if err != nil {
			return err
		}

		if val == nil {
			// post-delete normalization of the sub-shard
			switch len(child.children) {
			case 0:
				// empty sub-shard, prune it
				// Note: this shouldnt normally ever happen
				//       in the event of another implementation creates flawed
				//       structures, this will help to normalize them.
				ds.bitfield.SetBit(ds.bitfield, idx, 0)
				return ds.rmChild(cindex)
			case 1:
				nchild, ok := child.children[0].(*shardValue)
				if ok {
					// sub-shard with a single value element, collapse it
					ds.setChild(cindex, nchild)
				}
				return nil
			}
		}

		return nil
	case *shardValue:
		switch {
		case val == nil: // passing a nil value signifies a 'delete'
			ds.bitfield.SetBit(ds.bitfield, idx, 0)
			return ds.rmChild(cindex)

		case child.key == key: // value modification
			child.val = val
			return nil

		default: // replace value with another shard, one level deeper
			// two different keys collided at this slot: push both down into
			// a new sub-shard keyed by their next hash chunks
			ns := NewHamtShard(ds.dserv, ds.tableSize)
			chhv := &hashBits{
				b:        hash([]byte(child.key)),
				consumed: hv.consumed,
			}

			err := ns.modifyValue(ctx, hv, key, val)
			if err != nil {
				return err
			}

			err = ns.modifyValue(ctx, chhv, child.key, child.val)
			if err != nil {
				return err
			}

			ds.setChild(cindex, ns)
			return nil
		}
	default:
		return fmt.Errorf("unexpected type for child: %#v", child)
	}
}
// indexForBitPos maps bit position bp to an index into the compact
// children/links arrays, i.e. the count of set bits strictly below bp.
func (ds *HamtShard) indexForBitPos(bp int) int {
	// TODO: an optimization could reuse the same 'mask' here and change the size
	// as needed. This isnt yet done as the bitset package doesnt make it easy
	// to do.

	// mask = 2^bp - 1: keeps only the bits below position bp
	mask := new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(int64(bp)), nil), big.NewInt(1))
	mask.And(mask, ds.bitfield)

	return popCount(mask)
}
// linkNamePrefix takes in the bitfield index of an entry and returns its hex prefix
func (ds *HamtShard) linkNamePrefix(idx int) string {
	return fmt.Sprintf(ds.prefixPadStr, idx)
}

View File

@ -0,0 +1,280 @@
package hamt
import (
"bufio"
"context"
"fmt"
"math/rand"
"os"
"strconv"
"strings"
"testing"
"time"
dag "github.com/ipfs/go-ipfs/merkledag"
mdtest "github.com/ipfs/go-ipfs/merkledag/test"
ft "github.com/ipfs/go-ipfs/unixfs"
)
// getNames generates count distinct names of the form "<prefix><i>".
func getNames(prefix string, count int) []string {
	names := make([]string, count)
	for i := range names {
		names[i] = fmt.Sprintf("%s%d", prefix, i)
	}
	return names
}
// operation kinds for randomized trie testing
const (
	opAdd = iota
	opDel
	opFind
)

// testOp is one scripted operation (add/del/find) on the entry named Val.
type testOp struct {
	Op  int
	Val string
}
// stringArrToSet converts a slice of strings into a membership set.
func stringArrToSet(arr []string) map[string]bool {
	set := make(map[string]bool, len(arr))
	for _, k := range arr {
		set[k] = true
	}
	return set
}
// generate two different random sets of operations to result in the same
// ending directory (same set of entries at the end) and execute each of them
// in turn, then compare to ensure the output is the same on each.
func TestOrderConsistency(t *testing.T) {
	// seed is logged so a failing run can be reproduced
	seed := time.Now().UnixNano()
	t.Logf("using seed = %d", seed)
	ds := mdtest.Mock()

	shardWidth := 1024

	keep := getNames("good", 4000)
	temp := getNames("tempo", 6000)

	ops := genOpSet(seed, keep, temp)
	s, err := executeOpSet(t, ds, shardWidth, ops)
	if err != nil {
		t.Fatal(err)
	}

	err = validateOpSetCompletion(t, s, keep, temp)
	if err != nil {
		t.Fatal(err)
	}

	// a second, differently-ordered op set must converge to the same state
	ops2 := genOpSet(seed+1000, keep, temp)
	s2, err := executeOpSet(t, ds, shardWidth, ops2)
	if err != nil {
		t.Fatal(err)
	}

	err = validateOpSetCompletion(t, s2, keep, temp)
	if err != nil {
		t.Fatal(err)
	}

	nd, err := s.Node()
	if err != nil {
		t.Fatal(err)
	}

	nd2, err := s2.Node()
	if err != nil {
		t.Fatal(err)
	}

	// identical CIDs prove the serialized form is order-independent
	k := nd.Cid()
	k2 := nd2.Cid()

	if !k.Equals(k2) {
		t.Fatal("got different results: ", k, k2)
	}
}
// validateOpSetCompletion checks the final trie state: every 'keep' name
// must be findable and every 'temp' name must be absent.
func validateOpSetCompletion(t *testing.T, s *HamtShard, keep, temp []string) error {
	ctx := context.TODO()
	for _, n := range keep {
		_, err := s.Find(ctx, n)
		if err != nil {
			return fmt.Errorf("couldnt find %s: %s", n, err)
		}
	}

	for _, n := range temp {
		_, err := s.Find(ctx, n)
		// any result other than os.ErrNotExist (including nil) is a failure
		if err != os.ErrNotExist {
			return fmt.Errorf("expected not to find: %s", err)
		}
	}

	return nil
}
// executeOpSet runs the scripted add/del/find operations against a fresh
// shard of the given width and returns the resulting shard.
func executeOpSet(t *testing.T, ds dag.DAGService, width int, ops []testOp) (*HamtShard, error) {
	ctx := context.TODO()
	s := NewHamtShard(ds, width)
	e := ft.EmptyDirNode()
	// Bug fix: the Add error was silently dropped; every entry links to
	// this node, so failing to store it must abort the run.
	if _, err := ds.Add(e); err != nil {
		return nil, err
	}

	for _, o := range ops {
		switch o.Op {
		case opAdd:
			err := s.Set(ctx, o.Val, e)
			if err != nil {
				return nil, fmt.Errorf("inserting %s: %s", o.Val, err)
			}
		case opDel:
			err := s.Remove(ctx, o.Val)
			if err != nil {
				return nil, fmt.Errorf("deleting %s: %s", o.Val, err)
			}
		case opFind:
			_, err := s.Find(ctx, o.Val)
			if err != nil {
				return nil, fmt.Errorf("finding %s: %s", o.Val, err)
			}
		}
	}

	return s, nil
}
// genOpSet builds a randomized op sequence that adds every name in keep
// and temp, and deletes every temp name after its add, so the final state
// contains exactly the keep names. Deterministic for a given seed.
func genOpSet(seed int64, keep, temp []string) []testOp {
	tempset := stringArrToSet(temp)

	allnames := append(keep, temp...)
	shuffle(seed, allnames)

	var todel []string

	var ops []testOp

	for {
		n := len(allnames) + len(todel)
		if n == 0 {
			return ops
		}

		// randomly choose between issuing the next add or a pending delete
		rn := rand.Intn(n)

		if rn < len(allnames) {
			next := allnames[0]
			allnames = allnames[1:]
			ops = append(ops, testOp{
				Op:  opAdd,
				Val: next,
			})

			// temp names become delete candidates once added
			if tempset[next] {
				todel = append(todel, next)
			}
		} else {
			shuffle(seed+100, todel)
			next := todel[0]
			todel = todel[1:]

			ops = append(ops, testOp{
				Op:  opDel,
				Val: next,
			})
		}
	}
}
// executes the given op set with a repl to allow easier debugging
//
// Interactive commands before each op: "" (step), "find <name>",
// "run [n]" (execute n ops, -1 = all), "lookop <name>" (locate ops for a
// name), "restart" (reset the shard and replay), "print" (dump the dag).
func debugExecuteOpSet(ds dag.DAGService, width int, ops []testOp) (*HamtShard, error) {

	s := NewHamtShard(ds, width)
	e := ft.EmptyDirNode()
	ds.Add(e)
	ctx := context.TODO()

	run := 0 // number of ops to run without prompting; negative = free-run

	opnames := map[int]string{
		opAdd: "add",
		opDel: "del",
	}

mainloop:
	for i := 0; i < len(ops); i++ {
		o := ops[i]

		fmt.Printf("Op %d: %s %s\n", i, opnames[o.Op], o.Val)
		for run == 0 {
			cmd := readCommand()
			parts := strings.Split(cmd, " ")
			switch parts[0] {
			case "":
				run = 1
			case "find":
				_, err := s.Find(ctx, parts[1])
				if err == nil {
					fmt.Println("success")
				} else {
					fmt.Println(err)
				}
			case "run":
				if len(parts) > 1 {
					n, err := strconv.Atoi(parts[1])
					if err != nil {
						panic(err)
					}

					run = n
				} else {
					run = -1
				}
			case "lookop":
				for k := 0; k < len(ops); k++ {
					if ops[k].Val == parts[1] {
						fmt.Printf("  Op %d: %s %s\n", k, opnames[ops[k].Op], parts[1])
					}
				}
			case "restart":
				// rebuild from scratch and replay from op 0
				s = NewHamtShard(ds, width)
				i = -1
				continue mainloop
			case "print":
				nd, err := s.Node()
				if err != nil {
					panic(err)
				}
				printDag(ds, nd.(*dag.ProtoNode), 0)
			}
		}
		run--

		switch o.Op {
		case opAdd:
			err := s.Set(ctx, o.Val, e)
			if err != nil {
				return nil, fmt.Errorf("inserting %s: %s", o.Val, err)
			}
		case opDel:
			fmt.Println("deleting: ", o.Val)
			err := s.Remove(ctx, o.Val)
			if err != nil {
				return nil, fmt.Errorf("deleting %s: %s", o.Val, err)
			}
		case opFind:
			_, err := s.Find(ctx, o.Val)
			if err != nil {
				return nil, fmt.Errorf("finding %s: %s", o.Val, err)
			}
		}
	}

	return s, nil
}
// readCommand prompts and reads one line from stdin for the debug repl.
func readCommand() string {
	fmt.Print("> ")
	scan := bufio.NewScanner(os.Stdin)
	scan.Scan()
	return scan.Text()
}

552
unixfs/hamt/hamt_test.go Normal file
View File

@ -0,0 +1,552 @@
package hamt
import (
"context"
"fmt"
"math/rand"
"os"
"sort"
"strings"
"testing"
"time"
dag "github.com/ipfs/go-ipfs/merkledag"
mdtest "github.com/ipfs/go-ipfs/merkledag/test"
dagutils "github.com/ipfs/go-ipfs/merkledag/utils"
ft "github.com/ipfs/go-ipfs/unixfs"
)
func shuffle(seed int64, arr []string) {
r := rand.New(rand.NewSource(seed))
for i := 0; i < len(arr); i++ {
a := r.Intn(len(arr))
b := r.Intn(len(arr))
arr[a], arr[b] = arr[b], arr[a]
}
}
// makeDir builds a test shard with the default fanout of 256.
func makeDir(ds dag.DAGService, size int) ([]string, *HamtShard, error) {
	return makeDirWidth(ds, size, 256)
}
// makeDirWidth builds a shard of the given fanout containing 'size'
// entries named DIRNAME<i>, inserted in shuffled order, and returns the
// names alongside the shard.
func makeDirWidth(ds dag.DAGService, size, width int) ([]string, *HamtShard, error) {
	s := NewHamtShard(ds, width)

	var dirs []string
	for i := 0; i < size; i++ {
		dirs = append(dirs, fmt.Sprintf("DIRNAME%d", i))
	}

	shuffle(time.Now().UnixNano(), dirs)

	for i := 0; i < len(dirs); i++ {
		nd := ft.EmptyDirNode()
		// Bug fix: the Add error was silently ignored; a failed store
		// would leave dangling links in the shard.
		if _, err := ds.Add(nd); err != nil {
			return nil, nil, err
		}
		err := s.Set(context.Background(), dirs[i], nd)
		if err != nil {
			return nil, nil, err
		}
	}

	return dirs, s, nil
}
// assertLink checks presence of 'name' in the shard against the
// expectation 'found', returning an error on mismatch or lookup failure.
func assertLink(s *HamtShard, name string, found bool) error {
	_, err := s.Find(context.Background(), name)
	switch err {
	case os.ErrNotExist:
		// absent: only an error if we expected to find it
		if found {
			return err
		}

		return nil
	case nil:
		// present: only an error if we expected absence
		if found {
			return nil
		}

		return fmt.Errorf("expected not to find link named %s", name)
	default:
		return err
	}
}
// assertSerializationWorks round-trips the shard through its serialized
// dag node and verifies both forms enumerate identical links (same
// names, hashes, and sizes, in the same order).
func assertSerializationWorks(ds dag.DAGService, s *HamtShard) error {
	nd, err := s.Node()
	if err != nil {
		return err
	}

	nds, err := NewHamtFromDag(ds, nd)
	if err != nil {
		return err
	}

	linksA, err := s.EnumLinks()
	if err != nil {
		return err
	}

	linksB, err := nds.EnumLinks()
	if err != nil {
		return err
	}

	if len(linksA) != len(linksB) {
		return fmt.Errorf("links arrays are different sizes")
	}

	for i, a := range linksA {
		b := linksB[i]
		if a.Name != b.Name {
			return fmt.Errorf("links names mismatch")
		}

		if a.Cid.String() != b.Cid.String() {
			return fmt.Errorf("link hashes dont match")
		}

		if a.Size != b.Size {
			return fmt.Errorf("link sizes not the same")
		}
	}

	return nil
}
// TestBasicSet inserts 1000 entries at several fanout widths and checks
// each one can be found again.
func TestBasicSet(t *testing.T) {
	ds := mdtest.Mock()
	// all widths are powers of two, as the shard math assumes
	for _, w := range []int{128, 256, 512, 1024, 2048, 4096} {
		t.Run(fmt.Sprintf("BasicSet%d", w), func(t *testing.T) {
			names, s, err := makeDirWidth(ds, 1000, w)
			if err != nil {
				t.Fatal(err)
			}
			ctx := context.Background()

			for _, d := range names {
				_, err := s.Find(ctx, d)
				if err != nil {
					t.Fatal(err)
				}
			}
		})
	}
}
// TestDirBuilding verifies that building a 200-entry shard yields a
// deterministic, pinned root CID regardless of insertion order.
func TestDirBuilding(t *testing.T) {
	ds := mdtest.Mock()

	// Fix: removed a dead `s := NewHamtShard(ds, 256)` whose value was
	// immediately overwritten by makeDir (staticcheck SA4006).
	_, s, err := makeDir(ds, 200)
	if err != nil {
		t.Fatal(err)
	}

	nd, err := s.Node()
	if err != nil {
		t.Fatal(err)
	}

	//printDag(ds, nd, 0)

	k := nd.Cid()

	// pinned hash: any serialization change will surface here
	if k.String() != "QmY89TkSEVHykWMHDmyejSWFj9CYNtvzw4UwnT9xbc4Zjc" {
		t.Fatalf("output didnt match what we expected (got %s)", k.String())
	}
}
// TestShardReload serializes a populated shard, reloads it from its dag
// node, and checks the entries survive; it also checks a no-op
// load/serialize round trip reproduces the identical CID.
func TestShardReload(t *testing.T) {
	ds := mdtest.Mock()
	ctx := context.Background()

	// Fix: removed a dead `s := NewHamtShard(ds, 256)` whose value was
	// immediately overwritten by makeDir (staticcheck SA4006).
	_, s, err := makeDir(ds, 200)
	if err != nil {
		t.Fatal(err)
	}

	nd, err := s.Node()
	if err != nil {
		t.Fatal(err)
	}

	nds, err := NewHamtFromDag(ds, nd)
	if err != nil {
		t.Fatal(err)
	}

	lnks, err := nds.EnumLinks()
	if err != nil {
		t.Fatal(err)
	}

	if len(lnks) != 200 {
		t.Fatal("not enough links back")
	}

	_, err = nds.Find(ctx, "DIRNAME50")
	if err != nil {
		t.Fatal(err)
	}

	// Now test roundtrip marshal with no operations
	nds, err = NewHamtFromDag(ds, nd)
	if err != nil {
		t.Fatal(err)
	}

	ond, err := nds.Node()
	if err != nil {
		t.Fatal(err)
	}

	outk := ond.Cid()
	ndk := nd.Cid()

	if !outk.Equals(ndk) {
		printDiff(ds, nd.(*dag.ProtoNode), ond.(*dag.ProtoNode))
		t.Fatal("roundtrip serialization failed")
	}
}
// TestRemoveElems deletes every entry from a 500-entry shard in random
// order, checks the result has no links, and verifies removing a missing
// name yields os.ErrNotExist.
func TestRemoveElems(t *testing.T) {
	ds := mdtest.Mock()
	dirs, s, err := makeDir(ds, 500)
	if err != nil {
		t.Fatal(err)
	}
	ctx := context.Background()

	shuffle(time.Now().UnixNano(), dirs)

	for _, d := range dirs {
		err := s.Remove(ctx, d)
		if err != nil {
			t.Fatal(err)
		}
	}

	nd, err := s.Node()
	if err != nil {
		t.Fatal(err)
	}

	if len(nd.Links()) > 0 {
		t.Fatal("shouldnt have any links here")
	}

	err = s.Remove(ctx, "doesnt exist")
	if err != os.ErrNotExist {
		t.Fatal("expected error does not exist")
	}
}
// TestSetAfterMarshal reloads a 300-entry shard from its serialized node,
// inserts 100 more entries into the reloaded copy, and verifies the
// combined 400 links enumerate and re-serialize correctly.
func TestSetAfterMarshal(t *testing.T) {
	ds := mdtest.Mock()
	_, s, err := makeDir(ds, 300)
	if err != nil {
		t.Fatal(err)
	}
	ctx := context.Background()

	nd, err := s.Node()
	if err != nil {
		t.Fatal(err)
	}

	nds, err := NewHamtFromDag(ds, nd)
	if err != nil {
		t.Fatal(err)
	}

	empty := ft.EmptyDirNode()
	for i := 0; i < 100; i++ {
		err := nds.Set(ctx, fmt.Sprintf("moredirs%d", i), empty)
		if err != nil {
			t.Fatal(err)
		}
	}

	links, err := nds.EnumLinks()
	if err != nil {
		t.Fatal(err)
	}

	if len(links) != 400 {
		t.Fatal("expected 400 links")
	}

	err = assertSerializationWorks(ds, nds)
	if err != nil {
		t.Fatal(err)
	}
}
// TestDuplicateAddShard checks that setting the same name twice is an
// update, not a duplicate insert: exactly one link must remain.
func TestDuplicateAddShard(t *testing.T) {
	ds := mdtest.Mock()
	dir := NewHamtShard(ds, 256)
	nd := new(dag.ProtoNode)
	ctx := context.Background()

	err := dir.Set(ctx, "test", nd)
	if err != nil {
		t.Fatal(err)
	}

	err = dir.Set(ctx, "test", nd)
	if err != nil {
		t.Fatal(err)
	}

	lnks, err := dir.EnumLinks()
	if err != nil {
		t.Fatal(err)
	}

	if len(lnks) != 1 {
		t.Fatal("expected only one link")
	}
}
// TestLoadFailsFromNonShard verifies NewHamtFromDag rejects nodes that are
// not HAMT shards: a plain unixfs directory and a raw empty protobuf node.
func TestLoadFailsFromNonShard(t *testing.T) {
	dserv := mdtest.Mock()

	if _, err := NewHamtFromDag(dserv, ft.EmptyDirNode()); err == nil {
		t.Fatal("expected dir shard creation to fail when given normal directory")
	}

	if _, err := NewHamtFromDag(dserv, new(dag.ProtoNode)); err == nil {
		t.Fatal("expected dir shard creation to fail when given normal directory")
	}
}
// TestFindNonExisting checks that Find reports os.ErrNotExist for names that
// were never inserted into the shard.
func TestFindNonExisting(t *testing.T) {
	dserv := mdtest.Mock()
	_, sh, err := makeDir(dserv, 100)
	if err != nil {
		t.Fatal(err)
	}
	ctx := context.Background()

	for i := 0; i < 200; i++ {
		if _, err := sh.Find(ctx, fmt.Sprintf("notfound%d", i)); err != os.ErrNotExist {
			t.Fatal("expected ErrNotExist")
		}
	}
}
// TestRemoveElemsAfterMarshal removes one entry, round-trips the shard
// through its serialized dag node, and verifies that the removal persisted,
// that the remaining entries survive, and that draining the reloaded shard
// leaves it empty and still serializable.
func TestRemoveElemsAfterMarshal(t *testing.T) {
	ds := mdtest.Mock()
	dirs, s, err := makeDir(ds, 30)
	if err != nil {
		t.Fatal(err)
	}
	ctx := context.Background()

	// Sort so dirs[0] is deterministic across runs.
	sort.Strings(dirs)

	err = s.Remove(ctx, dirs[0])
	if err != nil {
		t.Fatal(err)
	}

	// The removed entry must no longer be findable in the live shard.
	out, err := s.Find(ctx, dirs[0])
	if err == nil {
		t.Fatal("expected error, got: ", out)
	}

	nd, err := s.Node()
	if err != nil {
		t.Fatal(err)
	}

	nds, err := NewHamtFromDag(ds, nd)
	if err != nil {
		t.Fatal(err)
	}

	// The removal must survive the marshal/unmarshal round trip.
	_, err = nds.Find(ctx, dirs[0])
	if err == nil {
		t.Fatal("expected not to find ", dirs[0])
	}

	// Every other entry must still be present after reloading.
	for _, d := range dirs[1:] {
		_, err := nds.Find(ctx, d)
		if err != nil {
			t.Fatal("could not find expected link after unmarshaling")
		}
	}

	// Drain the reloaded shard completely.
	for _, d := range dirs[1:] {
		err := nds.Remove(ctx, d)
		if err != nil {
			t.Fatal(err)
		}
	}

	links, err := nds.EnumLinks()
	if err != nil {
		t.Fatal(err)
	}
	if len(links) != 0 {
		t.Fatal("expected all links to be removed")
	}

	err = assertSerializationWorks(ds, nds)
	if err != nil {
		t.Fatal(err)
	}
}
// TestBitfieldIndexing verifies indexForBitPos: a bit position maps to the
// number of set bits below it in the shard's bitfield, which converts a
// sparse bit position into a dense child-array index.
func TestBitfieldIndexing(t *testing.T) {
	ds := mdtest.Mock()
	s := NewHamtShard(ds, 256)

	// set marks bit i in the shard's bitfield directly.
	set := func(i int) {
		s.bitfield.SetBit(s.bitfield, i, 1)
	}

	// assert checks that bit position i maps to child index val.
	assert := func(i int, val int) {
		if s.indexForBitPos(i) != val {
			t.Fatalf("expected index %d to be %d", i, val)
		}
	}

	// No bits set yet: everything indexes to 0.
	assert(50, 0)
	set(4)
	set(5)
	set(60)

	// Bits {4,5} lie below position 10.
	assert(10, 2)
	set(3)
	assert(10, 3)

	assert(1, 0)
	assert(100, 4)
	set(50)
	assert(45, 3)
	set(100)
	assert(100, 5)
}
// test adding a sharded directory node as the child of another directory node.
// if improperly implemented, the parent hamt may assume the child is a part of
// itself.
func TestSetHamtChild(t *testing.T) {
	ds := mdtest.Mock()
	s := NewHamtShard(ds, 256)
	ctx := context.Background()

	// Build a small shard containing a single "bar" entry.
	e := ft.EmptyDirNode()
	ds.Add(e)

	err := s.Set(ctx, "bar", e)
	if err != nil {
		t.Fatal(err)
	}

	snd, err := s.Node()
	if err != nil {
		t.Fatal(err)
	}

	// Insert that shard's serialized node into a second, larger shard
	// under the name "foo".
	_, ns, err := makeDir(ds, 50)
	if err != nil {
		t.Fatal(err)
	}

	err = ns.Set(ctx, "foo", snd)
	if err != nil {
		t.Fatal(err)
	}

	nsnd, err := ns.Node()
	if err != nil {
		t.Fatal(err)
	}

	hs, err := NewHamtFromDag(ds, nsnd)
	if err != nil {
		t.Fatal(err)
	}

	// The child shard's own entry ("bar") must not appear in the parent;
	// the child itself ("foo") must.
	err = assertLink(hs, "bar", false)
	if err != nil {
		t.Fatal(err)
	}

	err = assertLink(hs, "foo", true)
	if err != nil {
		t.Fatal(err)
	}
}
// printDag is a debugging helper that dumps the dag rooted at nd to stdout,
// recursing into every child and indenting one space per level. It panics if
// a child cannot be fetched.
func printDag(ds dag.DAGService, nd *dag.ProtoNode, depth int) {
	indent := strings.Repeat(" ", depth)
	fmt.Println("{")
	for _, lnk := range nd.Links() {
		fmt.Printf("%s%s: %s", indent, lnk.Name, lnk.Cid.String())
		child, err := ds.Get(context.Background(), lnk.Cid)
		if err != nil {
			panic(err)
		}
		printDag(ds, child.(*dag.ProtoNode), depth+1)
	}
	fmt.Println(indent + "}")
}
// printDiff is a debugging helper that prints the dagutils diff between two
// nodes, panicking if the diff cannot be computed.
func printDiff(ds dag.DAGService, a, b *dag.ProtoNode) {
	changes, err := dagutils.Diff(context.TODO(), ds, a, b)
	if err != nil {
		panic(err)
	}
	for _, change := range changes {
		fmt.Println(change)
	}
}
// BenchmarkHAMTSet measures the cost of reloading a shard from its dag node
// and inserting one new entry into it, per iteration.
//
// Fixes over the previous version: the error from the setup ds.Add of the
// empty dir node is now checked instead of silently ignored, and the timer
// is reset after setup so setup cost is excluded from the measurement.
func BenchmarkHAMTSet(b *testing.B) {
	ds := mdtest.Mock()
	sh := NewHamtShard(ds, 256)
	nd, err := sh.Node()
	if err != nil {
		b.Fatal(err)
	}

	_, err = ds.Add(nd)
	if err != nil {
		b.Fatal(err)
	}

	// Pre-store the empty dir node used as the child of every entry.
	if _, err := ds.Add(ft.EmptyDirNode()); err != nil {
		b.Fatal(err)
	}

	// Exclude setup from the timed region.
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		s, err := NewHamtFromDag(ds, nd)
		if err != nil {
			b.Fatal(err)
		}

		err = s.Set(context.TODO(), fmt.Sprint(i), ft.EmptyDirNode())
		if err != nil {
			b.Fatal(err)
		}

		out, err := s.Node()
		if err != nil {
			b.Fatal(err)
		}

		// Each iteration extends the shard produced by the previous one.
		nd = out
	}
}

61
unixfs/hamt/util.go Normal file
View File

@ -0,0 +1,61 @@
package hamt
import (
"math/big"
)
// hashBits is a helper for consuming a hash digest a few bits at a time.
type hashBits struct {
	b        []byte // the digest being consumed
	consumed int    // number of bits of b consumed so far
}

// mkmask returns a byte with the low n bits set (n in [0,8]; for n == 8 the
// byte shift wraps to 0 and the subtraction yields 0xFF, which is correct).
func mkmask(n int) byte {
	return (1 << uint(n)) - 1
}

// Next returns the next i bits of the digest as an int, consuming them.
// NOTE(review): requesting more bits than remain in the digest panics with
// an index-out-of-range error — callers must stay within len(b)*8 bits.
func (hb *hashBits) Next(i int) int {
	byteIdx := hb.consumed / 8
	unread := 8 - (hb.consumed % 8)
	cur := hb.b[byteIdx]

	if i > unread {
		// Take every unread bit of the current byte as the high bits of
		// the result, then recurse into the following byte(s) for the rest.
		high := int(cur & mkmask(unread))
		hb.consumed += unread
		return (high << uint(i-unread)) + hb.Next(i-unread)
	}

	// i <= unread: the request is satisfied within the current byte. Mask
	// off the already-consumed high bits, then shift away the low bits we
	// are not taking yet.
	hb.consumed += i
	return int((cur & mkmask(unread)) >> uint(unread-i))
}
// Magic constants for the SWAR popcount below.
const (
	m1  = 0x5555555555555555 //binary: 0101...
	m2  = 0x3333333333333333 //binary: 00110011..
	m4  = 0x0f0f0f0f0f0f0f0f //binary: 4 zeros, 4 ones ...
	h01 = 0x0101010101010101 //the sum of 256 to the power of 0,1,2,3...
)

// popCountUint64 returns the number of set bits in x.
// from https://en.wikipedia.org/wiki/Hamming_weight
func popCountUint64(x uint64) int {
	v := x - ((x >> 1) & m1)       // count of each 2 bits into those 2 bits
	v = (v & m2) + ((v >> 2) & m2) // count of each 4 bits into those 4 bits
	v = (v + (v >> 4)) & m4        // count of each 8 bits into those 8 bits
	return int((v * h01) >> 56)
}

// popCount returns the number of set bits in i, summing the popcount of
// each machine word of its internal representation.
func popCount(i *big.Int) int {
	total := 0
	for _, w := range i.Bits() {
		total += popCountUint64(uint64(w))
	}
	return total
}

58
unixfs/hamt/util_test.go Normal file
View File

@ -0,0 +1,58 @@
package hamt
import (
"math/big"
"testing"
)
// TestPopCount checks popCount against a big.Int with exactly 50 bits set.
func TestPopCount(t *testing.T) {
	v := big.NewInt(0)
	for bit := 0; bit < 50; bit++ {
		v.SetBit(v, bit, 1)
	}
	if popCount(v) != 50 {
		t.Fatal("expected popcount to be 50")
	}
}
// TestHashBitsEvenSizes reads the digest back one whole byte at a time and
// expects every value to round-trip unchanged.
func TestHashBitsEvenSizes(t *testing.T) {
	data := []byte{255, 127, 79, 45, 116, 99, 35, 17}
	hb := hashBits{b: data}
	for _, want := range data {
		if hb.Next(8) != int(want) {
			t.Fatal("got wrong numbers back")
		}
	}
}
// TestHashBitsUneven consumes the digest in chunk sizes that straddle byte
// boundaries and checks each extracted value.
func TestHashBitsUneven(t *testing.T) {
	data := []byte{255, 127, 79, 45, 116, 99, 35, 17}
	hb := hashBits{b: data}

	if v := hb.Next(4); v != 15 {
		t.Fatal("should have gotten 15: ", v)
	}
	if v := hb.Next(4); v != 15 {
		t.Fatal("should have gotten 15: ", v)
	}

	if v := hb.Next(3); v != 3 {
		t.Fatalf("expected 3, but got %b", v)
	}
	if v := hb.Next(3); v != 7 {
		t.Fatalf("expected 7, but got %b", v)
	}
	if v := hb.Next(3); v != 6 {
		t.Fatalf("expected 6, but got %b", v)
	}
	if v := hb.Next(15); v != 20269 {
		t.Fatalf("expected 20269, but got %b (%d)", v, v)
	}
}

View File

@ -2,48 +2,144 @@ package io
import (
"context"
"fmt"
"os"
mdag "github.com/ipfs/go-ipfs/merkledag"
format "github.com/ipfs/go-ipfs/unixfs"
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
hamt "github.com/ipfs/go-ipfs/unixfs/hamt"
node "gx/ipfs/QmYDscK7dmdo2GZ9aumS8s5auUUAH5mR1jvj5pYhWusfK7/go-ipld-node"
)
type directoryBuilder struct {
// ShardSplitThreshold specifies how large of an unsharded directory
// the Directory code will generate. Adding entries over this value will
// result in the node being restructured into a sharded object.
var ShardSplitThreshold = 1000
// DefaultShardWidth is the default value used for hamt sharding width.
var DefaultShardWidth = 256
type Directory struct {
dserv mdag.DAGService
dirnode *mdag.ProtoNode
shard *hamt.HamtShard
}
// NewEmptyDirectory returns an empty merkledag Node with a folder Data chunk
func NewEmptyDirectory() *mdag.ProtoNode {
nd := new(mdag.ProtoNode)
nd.SetData(format.FolderPBData())
return nd
}
// NewDirectory returns a directoryBuilder. It needs a DAGService to add the Children
func NewDirectory(dserv mdag.DAGService) *directoryBuilder {
db := new(directoryBuilder)
// NewDirectory returns a Directory. It needs a DAGService to add the Children
func NewDirectory(dserv mdag.DAGService) *Directory {
db := new(Directory)
db.dserv = dserv
db.dirnode = NewEmptyDirectory()
db.dirnode = format.EmptyDirNode()
return db
}
// AddChild adds a (name, key)-pair to the root node.
func (d *directoryBuilder) AddChild(ctx context.Context, name string, c *cid.Cid) error {
cnode, err := d.dserv.Get(ctx, c)
if err != nil {
return err
}
cnpb, ok := cnode.(*mdag.ProtoNode)
func NewDirectoryFromNode(dserv mdag.DAGService, nd node.Node) (*Directory, error) {
pbnd, ok := nd.(*mdag.ProtoNode)
if !ok {
return mdag.ErrNotProtobuf
return nil, mdag.ErrNotProtobuf
}
return d.dirnode.AddNodeLinkClean(name, cnpb)
pbd, err := format.FromBytes(pbnd.Data())
if err != nil {
return nil, err
}
switch pbd.GetType() {
case format.TDirectory:
return &Directory{
dserv: dserv,
dirnode: pbnd.Copy().(*mdag.ProtoNode),
}, nil
case format.THAMTShard:
shard, err := hamt.NewHamtFromDag(dserv, nd)
if err != nil {
return nil, err
}
return &Directory{
dserv: dserv,
shard: shard,
}, nil
default:
return nil, fmt.Errorf("merkledag node was not a directory or shard")
}
}
// GetNode returns the root of this directoryBuilder
func (d *directoryBuilder) GetNode() *mdag.ProtoNode {
return d.dirnode
// AddChild adds the node nd under the given name, replacing any existing
// entry with that name. Once the flat directory already holds
// ShardSplitThreshold links, the directory is converted to a HAMT shard
// before the new entry is inserted.
func (d *Directory) AddChild(ctx context.Context, name string, nd node.Node) error {
	if d.shard == nil {
		if len(d.dirnode.Links()) < ShardSplitThreshold {
			// Drop any old entry first; the error is deliberately
			// ignored since the name may simply not exist yet.
			_ = d.dirnode.RemoveNodeLink(name)
			return d.dirnode.AddNodeLinkClean(name, nd)
		}

		err := d.switchToSharding(ctx)
		if err != nil {
			return err
		}
	}

	return d.shard.Set(ctx, name, nd)
}
// switchToSharding converts the flat dirnode representation into a HAMT
// shard of DefaultShardWidth, copying every existing link into the new
// shard. On success the flat node is dropped and d.shard becomes the sole
// representation of the directory.
func (d *Directory) switchToSharding(ctx context.Context) error {
	d.shard = hamt.NewHamtShard(d.dserv, DefaultShardWidth)

	for _, lnk := range d.dirnode.Links() {
		// Fetch each child so the shard can store the full node.
		cnd, err := d.dserv.Get(ctx, lnk.Cid)
		if err != nil {
			return err
		}

		err = d.shard.Set(ctx, lnk.Name, cnd)
		if err != nil {
			return err
		}
	}

	d.dirnode = nil
	return nil
}
// Links returns all entries of the directory, regardless of whether it is
// currently stored flat or as a HAMT shard.
func (d *Directory) Links() ([]*node.Link, error) {
	if d.shard != nil {
		return d.shard.EnumLinks()
	}
	return d.dirnode.Links(), nil
}
// Find looks up the child with the given name and fetches its node.
// A missing entry is reported as os.ErrNotExist.
func (d *Directory) Find(ctx context.Context, name string) (node.Node, error) {
	if d.shard != nil {
		return d.shard.Find(ctx, name)
	}

	lnk, err := d.dirnode.GetNodeLink(name)
	if err == mdag.ErrLinkNotFound {
		// Normalize the dag-layer sentinel to the os-level one.
		return nil, os.ErrNotExist
	}
	if err != nil {
		return nil, err
	}

	return d.dserv.Get(ctx, lnk.Cid)
}
// RemoveChild removes the entry with the given name from the directory.
func (d *Directory) RemoveChild(ctx context.Context, name string) error {
	if d.shard != nil {
		return d.shard.Remove(ctx, name)
	}
	return d.dirnode.RemoveNodeLink(name)
}
// GetNode returns the root dag node of this Directory, serializing the
// shard representation if one is in use.
func (d *Directory) GetNode() (node.Node, error) {
	if d.shard != nil {
		return d.shard.Node()
	}
	return d.dirnode, nil
}

View File

@ -2,49 +2,157 @@ package io
import (
"context"
"io/ioutil"
"fmt"
"testing"
testu "github.com/ipfs/go-ipfs/unixfs/test"
mdtest "github.com/ipfs/go-ipfs/merkledag/test"
ft "github.com/ipfs/go-ipfs/unixfs"
)
func TestEmptyNode(t *testing.T) {
n := NewEmptyDirectory()
n := ft.EmptyDirNode()
if len(n.Links()) != 0 {
t.Fatal("empty node should have 0 links")
}
}
func TestDirBuilder(t *testing.T) {
dserv := testu.GetDAGServ()
ctx, closer := context.WithCancel(context.Background())
defer closer()
inbuf, node := testu.GetRandomNode(t, dserv, 1024)
key := node.Cid()
func TestDirectoryGrowth(t *testing.T) {
ds := mdtest.Mock()
dir := NewDirectory(ds)
ctx := context.Background()
b := NewDirectory(dserv)
d := ft.EmptyDirNode()
ds.Add(d)
b.AddChild(ctx, "random", key)
nelems := 10000
dir := b.GetNode()
outn, err := dir.GetLinkedProtoNode(ctx, dserv, "random")
for i := 0; i < nelems; i++ {
err := dir.AddChild(ctx, fmt.Sprintf("dir%d", i), d)
if err != nil {
t.Fatal(err)
}
}
_, err := dir.GetNode()
if err != nil {
t.Fatal(err)
}
reader, err := NewDagReader(ctx, outn, dserv)
links, err := dir.Links()
if err != nil {
t.Fatal(err)
}
outbuf, err := ioutil.ReadAll(reader)
if err != nil {
t.Fatal(err)
if len(links) != nelems {
t.Fatal("didnt get right number of elements")
}
err = testu.ArrComp(inbuf, outbuf)
if err != nil {
t.Fatal(err)
dirc := d.Cid()
names := make(map[string]bool)
for _, l := range links {
names[l.Name] = true
if !l.Cid.Equals(dirc) {
t.Fatal("link wasnt correct")
}
}
for i := 0; i < nelems; i++ {
dn := fmt.Sprintf("dir%d", i)
if !names[dn] {
t.Fatal("didnt find directory: ", dn)
}
_, err := dir.Find(context.Background(), dn)
if err != nil {
t.Fatal(err)
}
}
}
// TestDuplicateAddDir verifies that adding the same name to a Directory
// twice leaves exactly one link.
func TestDuplicateAddDir(t *testing.T) {
	dserv := mdtest.Mock()
	dir := NewDirectory(dserv)
	ctx := context.Background()
	child := ft.EmptyDirNode()

	if err := dir.AddChild(ctx, "test", child); err != nil {
		t.Fatal(err)
	}
	if err := dir.AddChild(ctx, "test", child); err != nil {
		t.Fatal(err)
	}

	lnks, err := dir.Links()
	if err != nil {
		t.Fatal(err)
	}
	if len(lnks) != 1 {
		t.Fatal("expected only one link")
	}
}
// TestDirBuilder adds 5000 entries to a Directory — well past
// ShardSplitThreshold, so the directory converts to a HAMT shard — then
// reloads it from its dag node and verifies every entry is still present.
func TestDirBuilder(t *testing.T) {
	ds := mdtest.Mock()
	dir := NewDirectory(ds)
	ctx := context.Background()

	child := ft.EmptyDirNode()
	_, err := ds.Add(child)
	if err != nil {
		t.Fatal(err)
	}

	count := 5000

	for i := 0; i < count; i++ {
		err := dir.AddChild(ctx, fmt.Sprintf("entry %d", i), child)
		if err != nil {
			t.Fatal(err)
		}
	}

	dirnd, err := dir.GetNode()
	if err != nil {
		t.Fatal(err)
	}

	links, err := dir.Links()
	if err != nil {
		t.Fatal(err)
	}

	if len(links) != count {
		t.Fatal("not enough links dawg", len(links), count)
	}

	// Reload the directory from its serialized dag node.
	adir, err := NewDirectoryFromNode(ds, dirnd)
	if err != nil {
		t.Fatal(err)
	}

	links, err = adir.Links()
	if err != nil {
		t.Fatal(err)
	}

	// Collect the names present after the round trip.
	names := make(map[string]bool)
	for _, lnk := range links {
		names[lnk.Name] = true
	}

	for i := 0; i < count; i++ {
		n := fmt.Sprintf("entry %d", i)
		if !names[n] {
			t.Fatal("COULDNT FIND: ", n)
		}
	}

	if len(links) != count {
		t.Fatal("wrong number of links", len(links), count)
	}
}

View File

@ -5,26 +5,21 @@ import (
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
hamt "github.com/ipfs/go-ipfs/unixfs/hamt"
node "gx/ipfs/QmYDscK7dmdo2GZ9aumS8s5auUUAH5mR1jvj5pYhWusfK7/go-ipld-node"
)
func ResolveUnixfsOnce(ctx context.Context, ds dag.DAGService, nd node.Node, name string) (*node.Link, error) {
pbnd, ok := nd.(*dag.ProtoNode)
if !ok {
lnk, _, err := nd.ResolveLink([]string{name})
return lnk, err
}
switch nd := nd.(type) {
case *dag.ProtoNode:
upb, err := ft.FromBytes(nd.Data())
if err != nil {
// Not a unixfs node, use standard object traversal code
return nd.GetNodeLink(name)
}
upb, err := ft.FromBytes(pbnd.Data())
if err != nil {
// Not a unixfs node, use standard object traversal code
lnk, _, err := nd.ResolveLink([]string{name})
return lnk, err
}
switch upb.GetType() {
/*
switch upb.GetType() {
case ft.THAMTShard:
s, err := hamt.NewHamtFromDag(ds, nd)
if err != nil {
@ -37,10 +32,15 @@ func ResolveUnixfsOnce(ctx context.Context, ds dag.DAGService, nd node.Node, nam
return nil, err
}
return dag.MakeLink(out)
*/
return node.MakeLink(out)
default:
return nd.GetNodeLink(name)
}
default:
lnk, _, err := nd.ResolveLink([]string{name})
return lnk, err
if err != nil {
return nil, err
}
return lnk, nil
}
}

View File

@ -31,6 +31,7 @@ const (
Data_File Data_DataType = 2
Data_Metadata Data_DataType = 3
Data_Symlink Data_DataType = 4
Data_HAMTShard Data_DataType = 5
)
var Data_DataType_name = map[int32]string{
@ -39,6 +40,7 @@ var Data_DataType_name = map[int32]string{
2: "File",
3: "Metadata",
4: "Symlink",
5: "HAMTShard",
}
var Data_DataType_value = map[string]int32{
"Raw": 0,
@ -46,6 +48,7 @@ var Data_DataType_value = map[string]int32{
"File": 2,
"Metadata": 3,
"Symlink": 4,
"HAMTShard": 5,
}
func (x Data_DataType) Enum() *Data_DataType {
@ -70,6 +73,8 @@ type Data struct {
Data []byte `protobuf:"bytes,2,opt,name=Data" json:"Data,omitempty"`
Filesize *uint64 `protobuf:"varint,3,opt,name=filesize" json:"filesize,omitempty"`
Blocksizes []uint64 `protobuf:"varint,4,rep,name=blocksizes" json:"blocksizes,omitempty"`
HashType *uint64 `protobuf:"varint,5,opt,name=hashType" json:"hashType,omitempty"`
Fanout *uint64 `protobuf:"varint,6,opt,name=fanout" json:"fanout,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -105,6 +110,20 @@ func (m *Data) GetBlocksizes() []uint64 {
return nil
}
// GetHashType returns the HashType field, or 0 when the message or field is
// nil. NOTE(review): generated protobuf accessor — edits here will be lost
// on regeneration.
func (m *Data) GetHashType() uint64 {
	if m != nil && m.HashType != nil {
		return *m.HashType
	}
	return 0
}
// GetFanout returns the Fanout field, or 0 when the message or field is
// nil. NOTE(review): generated protobuf accessor — edits here will be lost
// on regeneration.
func (m *Data) GetFanout() uint64 {
	if m != nil && m.Fanout != nil {
		return *m.Fanout
	}
	return 0
}
type Metadata struct {
MimeType *string `protobuf:"bytes,1,opt,name=MimeType" json:"MimeType,omitempty"`
XXX_unrecognized []byte `json:"-"`

View File

@ -7,12 +7,16 @@ message Data {
File = 2;
Metadata = 3;
Symlink = 4;
HAMTShard = 5;
}
required DataType Type = 1;
optional bytes Data = 2;
optional uint64 filesize = 3;
repeated uint64 blocksizes = 4;
optional uint64 hashType = 5;
optional uint64 fanout = 6;
}
message Metadata {