From bb09ffd756f17876589baf76a008f3c5fa31beca Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 4 Aug 2016 12:45:00 -0700 Subject: [PATCH] implement an HAMT for unixfs directory sharding License: MIT Signed-off-by: Jeromy --- assets/assets.go | 14 +- commands/request.go | 2 +- core/commands/files/files.go | 19 +- core/coreunix/add.go | 7 +- fuse/ipns/ipns_unix.go | 3 +- mfs/dir.go | 116 ++++--- mfs/mfs_test.go | 125 +++++++ mfs/system.go | 7 +- package.json | 6 + test/sharness/t0250-files-api.sh | 28 ++ unixfs/format.go | 1 + unixfs/hamt/hamt.go | 464 ++++++++++++++++++++++++++ unixfs/hamt/hamt_stress_test.go | 280 ++++++++++++++++ unixfs/hamt/hamt_test.go | 552 +++++++++++++++++++++++++++++++ unixfs/hamt/util.go | 61 ++++ unixfs/hamt/util_test.go | 58 ++++ unixfs/io/dirbuilder.go | 148 +++++++-- unixfs/io/dirbuilder_test.go | 148 +++++++-- unixfs/io/resolve.go | 34 +- unixfs/pb/unixfs.pb.go | 19 ++ unixfs/pb/unixfs.proto | 4 + 21 files changed, 1968 insertions(+), 128 deletions(-) create mode 100644 unixfs/hamt/hamt.go create mode 100644 unixfs/hamt/hamt_stress_test.go create mode 100644 unixfs/hamt/hamt_test.go create mode 100644 unixfs/hamt/util.go create mode 100644 unixfs/hamt/util_test.go diff --git a/assets/assets.go b/assets/assets.go index 562a54420..092585db2 100644 --- a/assets/assets.go +++ b/assets/assets.go @@ -59,17 +59,27 @@ func addAssetList(nd *core.IpfsNode, l []string) (*cid.Cid, error) { } fname := filepath.Base(p) + c, err := cid.Decode(s) if err != nil { return nil, err } - if err := dirb.AddChild(nd.Context(), fname, c); err != nil { + node, err := nd.DAG.Get(nd.Context(), c) + if err != nil { + return nil, err + } + + if err := dirb.AddChild(nd.Context(), fname, node); err != nil { return nil, fmt.Errorf("assets: could not add '%s' as a child: %s", fname, err) } } - dir := dirb.GetNode() + dir, err := dirb.GetNode() + if err != nil { + return nil, err + } + dcid, err := nd.DAG.Add(dir) if err != nil { return nil, fmt.Errorf("assets: DAG.Add(dir) failed: %s", err) diff --git a/commands/request.go b/commands/request.go index 7cb78ca3e..7020ea078 100644 --- a/commands/request.go +++ b/commands/request.go @@ -2,6 +2,7 @@ package commands import ( "bufio" + "context" "errors" "fmt" "io" @@ -10,7 +11,6 @@ import ( "strconv" "time" - context "context" "github.com/ipfs/go-ipfs/commands/files" "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/repo/config" diff --git a/core/commands/files/files.go b/core/commands/files/files.go index 0d69f9016..86ab3f3f3 100644 --- a/core/commands/files/files.go +++ b/core/commands/files/files.go @@ -265,16 +265,7 @@ func getNodeFromPath(ctx context.Context, node *core.IpfsNode, p string) (node.N ResolveOnce: uio.ResolveUnixfsOnce, } - nd, err := core.Resolve(ctx, node.Namesys, resolver, np) - if err != nil { - return nil, err - } - pbnd, ok := nd.(*dag.ProtoNode) - if !ok { - return nil, dag.ErrNotProtobuf - } - - return pbnd, nil + return core.Resolve(ctx, node.Namesys, resolver, np) default: fsn, err := mfs.Lookup(node.FilesRoot, p) if err != nil { @@ -357,7 +348,13 @@ Examples: case *mfs.Directory: if !long { var output []mfs.NodeListing - for _, name := range fsn.ListNames() { + names, err := fsn.ListNames() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + for _, name := range names { output = append(output, mfs.NodeListing{ Name: name, }) diff --git a/core/coreunix/add.go b/core/coreunix/add.go index 322d1b5ca..4ace57742 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -228,7 +228,12 @@ func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error { case *mfs.File: return nil case *mfs.Directory: - for _, name := range fsn.ListNames() { + names, err := fsn.ListNames() + if err != nil { + return err + } + + for _, name := range names { child, err := fsn.Child(name) if err != nil { return err diff --git a/fuse/ipns/ipns_unix.go b/fuse/ipns/ipns_unix.go index 77ca6539d..96badb230 100644 --- a/fuse/ipns/ipns_unix.go +++ b/fuse/ipns/ipns_unix.go @@ -16,7 +16,6 @@ import ( namesys "github.com/ipfs/go-ipfs/namesys" path "github.com/ipfs/go-ipfs/path" ft "github.com/ipfs/go-ipfs/unixfs" - uio "github.com/ipfs/go-ipfs/unixfs/io" ci "gx/ipfs/QmPGxZ1DP2w45WcogpW1h43BvseXbfke9N91qotpoQcUeS/go-libp2p-crypto" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" @@ -100,7 +99,7 @@ func loadRoot(ctx context.Context, rt *keyRoot, ipfs *core.IpfsNode, name string switch err { case nil: case namesys.ErrResolveFailed: - node = uio.NewEmptyDirectory() + node = ft.EmptyDirNode() default: log.Errorf("looking up %s: %s", p, err) return nil, err diff --git a/mfs/dir.go b/mfs/dir.go index f1a61eefa..51a26cf13 100644 --- a/mfs/dir.go +++ b/mfs/dir.go @@ -12,6 +12,7 @@ import ( dag "github.com/ipfs/go-ipfs/merkledag" ft "github.com/ipfs/go-ipfs/unixfs" + uio "github.com/ipfs/go-ipfs/unixfs/io" ufspb "github.com/ipfs/go-ipfs/unixfs/pb" node "gx/ipfs/QmYDscK7dmdo2GZ9aumS8s5auUUAH5mR1jvj5pYhWusfK7/go-ipld-node" @@ -29,25 +30,31 @@ type Directory struct { files map[string]*File lock sync.Mutex - node *dag.ProtoNode ctx context.Context + dirbuilder *uio.Directory + modTime time.Time name string } -func NewDirectory(ctx context.Context, name string, node *dag.ProtoNode, parent childCloser, dserv dag.DAGService) *Directory { - return &Directory{ - dserv: dserv, - ctx: ctx, - name: name, - node: node, - parent: parent, - childDirs: make(map[string]*Directory), - files: make(map[string]*File), - modTime: time.Now(), +func NewDirectory(ctx context.Context, name string, node node.Node, parent childCloser, dserv dag.DAGService) (*Directory, error) { + db, err := uio.NewDirectoryFromNode(dserv, node) + if err != nil { + return nil, err } + + return &Directory{ + dserv: dserv, + ctx: ctx, + name: name, + dirbuilder: db, + parent: parent, + childDirs: make(map[string]*Directory), + files: make(map[string]*File), + modTime: time.Now(), + }, nil } // closeChild updates the child by the given name to the dag node 'nd' @@ -81,21 +88,26 @@ func (d *Directory) closeChildUpdate(name string, nd *dag.ProtoNode, sync bool) } func (d *Directory) flushCurrentNode() (*dag.ProtoNode, error) { - _, err := d.dserv.Add(d.node) + nd, err := d.dirbuilder.GetNode() if err != nil { return nil, err } - return d.node.Copy().(*dag.ProtoNode), nil + _, err = d.dserv.Add(nd) + if err != nil { + return nil, err + } + + pbnd, ok := nd.(*dag.ProtoNode) + if !ok { + return nil, dag.ErrNotProtobuf + } + + return pbnd.Copy().(*dag.ProtoNode), nil } func (d *Directory) updateChild(name string, nd node.Node) error { - err := d.node.RemoveNodeLink(name) - if err != nil && err != dag.ErrNotFound { - return err - } - - err = d.node.AddNodeLinkClean(name, nd) + err := d.dirbuilder.AddChild(d.ctx, name, nd) if err != nil { return err } @@ -130,8 +142,12 @@ func (d *Directory) cacheNode(name string, nd node.Node) (FSNode, error) { } switch i.GetType() { - case ufspb.Data_Directory: - ndir := NewDirectory(d.ctx, name, nd, d, d.dserv) + case ufspb.Data_Directory, ufspb.Data_HAMTShard: + ndir, err := NewDirectory(d.ctx, name, nd, d, d.dserv) + if err != nil { + return nil, err + } + d.childDirs[name] = ndir return ndir, nil case ufspb.Data_File, ufspb.Data_Raw, ufspb.Data_Symlink: @@ -175,15 +191,7 @@ func (d *Directory) Uncache(name string) { // childFromDag searches through this directories dag node for a child link // with the given name func (d *Directory) childFromDag(name string) (node.Node, error) { - pbn, err := d.node.GetLinkedNode(d.ctx, d.dserv, name) - switch err { - case nil: - return pbn, nil - case dag.ErrLinkNotFound: - return nil, os.ErrNotExist - default: - return nil, err - } + return d.dirbuilder.Find(d.ctx, name) } // childUnsync returns the child under this directory by the given name @@ -209,7 +217,7 @@ type NodeListing struct { Hash string } -func (d *Directory) ListNames() []string { +func (d *Directory) ListNames() ([]string, error) { d.lock.Lock() defer d.lock.Unlock() @@ -221,7 +229,12 @@ func (d *Directory) ListNames() []string { names[n] = struct{}{} } - for _, l := range d.node.Links() { + links, err := d.dirbuilder.Links() + if err != nil { + return nil, err + } + + for _, l := range links { names[l.Name] = struct{}{} } @@ -231,7 +244,7 @@ func (d *Directory) ListNames() []string { } sort.Strings(out) - return out + return out, nil } func (d *Directory) List() ([]NodeListing, error) { @@ -239,7 +252,13 @@ func (d *Directory) List() ([]NodeListing, error) { defer d.lock.Unlock() var out []NodeListing - for _, l := range d.node.Links() { + + links, err := d.dirbuilder.Links() + if err != nil { + return nil, err + } + + for _, l := range links { child := NodeListing{} child.Name = l.Name @@ -285,20 +304,23 @@ func (d *Directory) Mkdir(name string) (*Directory, error) { } } - ndir := new(dag.ProtoNode) - ndir.SetData(ft.FolderPBData()) + ndir := ft.EmptyDirNode() _, err = d.dserv.Add(ndir) if err != nil { return nil, err } - err = d.node.AddNodeLinkClean(name, ndir) + err = d.dirbuilder.AddChild(d.ctx, name, ndir) + if err != nil { + return nil, err + } + + dirobj, err := NewDirectory(d.ctx, name, ndir, d, d.dserv) if err != nil { return nil, err } - dirobj := NewDirectory(d.ctx, name, ndir, d, d.dserv) d.childDirs[name] = dirobj return dirobj, nil } @@ -310,12 +332,7 @@ func (d *Directory) Unlink(name string) error { delete(d.childDirs, name) delete(d.files, name) - err := d.node.RemoveNodeLink(name) - if err != nil { - return err - } - - _, err = d.dserv.Add(d.node) + err := d.dirbuilder.RemoveChild(d.ctx, name) if err != nil { return err } @@ -350,7 +367,7 @@ func (d *Directory) AddChild(name string, nd node.Node) error { return err } - err = d.node.AddNodeLinkClean(name, nd) + err = d.dirbuilder.AddChild(d.ctx, name, nd) if err != nil { return err } @@ -406,10 +423,15 @@ func (d *Directory) GetNode() (node.Node, error) { return nil, err } - _, err = d.dserv.Add(d.node) + nd, err := d.dirbuilder.GetNode() if err != nil { return nil, err } - return d.node.Copy().(*dag.ProtoNode), nil + _, err = d.dserv.Add(nd) + if err != nil { + return nil, err + } + + return nd, err } diff --git a/mfs/mfs_test.go b/mfs/mfs_test.go index 74e0d6dfb..8471fd02c 100644 --- a/mfs/mfs_test.go +++ b/mfs/mfs_test.go @@ -747,6 +747,67 @@ func TestMfsStress(t *testing.T) { } } +func TestMfsHugeDir(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, rt := setupRoot(ctx, t) + + for i := 0; i < 100000; i++ { + err := Mkdir(rt, fmt.Sprintf("/dir%d", i), false, false) + if err != nil { + t.Fatal(err) + } + } +} + +func TestMkdirP(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, rt := setupRoot(ctx, t) + + err := Mkdir(rt, "/a/b/c/d/e/f", true, true) + if err != nil { + t.Fatal(err) + } +} + +func TestConcurrentWriteAndFlush(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ds, rt := setupRoot(ctx, t) + + d := mkdirP(t, rt.GetValue().(*Directory), "foo/bar/baz") + fn := fileNodeFromReader(t, ds, bytes.NewBuffer(nil)) + err := d.AddChild("file", fn) + if err != nil { + t.Fatal(err) + } + + nloops := 5000 + + wg := new(sync.WaitGroup) + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < nloops; i++ { + err := writeFile(rt, "/foo/bar/baz/file", []byte("STUFF")) + if err != nil { + t.Error("file write failed: ", err) + return + } + } + }() + + for i := 0; i < nloops; i++ { + _, err := rt.GetValue().GetNode() + if err != nil { + t.Fatal(err) + } + } + + wg.Wait() +} + func TestFlushing(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -892,6 +953,70 @@ func TestConcurrentReads(t *testing.T) { } wg.Wait() } +func writeFile(rt *Root, path string, data []byte) error { + n, err := Lookup(rt, path) + if err != nil { + return err + } + + fi, ok := n.(*File) + if !ok { + return fmt.Errorf("expected to receive a file, but didnt get one") + } + + fd, err := fi.Open(OpenWriteOnly, true) + if err != nil { + return err + } + defer fd.Close() + + nw, err := fd.Write(data) + if err != nil { + return err + } + + if nw != 10 { + fmt.Errorf("wrote incorrect amount") + } + + return nil +} + +func TestConcurrentWrites(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ds, rt := setupRoot(ctx, t) + + rootdir := rt.GetValue().(*Directory) + + path := "a/b/c" + d := mkdirP(t, rootdir, path) + + fi := fileNodeFromReader(t, ds, bytes.NewReader(make([]byte, 0))) + err := d.AddChild("afile", fi) + if err != nil { + t.Fatal(err) + } + + var wg sync.WaitGroup + nloops := 100 + for i := 0; i < 10; i++ { + wg.Add(1) + go func(me int) { + defer wg.Done() + mybuf := bytes.Repeat([]byte{byte(me)}, 10) + for j := 0; j < nloops; j++ { + err := writeFile(rt, "a/b/c/afile", mybuf) + if err != nil { + t.Error("writefile failed: ", err) + return + } + } + }(i) + } + wg.Wait() +} func TestFileDescriptors(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) diff --git a/mfs/system.go b/mfs/system.go index 1c57677b5..05d2a1c53 100644 --- a/mfs/system.go +++ b/mfs/system.go @@ -88,7 +88,12 @@ func NewRoot(parent context.Context, ds dag.DAGService, node *dag.ProtoNode, pf switch pbn.GetType() { case ft.TDirectory: - root.val = NewDirectory(parent, node.String(), node, root, ds) + rval, err := NewDirectory(parent, node.String(), node, root, ds) + if err != nil { + return nil, err + } + + root.val = rval case ft.TFile, ft.TMetadata, ft.TRaw: fi, err := NewFile(node.String(), node, root, ds) if err != nil { diff --git a/package.json b/package.json index 15148ff8d..406c234e3 100644 --- a/package.json +++ b/package.json @@ -300,6 +300,12 @@ "hash": "QmaFNtBAXX4nVMQWbUqNysXyhevUj1k4B1y5uS45LC7Vw9", "name": "fuse", "version": "0.1.3" + }, + { + "author": "whyrusleeping", + "hash": "QmfJHywXQu98UeZtGJBQrPAR6AtmDjjbe3qjTo9piXHPnx", + "name": "murmur3", + "version": "0.0.0" } ], "gxVersion": "0.10.0", diff --git a/test/sharness/t0250-files-api.sh b/test/sharness/t0250-files-api.sh index 5c9f1332d..9e5795006 100755 --- a/test/sharness/t0250-files-api.sh +++ b/test/sharness/t0250-files-api.sh @@ -46,6 +46,33 @@ verify_dir_contents() { ' } +test_sharding() { + test_expect_success "make a directory" ' + ipfs files mkdir /foo + ' + + test_expect_success "can make 1100 files in a directory" ' + printf "" > list_exp_raw + for i in `seq 1100` + do + echo $i | ipfs files write --create /foo/file$i + echo file$i >> list_exp_raw + done + ' + + test_expect_success "listing works" ' + ipfs files ls /foo |sort > list_out && + sort list_exp_raw > list_exp && + test_cmp list_exp list_out + ' + + test_expect_success "can read a file from sharded directory" ' + ipfs files read /foo/file65 > file_out && + echo "65" > file_exp && + test_cmp file_out file_exp + ' +} + test_files_api() { test_expect_success "can mkdir in root" ' ipfs files mkdir /cats @@ -491,5 +518,6 @@ test_launch_ipfs_daemon ONLINE=1 # set online flag so tests can easily tell test_files_api +test_sharding test_kill_ipfs_daemon test_done diff --git a/unixfs/format.go b/unixfs/format.go index 96dd109d1..4157ec0e5 100644 --- a/unixfs/format.go +++ b/unixfs/format.go @@ -17,6 +17,7 @@ const ( TDirectory = pb.Data_Directory TMetadata = pb.Data_Metadata TSymlink = pb.Data_Symlink + THAMTShard = pb.Data_HAMTShard ) var ErrMalformedFileFormat = errors.New("malformed data in file format") diff --git a/unixfs/hamt/hamt.go b/unixfs/hamt/hamt.go new file mode 100644 index 000000000..05b9402fd --- /dev/null +++ b/unixfs/hamt/hamt.go @@ -0,0 +1,464 @@ +package hamt + +import ( + "context" + "fmt" + "math" + "math/big" + "os" + + dag "github.com/ipfs/go-ipfs/merkledag" + format "github.com/ipfs/go-ipfs/unixfs" + upb "github.com/ipfs/go-ipfs/unixfs/pb" + + node "gx/ipfs/QmYDscK7dmdo2GZ9aumS8s5auUUAH5mR1jvj5pYhWusfK7/go-ipld-node" + proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" + "gx/ipfs/QmfJHywXQu98UeZtGJBQrPAR6AtmDjjbe3qjTo9piXHPnx/murmur3" +) + +const ( + HashMurmur3 uint64 = 0x22 +) + +type HamtShard struct { + nd *dag.ProtoNode + + bitfield *big.Int + + children []child + + tableSize int + tableSizeLg2 int + + hashFunc uint64 + + prefixPadStr string + maxpadlen int + + dserv dag.DAGService +} + +// child can either be another shard, or a leaf node value +type child interface { + Node() (node.Node, error) + Label() string +} + +func NewHamtShard(dserv dag.DAGService, size int) *HamtShard { + ds := makeHamtShard(dserv, size) + ds.bitfield = big.NewInt(0) + ds.nd = new(dag.ProtoNode) + ds.hashFunc = HashMurmur3 + return ds +} + +func makeHamtShard(ds dag.DAGService, size int) *HamtShard { + maxpadding := fmt.Sprintf("%X", size-1) + return &HamtShard{ + tableSizeLg2: int(math.Log2(float64(size))), + prefixPadStr: fmt.Sprintf("%%0%dX", len(maxpadding)), + maxpadlen: len(maxpadding), + tableSize: size, + dserv: ds, + } +} + +func NewHamtFromDag(dserv dag.DAGService, nd node.Node) (*HamtShard, error) { + pbnd, ok := nd.(*dag.ProtoNode) + if !ok { + return nil, dag.ErrLinkNotFound + } + + pbd, err := format.FromBytes(pbnd.Data()) + if err != nil { + return nil, err + } + + if pbd.GetType() != upb.Data_HAMTShard { + return nil, fmt.Errorf("node was not a dir shard") + } + + if pbd.GetHashType() != HashMurmur3 { + return nil, fmt.Errorf("only murmur3 supported as hash function") + } + + ds := makeHamtShard(dserv, int(pbd.GetFanout())) + ds.nd = pbnd.Copy().(*dag.ProtoNode) + ds.children = make([]child, len(pbnd.Links())) + ds.bitfield = new(big.Int).SetBytes(pbd.GetData()) + ds.hashFunc = pbd.GetHashType() + + return ds, nil +} + +// Node serializes the HAMT structure into a merkledag node with unixfs formatting +func (ds *HamtShard) Node() (node.Node, error) { + out := new(dag.ProtoNode) + + // TODO: optimized 'for each set bit' + for i := 0; i < ds.tableSize; i++ { + if ds.bitfield.Bit(i) == 0 { + continue + } + + cindex := ds.indexForBitPos(i) + ch := ds.children[cindex] + if ch != nil { + cnd, err := ch.Node() + if err != nil { + return nil, err + } + + err = out.AddNodeLinkClean(ds.linkNamePrefix(i)+ch.Label(), cnd) + if err != nil { + return nil, err + } + } else { + // child unloaded, just copy in link with updated name + lnk := ds.nd.Links()[cindex] + label := lnk.Name[ds.maxpadlen:] + + err := out.AddRawLink(ds.linkNamePrefix(i)+label, lnk) + if err != nil { + return nil, err + } + } + } + + typ := upb.Data_HAMTShard + data, err := proto.Marshal(&upb.Data{ + Type: &typ, + Fanout: proto.Uint64(uint64(ds.tableSize)), + HashType: proto.Uint64(HashMurmur3), + Data: ds.bitfield.Bytes(), + }) + if err != nil { + return nil, err + } + + out.SetData(data) + + _, err = ds.dserv.Add(out) + if err != nil { + return nil, err + } + + return out, nil +} + +type shardValue struct { + key string + val node.Node +} + +func (sv *shardValue) Node() (node.Node, error) { + return sv.val, nil +} + +func (sv *shardValue) Label() string { + return sv.key +} + +func hash(val []byte) []byte { + h := murmur3.New64() + h.Write(val) + return h.Sum(nil) +} + +// Label for HamtShards is the empty string, this is used to differentiate them from +// value entries +func (ds *HamtShard) Label() string { + return "" +} + +// Set sets 'name' = nd in the HAMT +func (ds *HamtShard) Set(ctx context.Context, name string, nd node.Node) error { + hv := &hashBits{b: hash([]byte(name))} + return ds.modifyValue(ctx, hv, name, nd) +} + +// Remove deletes the named entry if it exists, this operation is idempotent. +func (ds *HamtShard) Remove(ctx context.Context, name string) error { + hv := &hashBits{b: hash([]byte(name))} + return ds.modifyValue(ctx, hv, name, nil) +} + +func (ds *HamtShard) Find(ctx context.Context, name string) (node.Node, error) { + hv := &hashBits{b: hash([]byte(name))} + + var out node.Node + err := ds.getValue(ctx, hv, name, func(sv *shardValue) error { + out = sv.val + return nil + }) + + return out, err +} + +// getChild returns the i'th child of this shard. If it is cached in the +// children array, it will return it from there. Otherwise, it loads the child +// node from disk. +func (ds *HamtShard) getChild(ctx context.Context, i int) (child, error) { + if i >= len(ds.children) || i < 0 { + return nil, fmt.Errorf("invalid index passed to getChild (likely corrupt bitfield)") + } + + if len(ds.children) != len(ds.nd.Links()) { + return nil, fmt.Errorf("inconsistent lengths between children array and Links array") + } + + c := ds.children[i] + if c != nil { + return c, nil + } + + return ds.loadChild(ctx, i) +} + +// loadChild reads the i'th child node of this shard from disk and returns it +// as a 'child' interface +func (ds *HamtShard) loadChild(ctx context.Context, i int) (child, error) { + lnk := ds.nd.Links()[i] + if len(lnk.Name) < ds.maxpadlen { + return nil, fmt.Errorf("invalid link name '%s'", lnk.Name) + } + + nd, err := lnk.GetNode(ctx, ds.dserv) + if err != nil { + return nil, err + } + + var c child + if len(lnk.Name) == ds.maxpadlen { + pbnd, ok := nd.(*dag.ProtoNode) + if !ok { + return nil, dag.ErrNotProtobuf + } + + pbd, err := format.FromBytes(pbnd.Data()) + if err != nil { + return nil, err + } + + if pbd.GetType() != format.THAMTShard { + return nil, fmt.Errorf("HAMT entries must have non-zero length name") + } + + cds, err := NewHamtFromDag(ds.dserv, nd) + if err != nil { + return nil, err + } + + c = cds + } else { + c = &shardValue{ + key: lnk.Name[ds.maxpadlen:], + val: nd, + } + } + + ds.children[i] = c + return c, nil +} + +func (ds *HamtShard) setChild(i int, c child) { + ds.children[i] = c +} + +func (ds *HamtShard) insertChild(idx int, key string, val node.Node) error { + if val == nil { + return os.ErrNotExist + } + + i := ds.indexForBitPos(idx) + ds.bitfield.SetBit(ds.bitfield, idx, 1) + sv := &shardValue{ + key: key, + val: val, + } + + ds.children = append(ds.children[:i], append([]child{sv}, ds.children[i:]...)...) + ds.nd.SetLinks(append(ds.nd.Links()[:i], append([]*node.Link{nil}, ds.nd.Links()[i:]...)...)) + return nil +} + +func (ds *HamtShard) rmChild(i int) error { + if i < 0 || i >= len(ds.children) || i >= len(ds.nd.Links()) { + return fmt.Errorf("hamt: attempted to remove child with out of range index") + } + + copy(ds.children[i:], ds.children[i+1:]) + ds.children = ds.children[:len(ds.children)-1] + + copy(ds.nd.Links()[i:], ds.nd.Links()[i+1:]) + ds.nd.SetLinks(ds.nd.Links()[:len(ds.nd.Links())-1]) + + return nil +} + +func (ds *HamtShard) getValue(ctx context.Context, hv *hashBits, key string, cb func(*shardValue) error) error { + idx := hv.Next(ds.tableSizeLg2) + if ds.bitfield.Bit(int(idx)) == 1 { + cindex := ds.indexForBitPos(idx) + + child, err := ds.getChild(ctx, cindex) + if err != nil { + return err + } + + switch child := child.(type) { + case *HamtShard: + return child.getValue(ctx, hv, key, cb) + case *shardValue: + if child.key == key { + return cb(child) + } + } + } + + return os.ErrNotExist +} + +func (ds *HamtShard) EnumLinks() ([]*node.Link, error) { + var links []*node.Link + err := ds.walkTrie(func(sv *shardValue) error { + lnk, err := node.MakeLink(sv.val) + if err != nil { + return err + } + + lnk.Name = sv.key + + links = append(links, lnk) + return nil + }) + if err != nil { + return nil, err + } + + return links, nil +} + +func (ds *HamtShard) walkTrie(cb func(*shardValue) error) error { + for i := 0; i < ds.tableSize; i++ { + if ds.bitfield.Bit(i) == 0 { + continue + } + + idx := ds.indexForBitPos(i) + // NOTE: an optimized version could simply iterate over each + // element in the 'children' array. + c, err := ds.getChild(context.TODO(), idx) + if err != nil { + return err + } + + switch c := c.(type) { + case *shardValue: + err := cb(c) + if err != nil { + return err + } + + case *HamtShard: + err := c.walkTrie(cb) + if err != nil { + return err + } + default: + return fmt.Errorf("unexpected child type: %#v", c) + } + } + return nil +} + +func (ds *HamtShard) modifyValue(ctx context.Context, hv *hashBits, key string, val node.Node) error { + idx := hv.Next(ds.tableSizeLg2) + + if ds.bitfield.Bit(idx) != 1 { + return ds.insertChild(idx, key, val) + } + + cindex := ds.indexForBitPos(idx) + + child, err := ds.getChild(ctx, cindex) + if err != nil { + return err + } + + switch child := child.(type) { + case *HamtShard: + err := child.modifyValue(ctx, hv, key, val) + if err != nil { + return err + } + + if val == nil { + switch len(child.children) { + case 0: + // empty sub-shard, prune it + // Note: this shouldnt normally ever happen + // in the event of another implementation creates flawed + // structures, this will help to normalize them. + ds.bitfield.SetBit(ds.bitfield, idx, 0) + return ds.rmChild(cindex) + case 1: + nchild, ok := child.children[0].(*shardValue) + if ok { + // sub-shard with a single value element, collapse it + ds.setChild(cindex, nchild) + } + return nil + } + } + + return nil + case *shardValue: + switch { + case val == nil: // passing a nil value signifies a 'delete' + ds.bitfield.SetBit(ds.bitfield, idx, 0) + return ds.rmChild(cindex) + + case child.key == key: // value modification + child.val = val + return nil + + default: // replace value with another shard, one level deeper + ns := NewHamtShard(ds.dserv, ds.tableSize) + chhv := &hashBits{ + b: hash([]byte(child.key)), + consumed: hv.consumed, + } + + err := ns.modifyValue(ctx, hv, key, val) + if err != nil { + return err + } + + err = ns.modifyValue(ctx, chhv, child.key, child.val) + if err != nil { + return err + } + + ds.setChild(cindex, ns) + return nil + } + default: + return fmt.Errorf("unexpected type for child: %#v", child) + } +} + +func (ds *HamtShard) indexForBitPos(bp int) int { + // TODO: an optimization could reuse the same 'mask' here and change the size + // as needed. This isnt yet done as the bitset package doesnt make it easy + // to do. + mask := new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(int64(bp)), nil), big.NewInt(1)) + mask.And(mask, ds.bitfield) + + return popCount(mask) +} + +// linkNamePrefix takes in the bitfield index of an entry and returns its hex prefix +func (ds *HamtShard) linkNamePrefix(idx int) string { + return fmt.Sprintf(ds.prefixPadStr, idx) +} diff --git a/unixfs/hamt/hamt_stress_test.go b/unixfs/hamt/hamt_stress_test.go new file mode 100644 index 000000000..83ad25597 --- /dev/null +++ b/unixfs/hamt/hamt_stress_test.go @@ -0,0 +1,280 @@ +package hamt + +import ( + "bufio" + "context" + "fmt" + "math/rand" + "os" + "strconv" + "strings" + "testing" + "time" + + dag "github.com/ipfs/go-ipfs/merkledag" + mdtest "github.com/ipfs/go-ipfs/merkledag/test" + ft "github.com/ipfs/go-ipfs/unixfs" +) + +func getNames(prefix string, count int) []string { + out := make([]string, count) + for i := 0; i < count; i++ { + out[i] = fmt.Sprintf("%s%d", prefix, i) + } + return out +} + +const ( + opAdd = iota + opDel + opFind +) + +type testOp struct { + Op int + Val string +} + +func stringArrToSet(arr []string) map[string]bool { + out := make(map[string]bool) + for _, s := range arr { + out[s] = true + } + return out +} + +// generate two different random sets of operations to result in the same +// ending directory (same set of entries at the end) and execute each of them +// in turn, then compare to ensure the output is the same on each. +func TestOrderConsistency(t *testing.T) { + seed := time.Now().UnixNano() + t.Logf("using seed = %d", seed) + ds := mdtest.Mock() + + shardWidth := 1024 + + keep := getNames("good", 4000) + temp := getNames("tempo", 6000) + + ops := genOpSet(seed, keep, temp) + s, err := executeOpSet(t, ds, shardWidth, ops) + if err != nil { + t.Fatal(err) + } + + err = validateOpSetCompletion(t, s, keep, temp) + if err != nil { + t.Fatal(err) + } + + ops2 := genOpSet(seed+1000, keep, temp) + s2, err := executeOpSet(t, ds, shardWidth, ops2) + if err != nil { + t.Fatal(err) + } + + err = validateOpSetCompletion(t, s2, keep, temp) + if err != nil { + t.Fatal(err) + } + + nd, err := s.Node() + if err != nil { + t.Fatal(err) + } + + nd2, err := s2.Node() + if err != nil { + t.Fatal(err) + } + + k := nd.Cid() + k2 := nd2.Cid() + + if !k.Equals(k2) { + t.Fatal("got different results: ", k, k2) + } +} + +func validateOpSetCompletion(t *testing.T, s *HamtShard, keep, temp []string) error { + ctx := context.TODO() + for _, n := range keep { + _, err := s.Find(ctx, n) + if err != nil { + return fmt.Errorf("couldnt find %s: %s", n, err) + } + } + + for _, n := range temp { + _, err := s.Find(ctx, n) + if err != os.ErrNotExist { + return fmt.Errorf("expected not to find: %s", err) + } + } + + return nil +} + +func executeOpSet(t *testing.T, ds dag.DAGService, width int, ops []testOp) (*HamtShard, error) { + ctx := context.TODO() + s := NewHamtShard(ds, width) + e := ft.EmptyDirNode() + ds.Add(e) + + for _, o := range ops { + switch o.Op { + case opAdd: + err := s.Set(ctx, o.Val, e) + if err != nil { + return nil, fmt.Errorf("inserting %s: %s", o.Val, err) + } + case opDel: + err := s.Remove(ctx, o.Val) + if err != nil { + return nil, fmt.Errorf("deleting %s: %s", o.Val, err) + } + case opFind: + _, err := s.Find(ctx, o.Val) + if err != nil { + return nil, fmt.Errorf("finding %s: %s", o.Val, err) + } + } + } + + return s, nil +} + +func genOpSet(seed int64, keep, temp []string) []testOp { + tempset := stringArrToSet(temp) + + allnames := append(keep, temp...) + shuffle(seed, allnames) + + var todel []string + + var ops []testOp + + for { + n := len(allnames) + len(todel) + if n == 0 { + return ops + } + + rn := rand.Intn(n) + + if rn < len(allnames) { + next := allnames[0] + allnames = allnames[1:] + ops = append(ops, testOp{ + Op: opAdd, + Val: next, + }) + + if tempset[next] { + todel = append(todel, next) + } + } else { + shuffle(seed+100, todel) + next := todel[0] + todel = todel[1:] + + ops = append(ops, testOp{ + Op: opDel, + Val: next, + }) + } + } +} + +// executes the given op set with a repl to allow easier debugging +func debugExecuteOpSet(ds dag.DAGService, width int, ops []testOp) (*HamtShard, error) { + s := NewHamtShard(ds, width) + e := ft.EmptyDirNode() + ds.Add(e) + ctx := context.TODO() + + run := 0 + + opnames := map[int]string{ + opAdd: "add", + opDel: "del", + } + +mainloop: + for i := 0; i < len(ops); i++ { + o := ops[i] + + fmt.Printf("Op %d: %s %s\n", i, opnames[o.Op], o.Val) + for run == 0 { + cmd := readCommand() + parts := strings.Split(cmd, " ") + switch parts[0] { + case "": + run = 1 + case "find": + _, err := s.Find(ctx, parts[1]) + if err == nil { + fmt.Println("success") + } else { + fmt.Println(err) + } + case "run": + if len(parts) > 1 { + n, err := strconv.Atoi(parts[1]) + if err != nil { + panic(err) + } + + run = n + } else { + run = -1 + } + case "lookop": + for k := 0; k < len(ops); k++ { + if ops[k].Val == parts[1] { + fmt.Printf(" Op %d: %s %s\n", k, opnames[ops[k].Op], parts[1]) + } + } + case "restart": + s = NewHamtShard(ds, width) + i = -1 + continue mainloop + case "print": + nd, err := s.Node() + if err != nil { + panic(err) + } + printDag(ds, nd.(*dag.ProtoNode), 0) + } + } + run-- + + switch o.Op { + case opAdd: + err := s.Set(ctx, o.Val, e) + if err != nil { + return nil, fmt.Errorf("inserting %s: %s", o.Val, err) + } + case opDel: + fmt.Println("deleting: ", o.Val) + err := s.Remove(ctx, o.Val) + if err != nil { + return nil, fmt.Errorf("deleting %s: %s", o.Val, err) + } + case opFind: + _, err := s.Find(ctx, o.Val) + if err != nil { + return nil, fmt.Errorf("finding %s: %s", o.Val, err) + } + } + } + + return s, nil +} + +func readCommand() string { + fmt.Print("> ") + scan := bufio.NewScanner(os.Stdin) + scan.Scan() + return scan.Text() +} diff --git a/unixfs/hamt/hamt_test.go b/unixfs/hamt/hamt_test.go new file mode 100644 index 000000000..cafa1ce8c --- /dev/null +++ b/unixfs/hamt/hamt_test.go @@ -0,0 +1,552 @@ +package hamt + +import ( + "context" + "fmt" + "math/rand" + "os" + "sort" + "strings" + "testing" + "time" + + dag "github.com/ipfs/go-ipfs/merkledag" + mdtest "github.com/ipfs/go-ipfs/merkledag/test" + dagutils "github.com/ipfs/go-ipfs/merkledag/utils" + ft "github.com/ipfs/go-ipfs/unixfs" +) + +func shuffle(seed int64, arr []string) { + r := rand.New(rand.NewSource(seed)) + for i := 0; i < len(arr); i++ { + a := r.Intn(len(arr)) + b := r.Intn(len(arr)) + arr[a], arr[b] = arr[b], arr[a] + } +} + +func makeDir(ds dag.DAGService, size int) ([]string, *HamtShard, error) { + return makeDirWidth(ds, size, 256) +} + +func makeDirWidth(ds dag.DAGService, size, width int) ([]string, *HamtShard, error) { + s := NewHamtShard(ds, width) + + var dirs []string + for i := 0; i < size; i++ { + dirs = append(dirs, fmt.Sprintf("DIRNAME%d", i)) + } + + shuffle(time.Now().UnixNano(), dirs) + + for i := 0; i < len(dirs); i++ { + nd := ft.EmptyDirNode() + ds.Add(nd) + err := s.Set(context.Background(), dirs[i], nd) + if err != nil { + return nil, nil, err + } + } + + return dirs, s, nil +} + +func assertLink(s *HamtShard, name string, found bool) error { + _, err := s.Find(context.Background(), name) + switch err { + case os.ErrNotExist: + if found { + return err + } + + return nil + case nil: + if found { + return nil + } + + return fmt.Errorf("expected not to find link named %s", name) + default: + return err + } +} + +func assertSerializationWorks(ds dag.DAGService, s *HamtShard) error { + nd, err := s.Node() + if err != nil { + return err + } + + nds, err := NewHamtFromDag(ds, nd) + if err != nil { + return err + } + + linksA, err := s.EnumLinks() + if err != nil { + return err + } + + linksB, err := nds.EnumLinks() + if err != nil { + return err + } + + if len(linksA) != len(linksB) { + return fmt.Errorf("links arrays are different sizes") + } + + for i, a := range linksA { + b := linksB[i] + if a.Name != b.Name { + return fmt.Errorf("links names mismatch") + } + + if a.Cid.String() != b.Cid.String() { + return fmt.Errorf("link hashes dont match") + } + + if a.Size != b.Size { + return fmt.Errorf("link sizes not the same") + } + } + + return nil +} + +func TestBasicSet(t *testing.T) { + ds := mdtest.Mock() + for _, w := range []int{128, 256, 512, 1024, 2048, 4096} { + t.Run(fmt.Sprintf("BasicSet%d", w), func(t *testing.T) { + names, s, err := makeDirWidth(ds, 1000, w) + if err != nil { + t.Fatal(err) + } + ctx := context.Background() + + for _, d := range names { + _, err := s.Find(ctx, d) + if err != nil { + t.Fatal(err) + } + } + }) + } +} + +func TestDirBuilding(t *testing.T) { + ds := mdtest.Mock() + s := NewHamtShard(ds, 256) + + _, s, err := makeDir(ds, 200) + if err != nil { + t.Fatal(err) + } + + nd, err := s.Node() + if err != nil { + t.Fatal(err) + } + + //printDag(ds, nd, 0) + + k := nd.Cid() + + if k.String() != "QmY89TkSEVHykWMHDmyejSWFj9CYNtvzw4UwnT9xbc4Zjc" { + t.Fatalf("output didnt match what we expected (got %s)", k.String()) + } +} + +func TestShardReload(t *testing.T) { + ds := mdtest.Mock() + s := NewHamtShard(ds, 256) + ctx := context.Background() + + _, s, err := makeDir(ds, 200) + if err != nil { + t.Fatal(err) + } + + nd, err := s.Node() + if err != nil { + t.Fatal(err) + } + + nds, err := NewHamtFromDag(ds, nd) + if err != nil { + t.Fatal(err) + } + + lnks, err := nds.EnumLinks() + if err != nil { + t.Fatal(err) + } + + if len(lnks) != 200 { + t.Fatal("not enough links back") + } + + _, err = nds.Find(ctx, "DIRNAME50") + if err != nil { + t.Fatal(err) + } + + // Now test roundtrip marshal with no operations + + nds, err = NewHamtFromDag(ds, nd) + if err != nil { + t.Fatal(err) + } + + ond, err := nds.Node() + if err != nil { + t.Fatal(err) + } + + outk := ond.Cid() + ndk := nd.Cid() + + if !outk.Equals(ndk) { + printDiff(ds, nd.(*dag.ProtoNode), ond.(*dag.ProtoNode)) + t.Fatal("roundtrip serialization failed") + } +} + +func TestRemoveElems(t *testing.T) { + ds := mdtest.Mock() + dirs, s, err := makeDir(ds, 500) + if err != nil { + t.Fatal(err) + } + ctx := context.Background() + + shuffle(time.Now().UnixNano(), dirs) + + for _, d := range dirs { + err := s.Remove(ctx, d) + if err != nil { + t.Fatal(err) + } + } + + nd, err := s.Node() + if err != nil { + t.Fatal(err) + } + + if len(nd.Links()) > 0 { + t.Fatal("shouldnt have any links here") + } + + err = s.Remove(ctx, "doesnt exist") + if err != os.ErrNotExist { + t.Fatal("expected error does not exist") + } +} + +func TestSetAfterMarshal(t *testing.T) { + ds := mdtest.Mock() + _, s, err := makeDir(ds, 300) + if err != nil { + t.Fatal(err) + } + ctx := context.Background() + + nd, err := s.Node() + if err != nil { + t.Fatal(err) + } + + nds, err := NewHamtFromDag(ds, nd) + if err != nil { + t.Fatal(err) + } + + empty := ft.EmptyDirNode() + for i := 0; i < 100; i++ { + err := nds.Set(ctx, fmt.Sprintf("moredirs%d", i), empty) + if err != nil { + t.Fatal(err) + } + } + + links, err := nds.EnumLinks() + if err != nil { + t.Fatal(err) + } + + if len(links) != 400 { + t.Fatal("expected 400 links") + } + + err = assertSerializationWorks(ds, nds) + if err != nil { + t.Fatal(err) + } +} + +func TestDuplicateAddShard(t *testing.T) { + ds := mdtest.Mock() + dir := NewHamtShard(ds, 256) + nd := new(dag.ProtoNode) + ctx := context.Background() + + err := dir.Set(ctx, "test", nd) + if err != nil { + t.Fatal(err) + } + + err = dir.Set(ctx, "test", nd) + if err != nil { + t.Fatal(err) + } + + lnks, err := dir.EnumLinks() + if err != nil { + t.Fatal(err) + } + + if len(lnks) != 1 { + t.Fatal("expected only one link") + } +} + +func TestLoadFailsFromNonShard(t *testing.T) { + ds := mdtest.Mock() + nd := ft.EmptyDirNode() + + _, err := NewHamtFromDag(ds, nd) + if err == nil { + t.Fatal("expected dir shard creation to fail when given normal directory") + } + + nd = new(dag.ProtoNode) + + _, err = NewHamtFromDag(ds, nd) + if err == nil { + t.Fatal("expected dir shard creation to fail when given normal directory") + } +} + +func TestFindNonExisting(t *testing.T) { + ds := mdtest.Mock() + _, s, err := makeDir(ds, 100) + if err != nil { + t.Fatal(err) + } + ctx := context.Background() + + for i := 0; i < 200; i++ { + _, err := s.Find(ctx, fmt.Sprintf("notfound%d", i)) + if err != os.ErrNotExist { + t.Fatal("expected ErrNotExist") + } + } +} + +func TestRemoveElemsAfterMarshal(t *testing.T) { + ds := mdtest.Mock() + dirs, s, err := makeDir(ds, 30) + if err != nil { + t.Fatal(err) + } + ctx := context.Background() + + sort.Strings(dirs) + + err = s.Remove(ctx, dirs[0]) + if err != nil { + t.Fatal(err) + } + + out, err := s.Find(ctx, dirs[0]) + if err == nil { + t.Fatal("expected error, got: ", out) + } + + nd, err := s.Node() + if err != nil { + t.Fatal(err) + } + + nds, err := NewHamtFromDag(ds, nd) + if err != nil { + t.Fatal(err) + } + + _, err = nds.Find(ctx, dirs[0]) + if err == nil { + t.Fatal("expected not to find ", dirs[0]) + } + + for _, d := range dirs[1:] { + _, err := nds.Find(ctx, d) + if err != nil { + t.Fatal("could not find expected link after unmarshaling") + } + } + + for _, d := range dirs[1:] { + err := nds.Remove(ctx, d) + if err != nil { + t.Fatal(err) + } + } + + links, err := nds.EnumLinks() + if err != nil { + t.Fatal(err) + } + + if len(links) != 0 { + t.Fatal("expected all links to be removed") + } + + err = assertSerializationWorks(ds, nds) + if err != nil { + t.Fatal(err) + } +} + +func TestBitfieldIndexing(t *testing.T) { + ds := mdtest.Mock() + s := NewHamtShard(ds, 256) + + set := func(i int) { + s.bitfield.SetBit(s.bitfield, i, 1) + } + + assert := func(i int, val int) { + if s.indexForBitPos(i) != val { + t.Fatalf("expected index %d to be %d", i, val) + } + } + + assert(50, 0) + set(4) + set(5) + set(60) + + assert(10, 2) + set(3) + assert(10, 3) + assert(1, 0) + + assert(100, 4) + set(50) + assert(45, 3) + set(100) + assert(100, 5) +} + +// test adding a sharded directory node as the child of another directory node. +// if improperly implemented, the parent hamt may assume the child is a part of +// itself. +func TestSetHamtChild(t *testing.T) { + ds := mdtest.Mock() + s := NewHamtShard(ds, 256) + ctx := context.Background() + + e := ft.EmptyDirNode() + ds.Add(e) + + err := s.Set(ctx, "bar", e) + if err != nil { + t.Fatal(err) + } + + snd, err := s.Node() + if err != nil { + t.Fatal(err) + } + + _, ns, err := makeDir(ds, 50) + if err != nil { + t.Fatal(err) + } + + err = ns.Set(ctx, "foo", snd) + if err != nil { + t.Fatal(err) + } + + nsnd, err := ns.Node() + if err != nil { + t.Fatal(err) + } + + hs, err := NewHamtFromDag(ds, nsnd) + if err != nil { + t.Fatal(err) + } + + err = assertLink(hs, "bar", false) + if err != nil { + t.Fatal(err) + } + + err = assertLink(hs, "foo", true) + if err != nil { + t.Fatal(err) + } +} + +func printDag(ds dag.DAGService, nd *dag.ProtoNode, depth int) { + padding := strings.Repeat(" ", depth) + fmt.Println("{") + for _, l := range nd.Links() { + fmt.Printf("%s%s: %s", padding, l.Name, l.Cid.String()) + ch, err := ds.Get(context.Background(), l.Cid) + if err != nil { + panic(err) + } + + printDag(ds, ch.(*dag.ProtoNode), depth+1) + } + fmt.Println(padding + "}") +} + +func printDiff(ds dag.DAGService, a, b *dag.ProtoNode) { + diff, err := dagutils.Diff(context.TODO(), ds, a, b) + if err != nil { + panic(err) + } + + for _, d := range diff { + fmt.Println(d) + } +} + +func BenchmarkHAMTSet(b *testing.B) { + ds := mdtest.Mock() + sh := NewHamtShard(ds, 256) + nd, err := sh.Node() + if err != nil { + b.Fatal(err) + } + + _, err = ds.Add(nd) + if err != nil { + b.Fatal(err) + } + ds.Add(ft.EmptyDirNode()) + + for i := 0; i < b.N; i++ { + s, err := NewHamtFromDag(ds, nd) + if err != nil { + b.Fatal(err) + } + + err = s.Set(context.TODO(), fmt.Sprint(i), ft.EmptyDirNode()) + if err != nil { + b.Fatal(err) + } + + out, err := s.Node() + if err != nil { + b.Fatal(err) + } + + nd = out + } +} diff --git a/unixfs/hamt/util.go b/unixfs/hamt/util.go new file mode 100644 index 000000000..1727dbfed --- /dev/null +++ b/unixfs/hamt/util.go @@ -0,0 +1,61 @@ +package hamt + +import ( + "math/big" +) + +type hashBits struct { + b []byte + consumed int +} + +func mkmask(n int) byte { + return (1 << uint(n)) - 1 +} + +func (hb *hashBits) Next(i int) int { + curbi := hb.consumed / 8 + leftb := 8 - (hb.consumed % 8) + + curb := hb.b[curbi] + if i == leftb { + out := int(mkmask(i) & curb) + hb.consumed += i + return out + } else if i < leftb { + a := curb & mkmask(leftb) // mask out the high bits we don't want + b := a & ^mkmask(leftb-i) // mask out the low bits we don't want + c := b >> uint(leftb-i) // shift whats left down + hb.consumed += i + return int(c) + } else { + out := int(mkmask(leftb) & curb) + out <<= uint(i - leftb) + hb.consumed += leftb + out += hb.Next(i - leftb) + return out + } +} + +const ( + m1 = 0x5555555555555555 //binary: 0101... + m2 = 0x3333333333333333 //binary: 00110011.. + m4 = 0x0f0f0f0f0f0f0f0f //binary: 4 zeros, 4 ones ... + h01 = 0x0101010101010101 //the sum of 256 to the power of 0,1,2,3... +) + +// from https://en.wikipedia.org/wiki/Hamming_weight +func popCountUint64(x uint64) int { + x -= (x >> 1) & m1 //put count of each 2 bits into those 2 bits + x = (x & m2) + ((x >> 2) & m2) //put count of each 4 bits into those 4 bits + x = (x + (x >> 4)) & m4 //put count of each 8 bits into those 8 bits + return int((x * h01) >> 56) +} + +func popCount(i *big.Int) int { + var n int + for _, v := range i.Bits() { + n += popCountUint64(uint64(v)) + } + return n +} diff --git a/unixfs/hamt/util_test.go b/unixfs/hamt/util_test.go new file mode 100644 index 000000000..4406ac859 --- /dev/null +++ b/unixfs/hamt/util_test.go @@ -0,0 +1,58 @@ +package hamt + +import ( + "math/big" + "testing" +) + +func TestPopCount(t *testing.T) { + x := big.NewInt(0) + + for i := 0; i < 50; i++ { + x.SetBit(x, i, 1) + } + + if popCount(x) != 50 { + t.Fatal("expected popcount to be 50") + } +} + +func TestHashBitsEvenSizes(t *testing.T) { + buf := []byte{255, 127, 79, 45, 116, 99, 35, 17} + hb := hashBits{b: buf} + + for _, v := range buf { + if hb.Next(8) != int(v) { + t.Fatal("got wrong numbers back") + } + } +} + +func TestHashBitsUneven(t *testing.T) { + buf := []byte{255, 127, 79, 45, 116, 99, 35, 17} + hb := hashBits{b: buf} + + v := hb.Next(4) + if v != 15 { + t.Fatal("should have gotten 15: ", v) + } + + v = hb.Next(4) + if v != 15 { + t.Fatal("should have gotten 15: ", v) + } + + if v := hb.Next(3); v != 3 { + t.Fatalf("expected 3, but got %b", v) + } + if v := hb.Next(3); v != 7 { + t.Fatalf("expected 7, but got %b", v) + } + if v := hb.Next(3); v != 6 { + t.Fatalf("expected 6, but got %b", v) + } + + if v := hb.Next(15); v != 20269 { + t.Fatalf("expected 20269, but got %b (%d)", v, v) + } +} diff --git a/unixfs/io/dirbuilder.go b/unixfs/io/dirbuilder.go index 071eba055..fdb616e1b 100644 --- a/unixfs/io/dirbuilder.go +++ b/unixfs/io/dirbuilder.go @@ -2,48 +2,144 @@ package io import ( "context" + "fmt" + "os" mdag "github.com/ipfs/go-ipfs/merkledag" format "github.com/ipfs/go-ipfs/unixfs" - cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid" + hamt "github.com/ipfs/go-ipfs/unixfs/hamt" + + node "gx/ipfs/QmYDscK7dmdo2GZ9aumS8s5auUUAH5mR1jvj5pYhWusfK7/go-ipld-node" ) -type directoryBuilder struct { +// ShardSplitThreshold specifies how large of an unsharded directory +// the Directory code will generate. Adding entries over this value will +// result in the node being restructured into a sharded object. +var ShardSplitThreshold = 1000 + +// DefaultShardWidth is the default value used for hamt sharding width. +var DefaultShardWidth = 256 + +type Directory struct { dserv mdag.DAGService dirnode *mdag.ProtoNode + + shard *hamt.HamtShard } -// NewEmptyDirectory returns an empty merkledag Node with a folder Data chunk -func NewEmptyDirectory() *mdag.ProtoNode { - nd := new(mdag.ProtoNode) - nd.SetData(format.FolderPBData()) - return nd -} - -// NewDirectory returns a directoryBuilder. It needs a DAGService to add the Children -func NewDirectory(dserv mdag.DAGService) *directoryBuilder { - db := new(directoryBuilder) +// NewDirectory returns a Directory. It needs a DAGService to add the Children +func NewDirectory(dserv mdag.DAGService) *Directory { + db := new(Directory) db.dserv = dserv - db.dirnode = NewEmptyDirectory() + db.dirnode = format.EmptyDirNode() return db } -// AddChild adds a (name, key)-pair to the root node. -func (d *directoryBuilder) AddChild(ctx context.Context, name string, c *cid.Cid) error { - cnode, err := d.dserv.Get(ctx, c) - if err != nil { - return err - } - - cnpb, ok := cnode.(*mdag.ProtoNode) +func NewDirectoryFromNode(dserv mdag.DAGService, nd node.Node) (*Directory, error) { + pbnd, ok := nd.(*mdag.ProtoNode) if !ok { - return mdag.ErrNotProtobuf + return nil, mdag.ErrNotProtobuf } - return d.dirnode.AddNodeLinkClean(name, cnpb) + pbd, err := format.FromBytes(pbnd.Data()) + if err != nil { + return nil, err + } + + switch pbd.GetType() { + case format.TDirectory: + return &Directory{ + dserv: dserv, + dirnode: pbnd.Copy().(*mdag.ProtoNode), + }, nil + case format.THAMTShard: + shard, err := hamt.NewHamtFromDag(dserv, nd) + if err != nil { + return nil, err + } + + return &Directory{ + dserv: dserv, + shard: shard, + }, nil + default: + return nil, fmt.Errorf("merkledag node was not a directory or shard") + } } -// GetNode returns the root of this directoryBuilder -func (d *directoryBuilder) GetNode() *mdag.ProtoNode { - return d.dirnode +// AddChild adds a (name, key)-pair to the root node. +func (d *Directory) AddChild(ctx context.Context, name string, nd node.Node) error { + if d.shard == nil { + if len(d.dirnode.Links()) < ShardSplitThreshold { + _ = d.dirnode.RemoveNodeLink(name) + return d.dirnode.AddNodeLinkClean(name, nd) + } + + err := d.switchToSharding(ctx) + if err != nil { + return err + } + } + + return d.shard.Set(ctx, name, nd) +} + +func (d *Directory) switchToSharding(ctx context.Context) error { + d.shard = hamt.NewHamtShard(d.dserv, DefaultShardWidth) + for _, lnk := range d.dirnode.Links() { + cnd, err := d.dserv.Get(ctx, lnk.Cid) + if err != nil { + return err + } + + err = d.shard.Set(ctx, lnk.Name, cnd) + if err != nil { + return err + } + } + + d.dirnode = nil + return nil +} + +func (d *Directory) Links() ([]*node.Link, error) { + if d.shard == nil { + return d.dirnode.Links(), nil + } + + return d.shard.EnumLinks() +} + +func (d *Directory) Find(ctx context.Context, name string) (node.Node, error) { + if d.shard == nil { + lnk, err := d.dirnode.GetNodeLink(name) + switch err { + case mdag.ErrLinkNotFound: + return nil, os.ErrNotExist + default: + return nil, err + case nil: + } + + return d.dserv.Get(ctx, lnk.Cid) + } + + return d.shard.Find(ctx, name) +} + +func (d *Directory) RemoveChild(ctx context.Context, name string) error { + if d.shard == nil { + return d.dirnode.RemoveNodeLink(name) + } + + return d.shard.Remove(ctx, name) +} + +// GetNode returns the root of this Directory +func (d *Directory) GetNode() (node.Node, error) { + if d.shard == nil { + return d.dirnode, nil + } + + return d.shard.Node() } diff --git a/unixfs/io/dirbuilder_test.go b/unixfs/io/dirbuilder_test.go index e7539a8bc..f07ee8894 100644 --- a/unixfs/io/dirbuilder_test.go +++ b/unixfs/io/dirbuilder_test.go @@ -2,49 +2,157 @@ package io import ( "context" - "io/ioutil" + "fmt" "testing" - testu "github.com/ipfs/go-ipfs/unixfs/test" + mdtest "github.com/ipfs/go-ipfs/merkledag/test" + ft "github.com/ipfs/go-ipfs/unixfs" ) func TestEmptyNode(t *testing.T) { - n := NewEmptyDirectory() + n := ft.EmptyDirNode() if len(n.Links()) != 0 { t.Fatal("empty node should have 0 links") } } -func TestDirBuilder(t *testing.T) { - dserv := testu.GetDAGServ() - ctx, closer := context.WithCancel(context.Background()) - defer closer() - inbuf, node := testu.GetRandomNode(t, dserv, 1024) - key := node.Cid() +func TestDirectoryGrowth(t *testing.T) { + ds := mdtest.Mock() + dir := NewDirectory(ds) + ctx := context.Background() - b := NewDirectory(dserv) + d := ft.EmptyDirNode() + ds.Add(d) - b.AddChild(ctx, "random", key) + nelems := 10000 - dir := b.GetNode() - outn, err := dir.GetLinkedProtoNode(ctx, dserv, "random") + for i := 0; i < nelems; i++ { + err := dir.AddChild(ctx, fmt.Sprintf("dir%d", i), d) + if err != nil { + t.Fatal(err) + } + } + + _, err := dir.GetNode() if err != nil { t.Fatal(err) } - reader, err := NewDagReader(ctx, outn, dserv) + links, err := dir.Links() if err != nil { t.Fatal(err) } - outbuf, err := ioutil.ReadAll(reader) - if err != nil { - t.Fatal(err) + if len(links) != nelems { + t.Fatal("didnt get right number of elements") } - err = testu.ArrComp(inbuf, outbuf) - if err != nil { - t.Fatal(err) + dirc := d.Cid() + + names := make(map[string]bool) + for _, l := range links { + names[l.Name] = true + if !l.Cid.Equals(dirc) { + t.Fatal("link wasnt correct") + } } + for i := 0; i < nelems; i++ { + dn := fmt.Sprintf("dir%d", i) + if !names[dn] { + t.Fatal("didnt find directory: ", dn) + } + + _, err := dir.Find(context.Background(), dn) + if err != nil { + t.Fatal(err) + } + } +} + +func TestDuplicateAddDir(t *testing.T) { + ds := mdtest.Mock() + dir := NewDirectory(ds) + ctx := context.Background() + nd := ft.EmptyDirNode() + + err := dir.AddChild(ctx, "test", nd) + if err != nil { + t.Fatal(err) + } + + err = dir.AddChild(ctx, "test", nd) + if err != nil { + t.Fatal(err) + } + + lnks, err := dir.Links() + if err != nil { + t.Fatal(err) + } + + if len(lnks) != 1 { + t.Fatal("expected only one link") + } +} + +func TestDirBuilder(t *testing.T) { + ds := mdtest.Mock() + dir := NewDirectory(ds) + ctx := context.Background() + + child := ft.EmptyDirNode() + _, err := ds.Add(child) + if err != nil { + t.Fatal(err) + } + + count := 5000 + + for i := 0; i < count; i++ { + err := dir.AddChild(ctx, fmt.Sprintf("entry %d", i), child) + if err != nil { + t.Fatal(err) + } + } + + dirnd, err := dir.GetNode() + if err != nil { + t.Fatal(err) + } + + links, err := dir.Links() + if err != nil { + t.Fatal(err) + } + + if len(links) != count { + t.Fatal("not enough links dawg", len(links), count) + } + + adir, err := NewDirectoryFromNode(ds, dirnd) + if err != nil { + t.Fatal(err) + } + + links, err = adir.Links() + if err != nil { + t.Fatal(err) + } + + names := make(map[string]bool) + for _, lnk := range links { + names[lnk.Name] = true + } + + for i := 0; i < count; i++ { + n := fmt.Sprintf("entry %d", i) + if !names[n] { + t.Fatal("COULDNT FIND: ", n) + } + } + + if len(links) != count { + t.Fatal("wrong number of links", len(links), count) + } } diff --git a/unixfs/io/resolve.go b/unixfs/io/resolve.go index 5970e72b5..ab9239601 100644 --- a/unixfs/io/resolve.go +++ b/unixfs/io/resolve.go @@ -5,26 +5,21 @@ import ( dag "github.com/ipfs/go-ipfs/merkledag" ft "github.com/ipfs/go-ipfs/unixfs" + hamt "github.com/ipfs/go-ipfs/unixfs/hamt" node "gx/ipfs/QmYDscK7dmdo2GZ9aumS8s5auUUAH5mR1jvj5pYhWusfK7/go-ipld-node" ) func ResolveUnixfsOnce(ctx context.Context, ds dag.DAGService, nd node.Node, name string) (*node.Link, error) { - pbnd, ok := nd.(*dag.ProtoNode) - if !ok { - lnk, _, err := nd.ResolveLink([]string{name}) - return lnk, err - } + switch nd := nd.(type) { + case *dag.ProtoNode: + upb, err := ft.FromBytes(nd.Data()) + if err != nil { + // Not a unixfs node, use standard object traversal code + return nd.GetNodeLink(name) + } - upb, err := ft.FromBytes(pbnd.Data()) - if err != nil { - // Not a unixfs node, use standard object traversal code - lnk, _, err := nd.ResolveLink([]string{name}) - return lnk, err - } - - switch upb.GetType() { - /* + switch upb.GetType() { case ft.THAMTShard: s, err := hamt.NewHamtFromDag(ds, nd) if err != nil { @@ -37,10 +32,15 @@ func ResolveUnixfsOnce(ctx context.Context, ds dag.DAGService, nd node.Node, nam return nil, err } - return dag.MakeLink(out) - */ + return node.MakeLink(out) + default: + return nd.GetNodeLink(name) + } default: lnk, _, err := nd.ResolveLink([]string{name}) - return lnk, err + if err != nil { + return nil, err + } + return lnk, nil } } diff --git a/unixfs/pb/unixfs.pb.go b/unixfs/pb/unixfs.pb.go index ffd3bb905..e28053031 100644 --- a/unixfs/pb/unixfs.pb.go +++ b/unixfs/pb/unixfs.pb.go @@ -31,6 +31,7 @@ const ( Data_File Data_DataType = 2 Data_Metadata Data_DataType = 3 Data_Symlink Data_DataType = 4 + Data_HAMTShard Data_DataType = 5 ) var Data_DataType_name = map[int32]string{ @@ -39,6 +40,7 @@ var Data_DataType_name = map[int32]string{ 2: "File", 3: "Metadata", 4: "Symlink", + 5: "HAMTShard", } var Data_DataType_value = map[string]int32{ "Raw": 0, @@ -46,6 +48,7 @@ var Data_DataType_value = map[string]int32{ "File": 2, "Metadata": 3, "Symlink": 4, + "HAMTShard": 5, } func (x Data_DataType) Enum() *Data_DataType { @@ -70,6 +73,8 @@ type Data struct { Data []byte `protobuf:"bytes,2,opt,name=Data" json:"Data,omitempty"` Filesize *uint64 `protobuf:"varint,3,opt,name=filesize" json:"filesize,omitempty"` Blocksizes []uint64 `protobuf:"varint,4,rep,name=blocksizes" json:"blocksizes,omitempty"` + HashType *uint64 `protobuf:"varint,5,opt,name=hashType" json:"hashType,omitempty"` + Fanout *uint64 `protobuf:"varint,6,opt,name=fanout" json:"fanout,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -105,6 +110,20 @@ func (m *Data) GetBlocksizes() []uint64 { return nil } +func (m *Data) GetHashType() uint64 { + if m != nil && m.HashType != nil { + return *m.HashType + } + return 0 +} + +func (m *Data) GetFanout() uint64 { + if m != nil && m.Fanout != nil { + return *m.Fanout + } + return 0 +} + type Metadata struct { MimeType *string `protobuf:"bytes,1,opt,name=MimeType" json:"MimeType,omitempty"` XXX_unrecognized []byte `json:"-"` diff --git a/unixfs/pb/unixfs.proto b/unixfs/pb/unixfs.proto index 2e4d47947..6feb7aad6 100644 --- a/unixfs/pb/unixfs.proto +++ b/unixfs/pb/unixfs.proto @@ -7,12 +7,16 @@ message Data { File = 2; Metadata = 3; Symlink = 4; + HAMTShard = 5; } required DataType Type = 1; optional bytes Data = 2; optional uint64 filesize = 3; repeated uint64 blocksizes = 4; + + optional uint64 hashType = 5; + optional uint64 fanout = 6; } message Metadata {