Merge pull request #5611 from ipfs/features/streaming-ls-5600

Add --stream option to `ls` command
This commit is contained in:
Steven Allen 2018-11-20 23:42:22 -08:00 committed by GitHub
commit b7a48531b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 277 additions and 116 deletions

View File

@ -1,13 +1,12 @@
package commands
import (
"bytes"
"fmt"
"io"
"os"
"text/tabwriter"
cmds "github.com/ipfs/go-ipfs/commands"
e "github.com/ipfs/go-ipfs/core/commands/e"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
iface "github.com/ipfs/go-ipfs/core/coreapi/interface"
cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
@ -16,22 +15,28 @@ import (
unixfspb "gx/ipfs/QmUnHNqhSB1JgzVCxL1Kz3yb4bdyB4q1Z9AD5AUBVmt3fZ/go-unixfs/pb"
blockservice "gx/ipfs/QmVDTbzzTwnuBwNbJdhW3u7LoBQp46bezm9yp4z1RoEepM/go-blockservice"
offline "gx/ipfs/QmYZwey1thDTynSrvd6qQkX24UpTka6TFhQ2v569UpoqxD/go-ipfs-exchange-offline"
cmds "gx/ipfs/Qma6uuSyjkecGhMFFLfzyJDPyoDtNJSHJNweDccZhaWkgU/go-ipfs-cmds"
merkledag "gx/ipfs/QmcGt25mrjuB2kKW2zhPbXVZNHc4yoTDQ65NA8m6auP2f1/go-merkledag"
ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format"
"gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
)
// LsLink is the printable description of one ipld link as it appears in
// ls output.
type LsLink struct {
	// Name is the link's name inside its parent object.
	Name string
	// Hash is the string form of the CID the link points to.
	Hash string
	// Size is the size recorded on the link.
	Size uint64
	// Type is the UnixFS node type of the link target. The ls command
	// leaves it at -1 when the type was not determined.
	Type unixfspb.Data_DataType
}
// LsObject is an element of LsOutput
// It can represent all or part of a directory
type LsObject struct {
// Hash identifies the listed object. The ls command fills it with the
// path argument exactly as it was supplied (paths[i]), so it is not
// necessarily a bare hash.
Hash string
// Links holds the entries of the object. In the non-streaming path it
// contains every link of the object; in the streaming path each emitted
// LsObject carries exactly one link.
Links []LsLink
}
// LsOutput is a set of printable data for directories,
// it can be complete or partial
type LsOutput struct {
// Objects lists the objects covered by this output value. Without
// --stream the command emits a single LsOutput with one entry per
// argument; with --stream it emits one LsOutput per link, each holding
// a single LsObject.
Objects []LsObject
}
@ -39,6 +44,7 @@ type LsOutput struct {
// Names of the options accepted by the ls command.
const (
// lsHeadersOptionNameTime names the boolean option (short form "v") that
// prints the "Hash Size Name" table headers.
lsHeadersOptionNameTime = "headers"
// lsResolveTypeOptionName names the boolean option (default true) that
// controls whether linked objects are resolved to find out their types.
lsResolveTypeOptionName = "resolve-type"
// lsStreamOptionName names the boolean option (short form "s") that
// enables streaming of directory entries as they are traversed.
lsStreamOptionName = "stream"
)
var LsCmd = &cmds.Command{
@ -60,32 +66,20 @@ The JSON output contains type information.
Options: []cmdkit.Option{
cmdkit.BoolOption(lsHeadersOptionNameTime, "v", "Print table headers (Hash, Size, Name)."),
cmdkit.BoolOption(lsResolveTypeOptionName, "Resolve linked objects to find out their types.").WithDefault(true),
cmdkit.BoolOption(lsStreamOptionName, "s", "Enable exprimental streaming of directory entries as they are traversed."),
},
Run: func(req cmds.Request, res cmds.Response) {
nd, err := req.InvocContext().GetNode()
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
nd, err := cmdenv.GetNode(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
api, err := req.InvocContext().GetApi()
api, err := cmdenv.GetApi(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
// get options early -> exit early in case of error
if _, _, err := req.Option(lsHeadersOptionNameTime).Bool(); err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
resolve, _, err := req.Option(lsResolveTypeOptionName).Bool()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
resolve, _ := req.Options[lsResolveTypeOptionName].(bool)
dserv := nd.DAG
if !resolve {
offlineexch := offline.Exchange(nd.Blockstore)
@ -93,125 +87,214 @@ The JSON output contains type information.
dserv = merkledag.NewDAGService(bserv)
}
paths := req.Arguments()
err = req.ParseBodyArgs()
if err != nil {
return err
}
paths := req.Arguments
var dagnodes []ipld.Node
for _, fpath := range paths {
p, err := iface.ParsePath(fpath)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
dagnode, err := api.ResolveNode(req.Context(), p)
dagnode, err := api.ResolveNode(req.Context, p)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
dagnodes = append(dagnodes, dagnode)
}
output := make([]LsObject, len(req.Arguments()))
ng := merkledag.NewSession(req.Context(), nd.DAG)
ng := merkledag.NewSession(req.Context, nd.DAG)
ro := merkledag.NewReadOnlyDagService(ng)
stream, _ := req.Options[lsStreamOptionName].(bool)
if !stream {
output := make([]LsObject, len(req.Arguments))
for i, dagnode := range dagnodes {
dir, err := uio.NewDirectoryFromNode(ro, dagnode)
if err != nil && err != uio.ErrNotADir {
return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err)
}
var links []*ipld.Link
if dir == nil {
links = dagnode.Links()
} else {
links, err = dir.Links(req.Context)
if err != nil {
return err
}
}
outputLinks := make([]LsLink, len(links))
for j, link := range links {
lsLink, err := makeLsLink(req, dserv, resolve, link)
if err != nil {
return err
}
outputLinks[j] = *lsLink
}
output[i] = LsObject{
Hash: paths[i],
Links: outputLinks,
}
}
return cmds.EmitOnce(res, &LsOutput{output})
}
for i, dagnode := range dagnodes {
dir, err := uio.NewDirectoryFromNode(ro, dagnode)
if err != nil && err != uio.ErrNotADir {
res.SetError(fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err), cmdkit.ErrNormal)
return
return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err)
}
var links []*ipld.Link
var linkResults <-chan unixfs.LinkResult
if dir == nil {
links = dagnode.Links()
linkResults = makeDagNodeLinkResults(req, dagnode)
} else {
links, err = dir.Links(req.Context())
linkResults = dir.EnumLinksAsync(req.Context)
}
for linkResult := range linkResults {
if linkResult.Err != nil {
return linkResult.Err
}
link := linkResult.Link
lsLink, err := makeLsLink(req, dserv, resolve, link)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
}
output[i] = LsObject{
Hash: paths[i],
Links: make([]LsLink, len(links)),
}
for j, link := range links {
t := unixfspb.Data_DataType(-1)
switch link.Cid.Type() {
case cid.Raw:
// No need to check with raw leaves
t = unixfs.TFile
case cid.DagProtobuf:
linkNode, err := link.GetNode(req.Context(), dserv)
if err == ipld.ErrNotFound && !resolve {
// not an error
linkNode = nil
} else if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
d, err := unixfs.FSNodeFromBytes(pn.Data())
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
t = d.Type()
}
}
output[i].Links[j] = LsLink{
Name: link.Name,
Hash: link.Cid.String(),
Size: link.Size,
Type: t,
output := []LsObject{{
Hash: paths[i],
Links: []LsLink{*lsLink},
}}
if err = res.Emit(&LsOutput{output}); err != nil {
return err
}
}
}
res.SetOutput(&LsOutput{output})
return nil
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
PostRun: cmds.PostRunMap{
cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error {
req := res.Request()
lastObjectHash := ""
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}
headers, _, _ := res.Request().Option(lsHeadersOptionNameTime).Bool()
output, ok := v.(*LsOutput)
if !ok {
return nil, e.TypeErr(output, v)
}
buf := new(bytes.Buffer)
w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0)
for _, object := range output.Objects {
if len(output.Objects) > 1 {
fmt.Fprintf(w, "%s:\n", object.Hash)
}
if headers {
fmt.Fprintln(w, "Hash\tSize\tName")
}
for _, link := range object.Links {
if link.Type == unixfs.TDirectory {
link.Name += "/"
for {
v, err := res.Next()
if err != nil {
if err == io.EOF {
return nil
}
fmt.Fprintf(w, "%s\t%v\t%s\n", link.Hash, link.Size, link.Name)
}
if len(output.Objects) > 1 {
fmt.Fprintln(w)
return err
}
out := v.(*LsOutput)
lastObjectHash = tabularOutput(req, os.Stdout, out, lastObjectHash, false)
}
w.Flush()
return buf, nil
},
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *LsOutput) error {
// when streaming over HTTP using a text encoder, we cannot render breaks
// between directories because we don't know the hash of the last
// directory encoder
ignoreBreaks, _ := req.Options[lsStreamOptionName].(bool)
tabularOutput(req, w, out, "", ignoreBreaks)
return nil
}),
},
Type: LsOutput{},
}
// makeDagNodeLinkResults exposes the links of dagnode through the same
// channel-of-results shape that directory enumeration produces, so callers
// can consume both with a single range loop.
//
// The returned channel is buffered to hold every link, fully populated and
// already closed when the function returns; every result has a nil Err.
// req is accepted for signature symmetry and is not used.
func makeDagNodeLinkResults(req *cmds.Request, dagnode ipld.Node) <-chan unixfs.LinkResult {
	nodeLinks := dagnode.Links()

	// Capacity equals the number of links, so none of the sends below can block.
	results := make(chan unixfs.LinkResult, len(nodeLinks))
	for i := range nodeLinks {
		results <- unixfs.LinkResult{Link: nodeLinks[i]}
	}
	close(results)

	return results
}
// makeLsLink converts an ipld link into its printable LsLink form.
//
// The Type field is derived from the link's CID codec:
//   - raw leaves are reported as files without fetching anything;
//   - dag-pb targets are fetched through dserv and, when the fetched node is
//     a ProtoNode, its UnixFS data is decoded to obtain the type;
//   - anything else keeps the placeholder type -1.
//
// When resolve is false, a target that cannot be found is not an error: the
// link is returned with the placeholder type. Any other fetch or decode
// failure is returned to the caller.
func makeLsLink(req *cmds.Request, dserv ipld.DAGService, resolve bool, link *ipld.Link) (*LsLink, error) {
	entry := &LsLink{
		Name: link.Name,
		Hash: link.Cid.String(),
		Size: link.Size,
		Type: unixfspb.Data_DataType(-1),
	}

	codec := link.Cid.Type()
	if codec == cid.Raw {
		// No need to check with raw leaves
		entry.Type = unixfs.TFile
		return entry, nil
	}
	if codec != cid.DagProtobuf {
		return entry, nil
	}

	child, err := link.GetNode(req.Context, dserv)
	if err != nil {
		if resolve || err != ipld.ErrNotFound {
			return nil, err
		}
		// A missing block is tolerated when types are not being resolved.
		child = nil
	}

	proto, isProto := child.(*merkledag.ProtoNode)
	if !isProto {
		return entry, nil
	}

	fsNode, err := unixfs.FSNodeFromBytes(proto.Data())
	if err != nil {
		return nil, err
	}
	entry.Type = fsNode.Type()
	return entry, nil
}
// tabularOutput writes out to w as tab-aligned "hash size name" rows and
// returns the Hash of the last object for which a break was rendered.
//
// lastObjectHash is the value returned by the previous call (or "" for the
// first one); it lets consecutive calls that continue the same object avoid
// repeating the per-object preamble. When an object's Hash differs from it
// and ignoreBreaks is false, the preamble is written: an "<hash>:" label
// (preceded by a blank line unless it is the first) if the request has more
// than one argument, and the column headers if the headers option is set.
// With ignoreBreaks set no preamble is written and lastObjectHash is
// returned unchanged.
//
// Directory entries get a trailing "/" appended to their name. Write and
// flush errors are not reported.
func tabularOutput(req *cmds.Request, w io.Writer, out *LsOutput, lastObjectHash string, ignoreBreaks bool) string {
	showHeaders, _ := req.Options[lsHeadersOptionNameTime].(bool)
	streaming, _ := req.Options[lsStreamOptionName].(bool)
	labelObjects := len(req.Arguments) > 1

	// in streaming mode we can't automatically align the tabs
	// so we take a best guess
	cellWidth := 1
	if streaming {
		cellWidth = 10
	}

	tw := tabwriter.NewWriter(w, cellWidth, 2, 1, ' ', 0)
	for _, obj := range out.Objects {
		newSection := !ignoreBreaks && obj.Hash != lastObjectHash
		if newSection && labelObjects {
			if lastObjectHash != "" {
				fmt.Fprintln(tw)
			}
			fmt.Fprintf(tw, "%s:\n", obj.Hash)
		}
		if newSection && showHeaders {
			fmt.Fprintln(tw, "Hash\tSize\tName")
		}
		if newSection {
			lastObjectHash = obj.Hash
		}

		for _, entry := range obj.Links {
			displayName := entry.Name
			if entry.Type == unixfs.TDirectory {
				displayName += "/"
			}
			fmt.Fprintf(tw, "%s\t%v\t%s\n", entry.Hash, entry.Size, displayName)
		}
	}
	tw.Flush()

	return lastObjectHash
}

View File

@ -3,7 +3,6 @@ package commands
import (
"errors"
lgc "github.com/ipfs/go-ipfs/commands/legacy"
dag "github.com/ipfs/go-ipfs/core/commands/dag"
name "github.com/ipfs/go-ipfs/core/commands/name"
ocmd "github.com/ipfs/go-ipfs/core/commands/object"
@ -127,7 +126,7 @@ var rootSubcommands = map[string]*cmds.Command{
"id": IDCmd,
"key": KeyCmd,
"log": LogCmd,
"ls": lgc.NewCommand(LsCmd),
"ls": LsCmd,
"mount": MountCmd,
"name": name.NameCmd,
"object": ocmd.ObjectCmd,
@ -165,7 +164,7 @@ var rootROSubcommands = map[string]*cmds.Command{
},
"get": GetCmd,
"dns": DNSCmd,
"ls": lgc.NewCommand(LsCmd),
"ls": LsCmd,
"name": {
Subcommands: map[string]*cmds.Command{
"resolve": name.IpnsCmd,

View File

@ -57,7 +57,6 @@ QmaRGe7bVmVaLmxbrMiVNXqW4pRNNp3xq7hFtyRKA3mtJL 14 a
QmSix55yz8CzWXf5ZVM9vgEvijnEeeXiTSarVtsqiiCJss:
QmQNd6ubRXaNG6Prov8o6vk3bn6eWsj9FxLGrAVDUAGkGe 139 128
QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN 14 a
EOF
test_cmp expected_ls actual_ls
'
@ -84,12 +83,90 @@ QmSix55yz8CzWXf5ZVM9vgEvijnEeeXiTSarVtsqiiCJss:
Hash Size Name
QmQNd6ubRXaNG6Prov8o6vk3bn6eWsj9FxLGrAVDUAGkGe 139 128
QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN 14 a
EOF
test_cmp expected_ls_headers actual_ls_headers
'
}
# test_ls_cmd_streaming exercises `ipfs ls --stream`: it builds a small
# testData tree, adds it recursively, and then compares the text output of
# the streaming listing (with and without --headers) for three directory
# hashes against fixed expectations.
test_ls_cmd_streaming() {
# Build the fixture tree (two top-level files, two subdirectories with two
# files each) and add it recursively, capturing the add output.
test_expect_success "'ipfs add -r testData' succeeds" '
mkdir -p testData testData/d1 testData/d2 &&
echo "test" >testData/f1 &&
echo "data" >testData/f2 &&
echo "hello" >testData/d1/a &&
random 128 42 >testData/d1/128 &&
echo "world" >testData/d2/a &&
random 1024 42 >testData/d2/1024 &&
ipfs add -r testData >actual_add
'
# The hashes reported by the add must match the ones hard-coded in the
# ls invocations and expectations below.
test_expect_success "'ipfs add' output looks good" '
cat <<-\EOF >expected_add &&
added QmQNd6ubRXaNG6Prov8o6vk3bn6eWsj9FxLGrAVDUAGkGe testData/d1/128
added QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN testData/d1/a
added QmbQBUSRL9raZtNXfpTDeaxQapibJEG6qEY8WqAN22aUzd testData/d2/1024
added QmaRGe7bVmVaLmxbrMiVNXqW4pRNNp3xq7hFtyRKA3mtJL testData/d2/a
added QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH testData/f1
added QmNtocSs7MoDkJMc1RkyisCSKvLadujPsfJfSdJ3e1eA1M testData/f2
added QmSix55yz8CzWXf5ZVM9vgEvijnEeeXiTSarVtsqiiCJss testData/d1
added QmR3jhV4XpxxPjPT3Y8vNnWvWNvakdcT3H6vqpRBsX1MLy testData/d2
added QmfNy183bXiRVyrhyWtq3TwHn79yHEkiAGFr18P7YNzESj testData
EOF
test_cmp expected_add actual_add
'
# Streaming listing of testData, d2 and d1 (in that order), without headers.
test_expect_success "'ipfs ls --stream <three dir hashes>' succeeds" '
ipfs ls --stream QmfNy183bXiRVyrhyWtq3TwHn79yHEkiAGFr18P7YNzESj QmR3jhV4XpxxPjPT3Y8vNnWvWNvakdcT3H6vqpRBsX1MLy QmSix55yz8CzWXf5ZVM9vgEvijnEeeXiTSarVtsqiiCJss >actual_ls_stream
'
# Each directory is expected to be introduced by its "<hash>:" label,
# followed by its entries; directory entries carry a trailing "/".
test_expect_success "'ipfs ls --stream <three dir hashes>' output looks good" '
cat <<-\EOF >expected_ls_stream &&
QmfNy183bXiRVyrhyWtq3TwHn79yHEkiAGFr18P7YNzESj:
QmSix55yz8CzWXf5ZVM9vgEvijnEeeXiTSarVtsqiiCJss 246 d1/
QmR3jhV4XpxxPjPT3Y8vNnWvWNvakdcT3H6vqpRBsX1MLy 1143 d2/
QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH 13 f1
QmNtocSs7MoDkJMc1RkyisCSKvLadujPsfJfSdJ3e1eA1M 13 f2
QmR3jhV4XpxxPjPT3Y8vNnWvWNvakdcT3H6vqpRBsX1MLy:
QmbQBUSRL9raZtNXfpTDeaxQapibJEG6qEY8WqAN22aUzd 1035 1024
QmaRGe7bVmVaLmxbrMiVNXqW4pRNNp3xq7hFtyRKA3mtJL 14 a
QmSix55yz8CzWXf5ZVM9vgEvijnEeeXiTSarVtsqiiCJss:
QmQNd6ubRXaNG6Prov8o6vk3bn6eWsj9FxLGrAVDUAGkGe 139 128
QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN 14 a
EOF
test_cmp expected_ls_stream actual_ls_stream
'
# Same listing again, this time with --headers.
test_expect_success "'ipfs ls --stream --headers <three dir hashes>' succeeds" '
ipfs ls --stream --headers QmfNy183bXiRVyrhyWtq3TwHn79yHEkiAGFr18P7YNzESj QmR3jhV4XpxxPjPT3Y8vNnWvWNvakdcT3H6vqpRBsX1MLy QmSix55yz8CzWXf5ZVM9vgEvijnEeeXiTSarVtsqiiCJss >actual_ls_stream_headers
'
# With --headers, a "Hash Size Name" row is expected once per directory,
# directly after the directory label.
test_expect_success "'ipfs ls --stream --headers <three dir hashes>' output looks good" '
cat <<-\EOF >expected_ls_stream_headers &&
QmfNy183bXiRVyrhyWtq3TwHn79yHEkiAGFr18P7YNzESj:
Hash Size Name
QmSix55yz8CzWXf5ZVM9vgEvijnEeeXiTSarVtsqiiCJss 246 d1/
QmR3jhV4XpxxPjPT3Y8vNnWvWNvakdcT3H6vqpRBsX1MLy 1143 d2/
QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH 13 f1
QmNtocSs7MoDkJMc1RkyisCSKvLadujPsfJfSdJ3e1eA1M 13 f2
QmR3jhV4XpxxPjPT3Y8vNnWvWNvakdcT3H6vqpRBsX1MLy:
Hash Size Name
QmbQBUSRL9raZtNXfpTDeaxQapibJEG6qEY8WqAN22aUzd 1035 1024
QmaRGe7bVmVaLmxbrMiVNXqW4pRNNp3xq7hFtyRKA3mtJL 14 a
QmSix55yz8CzWXf5ZVM9vgEvijnEeeXiTSarVtsqiiCJss:
Hash Size Name
QmQNd6ubRXaNG6Prov8o6vk3bn6eWsj9FxLGrAVDUAGkGe 139 128
QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN 14 a
EOF
test_cmp expected_ls_stream_headers actual_ls_stream_headers
'
}
test_ls_cmd_raw_leaves() {
test_expect_success "'ipfs add -r --raw-leaves' then 'ipfs ls' works as expected" '
mkdir -p somedir &&
@ -114,12 +191,14 @@ test_ls_object() {
# should work offline
test_ls_cmd
test_ls_cmd_streaming
test_ls_cmd_raw_leaves
test_ls_object
# should work online
test_launch_ipfs_daemon
test_ls_cmd
test_ls_cmd_streaming
test_ls_cmd_raw_leaves
test_kill_ipfs_daemon
test_ls_object