mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-23 19:37:46 +08:00
Merge pull request #5673 from overbool/refactor/commands/filestore
commands/filestore: use new cmds lib
This commit is contained in:
commit
a8dd21a59b
@ -1,16 +1,14 @@
|
||||
package commands
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
oldCmds "github.com/ipfs/go-ipfs/commands"
|
||||
lgc "github.com/ipfs/go-ipfs/commands/legacy"
|
||||
"github.com/ipfs/go-ipfs/core"
|
||||
core "github.com/ipfs/go-ipfs/core"
|
||||
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
|
||||
e "github.com/ipfs/go-ipfs/core/commands/e"
|
||||
"github.com/ipfs/go-ipfs/filestore"
|
||||
filestore "github.com/ipfs/go-ipfs/filestore"
|
||||
|
||||
cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
|
||||
cmds "gx/ipfs/Qma6uuSyjkecGhMFFLfzyJDPyoDtNJSHJNweDccZhaWkgU/go-ipfs-cmds"
|
||||
@ -23,8 +21,8 @@ var FileStoreCmd = &cmds.Command{
|
||||
},
|
||||
Subcommands: map[string]*cmds.Command{
|
||||
"ls": lsFileStore,
|
||||
"verify": lgc.NewCommand(verifyFileStore),
|
||||
"dups": lgc.NewCommand(dupsFileStore),
|
||||
"verify": verifyFileStore,
|
||||
"dups": dupsFileStore,
|
||||
},
|
||||
}
|
||||
|
||||
@ -59,11 +57,7 @@ The output is:
|
||||
}
|
||||
args := req.Arguments
|
||||
if len(args) > 0 {
|
||||
out := perKeyActionToChan(req.Context, args, func(c cid.Cid) *filestore.ListRes {
|
||||
return filestore.List(fs, c)
|
||||
})
|
||||
|
||||
return res.Emit(out)
|
||||
return listByArgs(res, fs, args)
|
||||
}
|
||||
|
||||
fileOrder, _ := req.Options[fileOrderOptionName].(bool)
|
||||
@ -72,8 +66,17 @@ The output is:
|
||||
return err
|
||||
}
|
||||
|
||||
out := listResToChan(req.Context, next)
|
||||
return res.Emit(out)
|
||||
for {
|
||||
r := next()
|
||||
if r == nil {
|
||||
break
|
||||
}
|
||||
if err := res.Emit(r); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
PostRun: cmds.PostRunMap{
|
||||
cmds.CLI: streamResult(func(v interface{}, out io.Writer) nonFatalError {
|
||||
@ -88,7 +91,7 @@ The output is:
|
||||
Type: filestore.ListRes{},
|
||||
}
|
||||
|
||||
var verifyFileStore = &oldCmds.Command{
|
||||
var verifyFileStore = &cmds.Command{
|
||||
Helptext: cmdkit.HelpText{
|
||||
Tagline: "Verify objects in filestore.",
|
||||
LongDescription: `
|
||||
@ -118,96 +121,103 @@ For ERROR entries the error will also be printed to stderr.
|
||||
Options: []cmdkit.Option{
|
||||
cmdkit.BoolOption(fileOrderOptionName, "verify the objects based on the order of the backing file"),
|
||||
},
|
||||
Run: func(req oldCmds.Request, res oldCmds.Response) {
|
||||
_, fs, err := getFilestore(req.InvocContext())
|
||||
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
|
||||
_, fs, err := getFilestore(env)
|
||||
if err != nil {
|
||||
res.SetError(err, cmdkit.ErrNormal)
|
||||
return
|
||||
return err
|
||||
}
|
||||
args := req.Arguments()
|
||||
args := req.Arguments
|
||||
if len(args) > 0 {
|
||||
out := perKeyActionToChan(req.Context(), args, func(c cid.Cid) *filestore.ListRes {
|
||||
return filestore.Verify(fs, c)
|
||||
})
|
||||
res.SetOutput(out)
|
||||
} else {
|
||||
fileOrder, _, _ := req.Option(fileOrderOptionName).Bool()
|
||||
next, err := filestore.VerifyAll(fs, fileOrder)
|
||||
if err != nil {
|
||||
res.SetError(err, cmdkit.ErrNormal)
|
||||
return
|
||||
}
|
||||
out := listResToChan(req.Context(), next)
|
||||
res.SetOutput(out)
|
||||
return listByArgs(res, fs, args)
|
||||
}
|
||||
|
||||
fileOrder, _ := req.Options[fileOrderOptionName].(bool)
|
||||
next, err := filestore.VerifyAll(fs, fileOrder)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
r := next()
|
||||
if r == nil {
|
||||
break
|
||||
}
|
||||
if err := res.Emit(r); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
Marshalers: oldCmds.MarshalerMap{
|
||||
oldCmds.Text: func(res oldCmds.Response) (io.Reader, error) {
|
||||
v, err := unwrapOutput(res.Output())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
PostRun: cmds.PostRunMap{
|
||||
cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error {
|
||||
for {
|
||||
v, err := res.Next()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
r, ok := v.(*filestore.ListRes)
|
||||
if !ok {
|
||||
return nil, e.TypeErr(r, v)
|
||||
}
|
||||
list, ok := v.(*filestore.ListRes)
|
||||
if !ok {
|
||||
return e.TypeErr(list, v)
|
||||
}
|
||||
|
||||
if r.Status == filestore.StatusOtherError {
|
||||
fmt.Fprintf(res.Stderr(), "%s\n", r.ErrorMsg)
|
||||
if list.Status == filestore.StatusOtherError {
|
||||
fmt.Fprintf(os.Stderr, "%s\n", list.ErrorMsg)
|
||||
}
|
||||
fmt.Fprintf(os.Stdout, "%s %s\n", list.Status.Format(), list.FormatLong())
|
||||
}
|
||||
fmt.Fprintf(res.Stdout(), "%s %s\n", r.Status.Format(), r.FormatLong())
|
||||
return nil, nil
|
||||
},
|
||||
},
|
||||
Type: filestore.ListRes{},
|
||||
}
|
||||
|
||||
var dupsFileStore = &oldCmds.Command{
|
||||
var dupsFileStore = &cmds.Command{
|
||||
Helptext: cmdkit.HelpText{
|
||||
Tagline: "List blocks that are both in the filestore and standard block storage.",
|
||||
},
|
||||
Run: func(req oldCmds.Request, res oldCmds.Response) {
|
||||
_, fs, err := getFilestore(req.InvocContext())
|
||||
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
|
||||
_, fs, err := getFilestore(env)
|
||||
if err != nil {
|
||||
res.SetError(err, cmdkit.ErrNormal)
|
||||
return
|
||||
return err
|
||||
}
|
||||
ch, err := fs.FileManager().AllKeysChan(req.Context())
|
||||
ch, err := fs.FileManager().AllKeysChan(req.Context)
|
||||
if err != nil {
|
||||
res.SetError(err, cmdkit.ErrNormal)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
out := make(chan interface{}, 128)
|
||||
res.SetOutput((<-chan interface{})(out))
|
||||
|
||||
go func() {
|
||||
defer close(out)
|
||||
for cid := range ch {
|
||||
have, err := fs.MainBlockstore().Has(cid)
|
||||
if err != nil {
|
||||
select {
|
||||
case out <- &RefWrapper{Err: err.Error()}:
|
||||
case <-req.Context().Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
if have {
|
||||
select {
|
||||
case out <- &RefWrapper{Ref: cid.String()}:
|
||||
case <-req.Context().Done():
|
||||
return
|
||||
}
|
||||
for cid := range ch {
|
||||
have, err := fs.MainBlockstore().Has(cid)
|
||||
if err != nil {
|
||||
return res.Emit(&RefWrapper{Err: err.Error()})
|
||||
}
|
||||
if have {
|
||||
if err := res.Emit(&RefWrapper{Ref: cid.String()}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
Marshalers: refsMarshallerMap,
|
||||
Type: RefWrapper{},
|
||||
Encoders: cmds.EncoderMap{
|
||||
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RefWrapper) error {
|
||||
if out.Err != "" {
|
||||
return fmt.Errorf(out.Err)
|
||||
}
|
||||
|
||||
fmt.Fprintln(w, out.Ref)
|
||||
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
Type: RefWrapper{},
|
||||
}
|
||||
|
||||
func getFilestore(env interface{}) (*core.IpfsNode, *filestore.Filestore, error) {
|
||||
func getFilestore(env cmds.Environment) (*core.IpfsNode, *filestore.Filestore, error) {
|
||||
n, err := cmdenv.GetNode(env)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@ -219,49 +229,24 @@ func getFilestore(env interface{}) (*core.IpfsNode, *filestore.Filestore, error)
|
||||
return n, fs, err
|
||||
}
|
||||
|
||||
func listResToChan(ctx context.Context, next func() *filestore.ListRes) <-chan interface{} {
|
||||
out := make(chan interface{}, 128)
|
||||
go func() {
|
||||
defer close(out)
|
||||
for {
|
||||
r := next()
|
||||
if r == nil {
|
||||
return
|
||||
func listByArgs(res cmds.ResponseEmitter, fs *filestore.Filestore, args []string) error {
|
||||
for _, arg := range args {
|
||||
c, err := cid.Decode(arg)
|
||||
if err != nil {
|
||||
ret := &filestore.ListRes{
|
||||
Status: filestore.StatusOtherError,
|
||||
ErrorMsg: fmt.Sprintf("%s: %v", arg, err),
|
||||
}
|
||||
select {
|
||||
case out <- r:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
if err := res.Emit(ret); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
}()
|
||||
return out
|
||||
}
|
||||
|
||||
func perKeyActionToChan(ctx context.Context, args []string, action func(cid.Cid) *filestore.ListRes) <-chan interface{} {
|
||||
out := make(chan interface{}, 128)
|
||||
go func() {
|
||||
defer close(out)
|
||||
for _, arg := range args {
|
||||
c, err := cid.Decode(arg)
|
||||
if err != nil {
|
||||
select {
|
||||
case out <- &filestore.ListRes{
|
||||
Status: filestore.StatusOtherError,
|
||||
ErrorMsg: fmt.Sprintf("%s: %v", arg, err),
|
||||
}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
r := action(c)
|
||||
select {
|
||||
case out <- r:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
r := filestore.Verify(fs, c)
|
||||
if err := res.Emit(r); err != nil {
|
||||
return err
|
||||
}
|
||||
}()
|
||||
return out
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user