mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-10 02:40:11 +08:00
Merge pull request #4254 from ipfs/feat/dag-batch
dag: Support multiple files in put
This commit is contained in:
commit
e49ce6c98f
@ -8,11 +8,13 @@ import (
|
||||
"strings"
|
||||
|
||||
cmds "github.com/ipfs/go-ipfs/commands"
|
||||
files "github.com/ipfs/go-ipfs/commands/files"
|
||||
coredag "github.com/ipfs/go-ipfs/core/coredag"
|
||||
path "github.com/ipfs/go-ipfs/path"
|
||||
pin "github.com/ipfs/go-ipfs/pin"
|
||||
|
||||
cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
|
||||
u "gx/ipfs/QmSU6eubNdhXjFBJBSksTp8kv8YRub8mGAPv8tVJHmL2EU/go-ipfs-util"
|
||||
mh "gx/ipfs/QmU9a9NV9RdPNwZQDYd5uKsm6N6LJLSvLbywDDYFbaaC6P/go-multihash"
|
||||
)
|
||||
|
||||
@ -53,7 +55,7 @@ into an object of the specified format.
|
||||
`,
|
||||
},
|
||||
Arguments: []cmds.Argument{
|
||||
cmds.FileArg("object data", true, false, "The object to put").EnableStdin(),
|
||||
cmds.FileArg("object data", true, true, "The object to put").EnableStdin(),
|
||||
},
|
||||
Options: []cmds.Option{
|
||||
cmds.StringOption("format", "f", "Format that the object will be added as.").Default("cbor"),
|
||||
@ -68,12 +70,6 @@ into an object of the specified format.
|
||||
return
|
||||
}
|
||||
|
||||
fi, err := req.Files().NextFile()
|
||||
if err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
||||
ienc, _, _ := req.Option("input-enc").String()
|
||||
format, _, _ := req.Option("format").String()
|
||||
hash, _, err := req.Option("hash").String()
|
||||
@ -96,56 +92,93 @@ into an object of the specified format.
|
||||
}
|
||||
}
|
||||
|
||||
if dopin {
|
||||
defer n.Blockstore.PinLock().Unlock()
|
||||
outChan := make(chan interface{}, 8)
|
||||
res.SetOutput((<-chan interface{})(outChan))
|
||||
|
||||
addAllAndPin := func(f files.File) error {
|
||||
cids := cid.NewSet()
|
||||
b := n.DAG.Batch()
|
||||
|
||||
for {
|
||||
file, err := f.NextFile()
|
||||
if err == io.EOF {
|
||||
// Finished the list of files.
|
||||
break
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nds, err := coredag.ParseInputs(ienc, format, file, mhType, -1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(nds) == 0 {
|
||||
return fmt.Errorf("no node returned from ParseInputs")
|
||||
}
|
||||
|
||||
for _, nd := range nds {
|
||||
_, err := b.Add(nd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cid := nds[0].Cid()
|
||||
cids.Add(cid)
|
||||
outChan <- &OutputObject{Cid: cid}
|
||||
}
|
||||
|
||||
if err := b.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if dopin {
|
||||
defer n.Blockstore.PinLock().Unlock()
|
||||
|
||||
cids.ForEach(func(c *cid.Cid) error {
|
||||
n.Pinning.PinWithMode(c, pin.Recursive)
|
||||
return nil
|
||||
})
|
||||
|
||||
err := n.Pinning.Flush()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
nds, err := coredag.ParseInputs(ienc, format, fi, mhType, -1)
|
||||
if err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
if len(nds) == 0 {
|
||||
res.SetError(fmt.Errorf("no node returned from ParseInputs"), cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
||||
b := n.DAG.Batch()
|
||||
for _, nd := range nds {
|
||||
_, err := b.Add(nd)
|
||||
if err != nil {
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
if err := addAllAndPin(req.Files()); err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err := b.Commit(); err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
||||
root := nds[0].Cid()
|
||||
if dopin {
|
||||
n.Pinning.PinWithMode(root, pin.Recursive)
|
||||
|
||||
err := n.Pinning.Flush()
|
||||
if err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
res.SetOutput(&OutputObject{Cid: root})
|
||||
}()
|
||||
},
|
||||
Type: OutputObject{},
|
||||
Marshalers: cmds.MarshalerMap{
|
||||
cmds.Text: func(res cmds.Response) (io.Reader, error) {
|
||||
oobj, ok := res.Output().(*OutputObject)
|
||||
outChan, ok := res.Output().(<-chan interface{})
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expected a different object in marshaler")
|
||||
return nil, u.ErrCast()
|
||||
}
|
||||
|
||||
return strings.NewReader(oobj.Cid.String()), nil
|
||||
marshal := func(v interface{}) (io.Reader, error) {
|
||||
obj, ok := v.(*OutputObject)
|
||||
if !ok {
|
||||
return nil, u.ErrCast()
|
||||
}
|
||||
|
||||
return strings.NewReader(obj.Cid.String() + "\n"), nil
|
||||
}
|
||||
|
||||
return &cmds.ChannelMarshaler{
|
||||
Channel: outChan,
|
||||
Marshaler: marshal,
|
||||
Res: res,
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@ -153,7 +153,7 @@ test_dag_cmd() {
|
||||
'
|
||||
|
||||
test_expect_success "dag put with dag-pb works output looks good" '
|
||||
printf $HASH > dag_put_exp &&
|
||||
echo $HASH > dag_put_exp &&
|
||||
test_cmp dag_put_exp dag_put_out
|
||||
'
|
||||
|
||||
@ -163,7 +163,20 @@ test_dag_cmd() {
|
||||
'
|
||||
|
||||
test_expect_success "dag put with dag-pb works output looks good" '
|
||||
printf $HASH > dag_put_exp &&
|
||||
echo $HASH > dag_put_exp &&
|
||||
test_cmp dag_put_exp dag_put_out
|
||||
'
|
||||
|
||||
test_expect_success "dag put multiple files" '
|
||||
printf {\"foo\":\"bar\"} > a.json &&
|
||||
printf {\"foo\":\"baz\"} > b.json &&
|
||||
ipfs dag put a.json b.json > dag_put_out
|
||||
'
|
||||
|
||||
test_expect_success "dag put multiple files output looks good" '
|
||||
echo zdpuAoKMEvka7gKGSjF9B3of1F5gE5MyMMywxTC13wCmouQrf > dag_put_exp &&
|
||||
echo zdpuAogmDEvpvGjMFsNTGDEU1JMYe6v69oxR8nG81EurmGHMj >> dag_put_exp &&
|
||||
|
||||
test_cmp dag_put_exp dag_put_out
|
||||
'
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user