mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-24 11:57:44 +08:00
Unixfs.Add progress events
This commit was moved from ipfs/go-ipfs-http-client@2f3a77b686
This commit is contained in:
parent
31a4c3754b
commit
c213e26542
@ -99,8 +99,8 @@ func NewApiWithClient(a ma.Multiaddr, c *gohttp.Client) *HttpApi {
|
||||
}
|
||||
|
||||
return &HttpApi{
|
||||
url: url,
|
||||
httpcli: c,
|
||||
url: url,
|
||||
httpcli: c,
|
||||
applyGlobal: func(*RequestBuilder) {},
|
||||
}
|
||||
}
|
||||
|
||||
@ -53,7 +53,7 @@ func (NodeProvider) MakeAPISwarm(ctx context.Context, fullIdentity bool, n int)
|
||||
}
|
||||
|
||||
if n > 1 {
|
||||
connectArgs := []string{"iptb", "--IPTB_ROOT", dir, "connect", fmt.Sprintf("[1-%d]", n - 1), "0"}
|
||||
connectArgs := []string{"iptb", "--IPTB_ROOT", dir, "connect", fmt.Sprintf("[1-%d]", n-1), "0"}
|
||||
if err := c.Run(connectArgs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -95,8 +95,8 @@ func (NodeProvider) MakeAPISwarm(ctx context.Context, fullIdentity bool, n int)
|
||||
}
|
||||
c := &gohttp.Client{
|
||||
Transport: &gohttp.Transport{
|
||||
Proxy: gohttp.ProxyFromEnvironment,
|
||||
DisableKeepAlives: true,
|
||||
Proxy: gohttp.ProxyFromEnvironment,
|
||||
DisableKeepAlives: true,
|
||||
DisableCompression: true,
|
||||
},
|
||||
}
|
||||
|
||||
@ -22,12 +22,12 @@ func (api *UnixfsAPI) Get(ctx context.Context, p iface.Path) (files.Node, error)
|
||||
}
|
||||
}
|
||||
|
||||
var stat struct{
|
||||
var stat struct {
|
||||
Hash string
|
||||
Type string
|
||||
Size int64 // unixfs size
|
||||
}
|
||||
err := api.core().request("files/stat", p.String()). Exec(ctx, &stat)
|
||||
err := api.core().request("files/stat", p.String()).Exec(ctx, &stat)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -43,12 +43,12 @@ func (api *UnixfsAPI) Get(ctx context.Context, p iface.Path) (files.Node, error)
|
||||
}
|
||||
|
||||
type apiFile struct {
|
||||
ctx context.Context
|
||||
ctx context.Context
|
||||
core *HttpApi
|
||||
size int64
|
||||
path iface.Path
|
||||
|
||||
r io.ReadCloser
|
||||
r io.ReadCloser
|
||||
at int64
|
||||
}
|
||||
|
||||
@ -96,7 +96,7 @@ func (f *apiFile) Size() (int64, error) {
|
||||
|
||||
func (api *UnixfsAPI) getFile(ctx context.Context, p iface.Path, size int64) (files.Node, error) {
|
||||
f := &apiFile{
|
||||
ctx: ctx,
|
||||
ctx: ctx,
|
||||
core: api.core(),
|
||||
size: size,
|
||||
path: p,
|
||||
@ -106,14 +106,14 @@ func (api *UnixfsAPI) getFile(ctx context.Context, p iface.Path, size int64) (fi
|
||||
}
|
||||
|
||||
type apiIter struct {
|
||||
ctx context.Context
|
||||
ctx context.Context
|
||||
core *UnixfsAPI
|
||||
|
||||
err error
|
||||
|
||||
dec *json.Decoder
|
||||
dec *json.Decoder
|
||||
curFile files.Node
|
||||
cur lsLink
|
||||
cur lsLink
|
||||
}
|
||||
|
||||
func (it *apiIter) Err() error {
|
||||
@ -179,7 +179,7 @@ func (it *apiIter) Node() files.Node {
|
||||
}
|
||||
|
||||
type apiDir struct {
|
||||
ctx context.Context
|
||||
ctx context.Context
|
||||
core *UnixfsAPI
|
||||
size int64
|
||||
path iface.Path
|
||||
@ -197,9 +197,9 @@ func (d *apiDir) Size() (int64, error) {
|
||||
|
||||
func (d *apiDir) Entries() files.DirIterator {
|
||||
return &apiIter{
|
||||
ctx: d.ctx,
|
||||
ctx: d.ctx,
|
||||
core: d.core,
|
||||
dec: d.dec,
|
||||
dec: d.dec,
|
||||
}
|
||||
}
|
||||
|
||||
@ -216,7 +216,7 @@ func (api *UnixfsAPI) getDir(ctx context.Context, p iface.Path, size int64) (fil
|
||||
}
|
||||
|
||||
d := &apiDir{
|
||||
ctx: ctx,
|
||||
ctx: ctx,
|
||||
core: api,
|
||||
size: size,
|
||||
path: p,
|
||||
|
||||
@ -16,7 +16,7 @@ import (
|
||||
type BlockAPI HttpApi
|
||||
|
||||
type blockStat struct {
|
||||
Key string
|
||||
Key string
|
||||
BSize int `json:"Size"`
|
||||
}
|
||||
|
||||
|
||||
@ -31,7 +31,7 @@ func (api *DagAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.DagPut
|
||||
return nil, fmt.Errorf("setting hash len is not supported yet")
|
||||
}
|
||||
|
||||
var out struct{
|
||||
var out struct {
|
||||
Cid cid.Cid
|
||||
}
|
||||
req := api.core().request("dag/put").
|
||||
|
||||
@ -22,9 +22,9 @@ type ipldNode struct {
|
||||
|
||||
func (a *HttpApi) nodeFromPath(ctx context.Context, p iface.ResolvedPath) ipld.Node {
|
||||
return &ipldNode{
|
||||
ctx: ctx,
|
||||
ctx: ctx,
|
||||
path: p,
|
||||
api: a,
|
||||
api: a,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -13,7 +13,7 @@ type KeyAPI HttpApi
|
||||
|
||||
type keyOutput struct {
|
||||
JName string `json:"Name"`
|
||||
Id string
|
||||
Id string
|
||||
}
|
||||
|
||||
func (k *keyOutput) Name() string {
|
||||
@ -35,7 +35,6 @@ func (k *keyOutput) valid() error {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
func (api *KeyAPI) Generate(ctx context.Context, name string, opts ...caopts.KeyGenerateOption) (iface.Key, error) {
|
||||
options, err := caopts.KeyGenerateOptions(opts...)
|
||||
if err != nil {
|
||||
@ -65,7 +64,7 @@ func (api *KeyAPI) List(ctx context.Context) ([]iface.Key, error) {
|
||||
}
|
||||
|
||||
func (api *KeyAPI) Self(ctx context.Context) (iface.Key, error) {
|
||||
var id struct{ID string}
|
||||
var id struct{ ID string }
|
||||
if err := api.core().request("id").Exec(ctx, &id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -14,7 +14,7 @@ import (
|
||||
type ObjectAPI HttpApi
|
||||
|
||||
type objectOut struct {
|
||||
Hash string
|
||||
Hash string
|
||||
}
|
||||
|
||||
func (api *ObjectAPI) New(ctx context.Context, opts ...caopts.ObjectNewOption) (format.Node, error) {
|
||||
|
||||
@ -20,7 +20,7 @@ type pinRefKeyList struct {
|
||||
|
||||
type pin struct {
|
||||
path iface.ResolvedPath
|
||||
typ string
|
||||
typ string
|
||||
}
|
||||
|
||||
func (p *pin) Path() iface.ResolvedPath {
|
||||
@ -31,7 +31,6 @@ func (p *pin) Type() string {
|
||||
return p.typ
|
||||
}
|
||||
|
||||
|
||||
func (api *PinAPI) Add(context.Context, iface.Path, ...caopts.PinAddOption) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
@ -2,9 +2,11 @@ package httpapi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/pkg/errors"
|
||||
"io"
|
||||
|
||||
"github.com/ipfs/go-ipfs/core/coreapi/interface"
|
||||
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
|
||||
@ -39,7 +41,6 @@ func (api *UnixfsAPI) Add(ctx context.Context, f files.Node, opts ...caopts.Unix
|
||||
Option("hash", mht).
|
||||
Option("chunker", options.Chunker).
|
||||
Option("cid-version", options.CidVersion).
|
||||
//Option("", options.Events).
|
||||
Option("fscache", options.FsCache).
|
||||
Option("hidden", options.Hidden).
|
||||
Option("inline", options.Inline).
|
||||
@ -47,11 +48,10 @@ func (api *UnixfsAPI) Add(ctx context.Context, f files.Node, opts ...caopts.Unix
|
||||
Option("nocopy", options.NoCopy).
|
||||
Option("only-hash", options.OnlyHash).
|
||||
Option("pin", options.Pin).
|
||||
//Option("", options.Progress).
|
||||
Option("silent", options.Silent).
|
||||
Option("stdin-name", options.StdinName).
|
||||
Option("wrap-with-directory", options.Wrap).
|
||||
Option("quieter", true) // TODO: rm after event impl
|
||||
Option("progress", options.Progress)
|
||||
|
||||
if options.RawLeavesSet {
|
||||
req.Option("raw-leaves", options.RawLeaves)
|
||||
@ -73,9 +73,50 @@ func (api *UnixfsAPI) Add(ctx context.Context, f files.Node, opts ...caopts.Unix
|
||||
}
|
||||
|
||||
var out addEvent
|
||||
if err := req.Exec(ctx, &out); err != nil { //TODO: ndjson events
|
||||
resp, err := req.Send(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if resp.Error != nil {
|
||||
return nil, resp.Error
|
||||
}
|
||||
defer resp.Output.Close()
|
||||
dec := json.NewDecoder(resp.Output)
|
||||
loop:
|
||||
for {
|
||||
var evt addEvent
|
||||
switch err := dec.Decode(&evt); err {
|
||||
case nil:
|
||||
case io.EOF:
|
||||
break loop
|
||||
default:
|
||||
return nil, err
|
||||
}
|
||||
out = evt
|
||||
|
||||
if options.Events != nil {
|
||||
ifevt := &iface.AddEvent{
|
||||
Name: out.Name,
|
||||
Size: out.Size,
|
||||
Bytes: out.Bytes,
|
||||
}
|
||||
|
||||
if out.Hash != "" {
|
||||
c, err := cid.Parse(out.Hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ifevt.Path = iface.IpfsPath(c)
|
||||
}
|
||||
|
||||
select {
|
||||
case options.Events <- ifevt:
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c, err := cid.Parse(out.Hash)
|
||||
if err != nil {
|
||||
@ -120,7 +161,7 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p iface.Path) ([]*format.Link, err
|
||||
links[i] = &format.Link{
|
||||
Name: l.Name,
|
||||
Size: l.Size,
|
||||
Cid: c,
|
||||
Cid: c,
|
||||
}
|
||||
}
|
||||
return links, nil
|
||||
|
||||
Loading…
Reference in New Issue
Block a user