diff --git a/core/commands/cat.go b/core/commands/cat.go index 6ca60113c..8c04a434b 100644 --- a/core/commands/cat.go +++ b/core/commands/cat.go @@ -6,7 +6,7 @@ import ( "io" "os" - cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" + "github.com/ipfs/go-ipfs/core/commands/cmdenv" "github.com/ipfs/go-ipfs/core/coreapi/interface" cmds "gx/ipfs/QmRRovo1DE6i5cMjCbf19mQCSuszF6SKwdZNUMS7MtBnH1/go-ipfs-cmds" @@ -135,32 +135,48 @@ func cat(ctx context.Context, api iface.CoreAPI, paths []string, offset int64, m return nil, 0, err } - read, err := api.Unixfs().Cat(ctx, fpath) + file, err := api.Unixfs().Get(ctx, fpath) if err != nil { return nil, 0, err } - if offset > int64(read.Size()) { - offset = offset - int64(read.Size()) + + if file.IsDirectory() { + return nil, 0, iface.ErrIsDir + } + + fsize, err := file.Size() + if err != nil { + return nil, 0, err + } + + if offset > fsize { + offset = offset - fsize continue } - count, err := read.Seek(offset, io.SeekStart) + + count, err := file.Seek(offset, io.SeekStart) if err != nil { return nil, 0, err } offset = 0 - size := uint64(read.Size() - uint64(count)) + fsize, err = file.Size() + if err != nil { + return nil, 0, err + } + + size := uint64(fsize - count) length += size if max > 0 && length >= uint64(max) { - var r io.Reader = read + var r io.Reader = file if overshoot := int64(length - uint64(max)); overshoot != 0 { - r = io.LimitReader(read, int64(size)-overshoot) + r = io.LimitReader(file, int64(size)-overshoot) length = uint64(max) } readers = append(readers, r) break } - readers = append(readers, read) + readers = append(readers, file) } return readers, length, nil } diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index ba6082626..f6fdc04ee 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -14,21 +14,26 @@ Interfaces here aren't yet completely stable. package coreapi import ( + "context" + core "github.com/ipfs/go-ipfs/core" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" + dag "gx/ipfs/QmVvNkTCx8V9Zei8xuTYTBdUXmbnDRS4iNuw1SztYyhQwQ/go-merkledag" logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log" + ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format" ) var log = logging.Logger("core/coreapi") type CoreAPI struct { node *core.IpfsNode + dag ipld.DAGService } // NewCoreAPI creates new instance of IPFS CoreAPI backed by go-ipfs Node. func NewCoreAPI(n *core.IpfsNode) coreiface.CoreAPI { - api := &CoreAPI{n} + api := &CoreAPI{n, n.DAG} return api } @@ -81,3 +86,9 @@ func (api *CoreAPI) Swarm() coreiface.SwarmAPI { func (api *CoreAPI) PubSub() coreiface.PubSubAPI { return (*PubSubAPI)(api) } + +// getSession returns new api backed by the same node with a read-only session DAG +func (api *CoreAPI) getSession(ctx context.Context) *CoreAPI { + ng := dag.NewReadOnlyDagService(dag.NewSession(ctx, api.dag)) + return &CoreAPI{api.node, ng} +} diff --git a/core/coreapi/dag.go b/core/coreapi/dag.go index 7adf823ee..976c04f83 100644 --- a/core/coreapi/dag.go +++ b/core/coreapi/dag.go @@ -31,7 +31,7 @@ type dagBatch struct { func (api *DagAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.DagPutOption) (coreiface.ResolvedPath, error) { nd, err := getNode(src, opts...) - err = api.node.DAG.Add(ctx, nd) + err = api.dag.Add(ctx, nd) if err != nil { return nil, err } @@ -96,7 +96,7 @@ func (b *dagBatch) Commit(ctx context.Context) error { b.toPut = nil }() - return b.api.node.DAG.AddMany(ctx, b.toPut) + return b.api.dag.AddMany(ctx, b.toPut) } func getNode(src io.Reader, opts ...caopts.DagPutOption) (ipld.Node, error) { diff --git a/core/coreapi/interface/unixfs.go b/core/coreapi/interface/unixfs.go index 078d648bc..dd7e5a392 100644 --- a/core/coreapi/interface/unixfs.go +++ b/core/coreapi/interface/unixfs.go @@ -2,6 +2,7 @@ package iface import ( "context" + "io" options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" @@ -17,6 +18,11 @@ type AddEvent struct { Size string `json:",omitempty"` } +type UnixfsFile interface { + files.SizeFile + io.Seeker +} + // UnixfsAPI is the basic interface to immutable files in IPFS // NOTE: This API is heavily WIP, things are guaranteed to break frequently type UnixfsAPI interface { @@ -29,11 +35,7 @@ type UnixfsAPI interface { // // Note that some implementations of this API may apply the specified context // to operations performed on the returned file - Get(context.Context, Path) (files.File, error) - - // Cat returns a reader for the file - // TODO: Remove in favour of Get (if we use Get on a file we still have reader directly, so..) - Cat(context.Context, Path) (Reader, error) + Get(context.Context, Path) (UnixfsFile, error) // Ls returns the list of links in a directory Ls(context.Context, Path) ([]*ipld.Link, error) diff --git a/core/coreapi/object.go b/core/coreapi/object.go index d52b6a538..8acc9e1f1 100644 --- a/core/coreapi/object.go +++ b/core/coreapi/object.go @@ -50,7 +50,7 @@ func (api *ObjectAPI) New(ctx context.Context, opts ...caopts.ObjectNewOption) ( n = ft.EmptyDirNode() } - err = api.node.DAG.Add(ctx, n) + err = api.dag.Add(ctx, n) if err != nil { return nil, err } @@ -121,7 +121,7 @@ func (api *ObjectAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Obj defer api.node.Blockstore.PinLock().Unlock() } - err = api.node.DAG.Add(ctx, dagnode) + err = api.dag.Add(ctx, dagnode) if err != nil { return nil, err } @@ -219,14 +219,14 @@ func (api *ObjectAPI) AddLink(ctx context.Context, base coreiface.Path, name str createfunc = ft.EmptyDirNode } - e := dagutils.NewDagEditor(basePb, api.node.DAG) + e := dagutils.NewDagEditor(basePb, api.dag) err = e.InsertNodeAtPath(ctx, name, childNd, createfunc) if err != nil { return nil, err } - nnode, err := e.Finalize(ctx, api.node.DAG) + nnode, err := e.Finalize(ctx, api.dag) if err != nil { return nil, err } @@ -245,14 +245,14 @@ func (api *ObjectAPI) RmLink(ctx context.Context, base coreiface.Path, link stri return nil, dag.ErrNotProtobuf } - e := dagutils.NewDagEditor(basePb, api.node.DAG) + e := dagutils.NewDagEditor(basePb, api.dag) err = e.RmLink(ctx, link) if err != nil { return nil, err } - nnode, err := e.Finalize(ctx, api.node.DAG) + nnode, err := e.Finalize(ctx, api.dag) if err != nil { return nil, err } @@ -289,7 +289,7 @@ func (api *ObjectAPI) patchData(ctx context.Context, path coreiface.Path, r io.R } pbnd.SetData(data) - err = api.node.DAG.Add(ctx, pbnd) + err = api.dag.Add(ctx, pbnd) if err != nil { return nil, err } @@ -308,7 +308,7 @@ func (api *ObjectAPI) Diff(ctx context.Context, before coreiface.Path, after cor return nil, err } - changes, err := dagutils.Diff(ctx, api.node.DAG, beforeNd, afterNd) + changes, err := dagutils.Diff(ctx, api.dag, beforeNd, afterNd) if err != nil { return nil, err } diff --git a/core/coreapi/path.go b/core/coreapi/path.go index 1fbc7fda8..e8c06708c 100644 --- a/core/coreapi/path.go +++ b/core/coreapi/path.go @@ -7,12 +7,12 @@ import ( "github.com/ipfs/go-ipfs/core" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" - uio "gx/ipfs/QmWE6Ftsk98cG2MTVgH4wJT8VP2nL9TuBkYTrz9GSqcsh5/go-unixfs/io" - ipfspath "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" - "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path/resolver" "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" + uio "gx/ipfs/QmWE6Ftsk98cG2MTVgH4wJT8VP2nL9TuBkYTrz9GSqcsh5/go-unixfs/io" ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format" + ipfspath "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" + "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path/resolver" ) // ResolveNode resolves the path `p` using Unixfs resolver, gets and returns the @@ -23,7 +23,7 @@ func (api *CoreAPI) ResolveNode(ctx context.Context, p coreiface.Path) (ipld.Nod return nil, err } - node, err := api.node.DAG.Get(ctx, rp.Cid()) + node, err := api.dag.Get(ctx, rp.Cid()) if err != nil { return nil, err } @@ -57,7 +57,7 @@ func (api *CoreAPI) ResolvePath(ctx context.Context, p coreiface.Path) (coreifac } r := &resolver.Resolver{ - DAG: api.node.DAG, + DAG: api.dag, ResolveOnce: resolveOnce, } diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index c438f26a4..09fe4b917 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -193,7 +193,7 @@ func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pi if typeStr == "indirect" || typeStr == "all" { set := cid.NewSet() for _, k := range api.node.Pinning.RecursiveKeys() { - err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(api.node.DAG), k, set.Visit) + err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(api.dag), k, set.Visit) if err != nil { return nil, err } diff --git a/core/coreapi/unixfile.go b/core/coreapi/unixfile.go index b7c3ae2bf..844863bd4 100644 --- a/core/coreapi/unixfile.go +++ b/core/coreapi/unixfile.go @@ -8,6 +8,8 @@ import ( gopath "path" "time" + "github.com/ipfs/go-ipfs/core/coreapi/interface" + dag "gx/ipfs/QmVvNkTCx8V9Zei8xuTYTBdUXmbnDRS4iNuw1SztYyhQwQ/go-merkledag" ft "gx/ipfs/QmWE6Ftsk98cG2MTVgH4wJT8VP2nL9TuBkYTrz9GSqcsh5/go-unixfs" uio "gx/ipfs/QmWE6Ftsk98cG2MTVgH4wJT8VP2nL9TuBkYTrz9GSqcsh5/go-unixfs/io" @@ -95,6 +97,14 @@ func (d *ufsDirectory) NextFile() (files.File, error) { return newUnixfsFile(d.ctx, d.dserv, nd, l.Name, d) } +func (d *ufsDirectory) Size() (int64, error) { + return 0, files.ErrNotReader +} + +func (d *ufsDirectory) Seek(offset int64, whence int) (int64, error) { + return 0, files.ErrNotReader +} + type ufsFile struct { uio.DagReader @@ -122,7 +132,7 @@ func (f *ufsFile) Size() (int64, error) { return int64(f.DagReader.Size()), nil } -func newUnixfsDir(ctx context.Context, dserv ipld.DAGService, nd ipld.Node, name string, path string) (files.File, error) { +func newUnixfsDir(ctx context.Context, dserv ipld.DAGService, nd ipld.Node, name string, path string) (iface.UnixfsFile, error) { dir, err := uio.NewDirectoryFromNode(dserv, nd) if err != nil { return nil, err @@ -153,7 +163,7 @@ func newUnixfsDir(ctx context.Context, dserv ipld.DAGService, nd ipld.Node, name }, nil } -func newUnixfsFile(ctx context.Context, dserv ipld.DAGService, nd ipld.Node, name string, parent files.File) (files.File, error) { +func newUnixfsFile(ctx context.Context, dserv ipld.DAGService, nd ipld.Node, name string, parent files.File) (iface.UnixfsFile, error) { path := name if parent != nil { path = gopath.Join(parent.FullPath(), name) diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index 9b1e505af..02d1fe402 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -133,31 +133,15 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.File, opts ...options return coreiface.IpfsPath(nd.Cid()), nil } -func (api *UnixfsAPI) Get(ctx context.Context, p coreiface.Path) (files.File, error) { - nd, err := api.core().ResolveNode(ctx, p) +func (api *UnixfsAPI) Get(ctx context.Context, p coreiface.Path) (coreiface.UnixfsFile, error) { + ses := api.core().getSession(ctx) + + nd, err := ses.ResolveNode(ctx, p) if err != nil { return nil, err } - return newUnixfsFile(ctx, api.node.DAG, nd, "", nil) -} - -// Cat returns the data contained by an IPFS or IPNS object(s) at path `p`. -func (api *UnixfsAPI) Cat(ctx context.Context, p coreiface.Path) (coreiface.Reader, error) { - dget := api.node.DAG // TODO: use a session here once routing perf issues are resolved - - dagnode, err := api.core().ResolveNode(ctx, p) - if err != nil { - return nil, err - } - - r, err := uio.NewDagReader(ctx, dagnode, dget) - if err == uio.ErrIsDir { - return nil, coreiface.ErrIsDir - } else if err != nil { - return nil, err - } - return r, nil + return newUnixfsFile(ctx, ses.dag, nd, "", nil) } // Ls returns the contents of an IPFS or IPNS object(s) at path p, with the format: @@ -169,7 +153,7 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p coreiface.Path) ([]*ipld.Link, e } var ndlinks []*ipld.Link - dir, err := uio.NewDirectoryFromNode(api.node.DAG, dagnode) + dir, err := uio.NewDirectoryFromNode(api.dag, dagnode) switch err { case nil: l, err := dir.Links(ctx) @@ -190,6 +174,6 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p coreiface.Path) ([]*ipld.Link, e return links, nil } -func (api *UnixfsAPI) core() coreiface.CoreAPI { +func (api *UnixfsAPI) core() *CoreAPI { return (*CoreAPI)(api) } diff --git a/core/coreapi/unixfs_test.go b/core/coreapi/unixfs_test.go index 8a682b7ca..d0b786c74 100644 --- a/core/coreapi/unixfs_test.go +++ b/core/coreapi/unixfs_test.go @@ -623,7 +623,7 @@ func TestAddHashOnly(t *testing.T) { } } -func TestCatEmptyFile(t *testing.T) { +func TestGetEmptyFile(t *testing.T) { ctx := context.Background() node, api, err := makeAPI(ctx) if err != nil { @@ -640,7 +640,7 @@ func TestCatEmptyFile(t *testing.T) { t.Fatal(err) } - r, err := api.Unixfs().Cat(ctx, emptyFilePath) + r, err := api.Unixfs().Get(ctx, emptyFilePath) if err != nil { t.Fatal(err) } @@ -655,7 +655,7 @@ func TestCatEmptyFile(t *testing.T) { } } -func TestCatDir(t *testing.T) { +func TestGetDir(t *testing.T) { ctx := context.Background() node, api, err := makeAPI(ctx) if err != nil { @@ -677,13 +677,18 @@ func TestCatDir(t *testing.T) { t.Fatalf("expected path %s, got: %s", emptyDir.Cid(), p.String()) } - _, err = api.Unixfs().Cat(ctx, coreiface.IpfsPath(emptyDir.Cid())) - if err != coreiface.ErrIsDir { + r, err := api.Unixfs().Get(ctx, coreiface.IpfsPath(emptyDir.Cid())) + if err != nil { + t.Error(err) + } + + _, err = r.Read(make([]byte, 2)) + if err != files.ErrNotReader { t.Fatalf("expected ErrIsDir, got: %s", err) } } -func TestCatNonUnixfs(t *testing.T) { +func TestGetNonUnixfs(t *testing.T) { ctx := context.Background() node, api, err := makeAPI(ctx) if err != nil { @@ -696,7 +701,7 @@ func TestCatNonUnixfs(t *testing.T) { t.Error(err) } - _, err = api.Unixfs().Cat(ctx, coreiface.IpfsPath(nd.Cid())) + _, err = api.Unixfs().Get(ctx, coreiface.IpfsPath(nd.Cid())) if !strings.Contains(err.Error(), "proto: required field") { t.Fatalf("expected protobuf error, got: %s", err) } @@ -713,7 +718,7 @@ func TestCatOffline(t *testing.T) { if err != nil { t.Error(err) } - _, err = api.Unixfs().Cat(ctx, p) + _, err = api.Unixfs().Get(ctx, p) if err != coreiface.ErrOffline { t.Fatalf("expected ErrOffline, got: %s", err) } diff --git a/core/corehttp/gateway_handler.go b/core/corehttp/gateway_handler.go index fc4d4fe65..88c489ebc 100644 --- a/core/corehttp/gateway_handler.go +++ b/core/corehttp/gateway_handler.go @@ -178,19 +178,17 @@ func (i *gatewayHandler) getOrHeadHandler(ctx context.Context, w http.ResponseWr return } - dr, err := i.api.Unixfs().Cat(ctx, resolvedPath) - dir := false - switch err { - case nil: - // Cat() worked - defer dr.Close() - case coreiface.ErrIsDir: - dir = true - default: + dr, err := i.api.Unixfs().Get(ctx, resolvedPath) + if err != nil { webError(w, "ipfs cat "+escapedURLPath, err, http.StatusNotFound) return } + dir := dr.IsDirectory() + if !dir { + defer dr.Close() + } + // Check etag send back to us etag := "\"" + resolvedPath.Cid().String() + "\"" if r.Header.Get("If-None-Match") == etag || r.Header.Get("If-None-Match") == "W/"+etag { @@ -297,7 +295,7 @@ func (i *gatewayHandler) getOrHeadHandler(ctx context.Context, w http.ResponseWr return } - dr, err := i.api.Unixfs().Cat(ctx, coreiface.IpfsPath(ixnd.Cid())) + dr, err := i.api.Unixfs().Get(ctx, coreiface.IpfsPath(ixnd.Cid())) if err != nil { internalWebError(w, err) return @@ -372,7 +370,7 @@ func (i *gatewayHandler) getOrHeadHandler(ctx context.Context, w http.ResponseWr } type sizeReadSeeker interface { - Size() uint64 + Size() (int64, error) io.ReadSeeker } @@ -383,7 +381,7 @@ type sizeSeeker struct { func (s *sizeSeeker) Seek(offset int64, whence int) (int64, error) { if whence == io.SeekEnd && offset == 0 { - return int64(s.Size()), nil + return s.Size() } return s.sizeReadSeeker.Seek(offset, whence) diff --git a/fuse/readonly/ipfs_test.go b/fuse/readonly/ipfs_test.go index 1febd6342..f8a2a0699 100644 --- a/fuse/readonly/ipfs_test.go +++ b/fuse/readonly/ipfs_test.go @@ -175,7 +175,7 @@ func TestIpfsStressRead(t *testing.T) { errs <- err } - read, err := api.Unixfs().Cat(nd.Context(), item) + read, err := api.Unixfs().Get(nd.Context(), item) if err != nil { errs <- err } diff --git a/test/integration/addcat_test.go b/test/integration/addcat_test.go index 207e84a18..6665e562b 100644 --- a/test/integration/addcat_test.go +++ b/test/integration/addcat_test.go @@ -147,7 +147,7 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error { return err } - readerCatted, err := catterApi.Unixfs().Cat(ctx, ap) + readerCatted, err := catterApi.Unixfs().Get(ctx, ap) if err != nil { return err } diff --git a/test/integration/bench_cat_test.go b/test/integration/bench_cat_test.go index e4a44d1c0..dd64a57ad 100644 --- a/test/integration/bench_cat_test.go +++ b/test/integration/bench_cat_test.go @@ -94,7 +94,7 @@ func benchCat(b *testing.B, data []byte, conf testutil.LatencyConfig) error { } b.StartTimer() - readerCatted, err := catterApi.Unixfs().Cat(ctx, ap) + readerCatted, err := catterApi.Unixfs().Get(ctx, ap) if err != nil { return err } diff --git a/test/integration/three_legged_cat_test.go b/test/integration/three_legged_cat_test.go index 031ba10b9..73f360b99 100644 --- a/test/integration/three_legged_cat_test.go +++ b/test/integration/three_legged_cat_test.go @@ -126,7 +126,7 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error { return err } - readerCatted, err := catterApi.Unixfs().Cat(ctx, ap) + readerCatted, err := catterApi.Unixfs().Get(ctx, ap) if err != nil { return err }