gc: add option to stream errors

License: MIT
Signed-off-by: Kevin Atkinson <k@kevina.org>
This commit is contained in:
Kevin Atkinson 2017-03-02 14:53:58 -05:00
parent 401d156596
commit f338520aa8
3 changed files with 55 additions and 24 deletions

View File

@ -40,6 +40,11 @@ var RepoCmd = &cmds.Command{
},
}
type GcResult struct {
Key *cid.Cid
Error string `json:",omitempty"`
}
var repoGcCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Perform a garbage collection sweep on the repo.",
@ -51,6 +56,7 @@ order to reclaim hard disk space.
},
Options: []cmds.Option{
cmds.BoolOption("quiet", "q", "Write minimal output.").Default(false),
cmds.BoolOption("stream-errors", "Stream errors.").Default(false),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
@ -59,22 +65,41 @@ order to reclaim hard disk space.
return
}
streamErrors, _, _ := res.Request().Option("stream-errors").Bool()
gcOutChan := corerepo.GarbageCollectAsync(n, req.Context())
outChan := make(chan interface{}, len(gcOutChan))
outChan := make(chan interface{}, cap(gcOutChan))
res.SetOutput((<-chan interface{})(outChan))
go func() {
defer close(outChan)
err := corerepo.CollectResult(req.Context(), gcOutChan, func(k *cid.Cid) {
outChan <- &corerepo.KeyRemoved{k}
})
if err != nil {
res.SetError(err, cmds.ErrNormal)
unreportedError := false
var lastErr error
if streamErrors {
for res := range gcOutChan {
if unreportedError {
outChan <- &GcResult{Error: lastErr.Error()}
unreportedError = false
}
if res.Error != nil {
lastErr = res.Error
unreportedError = true
} else {
outChan <- &GcResult{Key: res.KeyRemoved}
}
}
} else {
lastErr = corerepo.CollectResult(req.Context(), gcOutChan, func(k *cid.Cid) {
outChan <- &GcResult{Key: k}
})
}
if lastErr != nil {
res.SetError(lastErr, cmds.ErrNormal)
}
}()
},
Type: corerepo.KeyRemoved{},
Type: GcResult{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
@ -87,26 +112,29 @@ order to reclaim hard disk space.
return nil, err
}
marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*corerepo.KeyRemoved)
for v := range outChan {
obj, ok := v.(*GcResult)
if !ok {
return nil, u.ErrCast()
}
buf := new(bytes.Buffer)
if quiet {
buf = bytes.NewBufferString(obj.Key.String() + "\n")
} else {
buf = bytes.NewBufferString(fmt.Sprintf("removed %s\n", obj.Key))
if obj.Error != "" {
fmt.Fprintf(res.Stderr(), "Error: %s\n", obj.Error)
continue
}
if quiet {
fmt.Fprintf(res.Stdout(), "%s\n", obj.Key.String())
} else {
fmt.Fprintf(res.Stdout(), "removed %s\n", obj.Key.String())
}
return buf, nil
}
return &cmds.ChannelMarshaler{
Channel: outChan,
Marshaler: marshal,
Res: res,
}, nil
if res.Error() != nil {
return nil, res.Error()
}
return nil, nil
},
},
}

View File

@ -20,10 +20,6 @@ var log = logging.Logger("corerepo")
var ErrMaxStorageExceeded = errors.New("Maximum storage limit exceeded. Maybe unpin some files?")
type KeyRemoved struct {
Key *cid.Cid
}
type GC struct {
Node *core.IpfsNode
Repo repo.Repo

View File

@ -134,6 +134,13 @@ test_gc_robust_part2() {
grep -q "aborted" repo_gc_out
'
test_expect_success "'ipfs repo gc --stream-errors' should abort and report each error separately" '
test_must_fail ipfs repo gc --stream-errors 2>&1 | tee repo_gc_out &&
grep -q "Error: could not retrieve links for $LEAF1" repo_gc_out &&
grep -q "Error: could not retrieve links for $LEAF2" repo_gc_out &&
grep -q "Error: garbage collection aborted" repo_gc_out
'
test_expect_success "unpin 1MB file" '
ipfs pin rm $HASH2
'