diff --git a/client/httpapi/api.go b/client/httpapi/api.go index 3a8e0e003..7d4fbd0e0 100644 --- a/client/httpapi/api.go +++ b/client/httpapi/api.go @@ -10,6 +10,8 @@ import ( "github.com/ipfs/go-ipfs/core/coreapi/interface" caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + + "github.com/ipfs/go-ipld-format" homedir "github.com/mitchellh/go-homedir" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr-net" @@ -137,8 +139,8 @@ func (api *HttpApi) Block() iface.BlockAPI { return (*BlockAPI)(api) } -func (api *HttpApi) Dag() iface.DagAPI { - return (*DagAPI)(api) +func (api *HttpApi) Dag() format.DAGService { + return (*HttpDagServ)(api) } func (api *HttpApi) Name() iface.NameAPI { diff --git a/client/httpapi/apifile.go b/client/httpapi/apifile.go index d9da23975..99ae72059 100644 --- a/client/httpapi/apifile.go +++ b/client/httpapi/apifile.go @@ -125,6 +125,11 @@ func (it *apiIter) Name() string { } func (it *apiIter) Next() bool { + if it.ctx.Err() != nil { + it.err = it.ctx.Err() + return false + } + var out lsOutput if err := it.dec.Decode(&out); err != nil { if err != io.EOF { diff --git a/client/httpapi/dag.go b/client/httpapi/dag.go index 20c233196..de2934c2f 100644 --- a/client/httpapi/dag.go +++ b/client/httpapi/dag.go @@ -1,71 +1,104 @@ package httpapi import ( + "bytes" "context" "fmt" - "io" - "math" + "io/ioutil" + "sync" "github.com/ipfs/go-ipfs/core/coreapi/interface" - caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipfs/go-ipld-format" - mh "github.com/multiformats/go-multihash" ) -type DagAPI HttpApi +type HttpDagServ HttpApi -func (api *DagAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.DagPutOption) (iface.ResolvedPath, error) { - options, err := caopts.DagPutOptions(opts...) +func (api *HttpDagServ) Get(ctx context.Context, c cid.Cid) (format.Node, error) { + r, err := api.core().Block().Get(ctx, iface.IpldPath(c)) if err != nil { return nil, err } - codec, ok := cid.CodecToStr[options.Codec] - if !ok { - return nil, fmt.Errorf("unknowm codec %d", options.MhType) + data, err := ioutil.ReadAll(r) + if err != nil { + return nil, err } - if options.MhLength != -1 { - return nil, fmt.Errorf("setting hash len is not supported yet") + blk, err := blocks.NewBlockWithCid(data, c) + if err != nil { + return nil, err } - var out struct { - Cid cid.Cid - } - req := api.core().request("dag/put"). - Option("format", codec). - Option("input-enc", options.InputEnc) + return format.DefaultBlockDecoder.Decode(blk) +} - if options.MhType != math.MaxUint64 { - mht, ok := mh.Codes[options.MhType] - if !ok { - return nil, fmt.Errorf("unknowm mhType %d", options.MhType) +func (api *HttpDagServ) GetMany(ctx context.Context, cids []cid.Cid) <-chan *format.NodeOption { + out := make(chan *format.NodeOption) + wg := sync.WaitGroup{} + wg.Add(len(cids)) + + for _, c := range cids { + // TODO: Consider limiting concurrency of this somehow + go func() { + defer wg.Done() + n, err := api.Get(ctx, c) + + select { + case out <- &format.NodeOption{Node: n, Err: err}: + case <-ctx.Done(): + } + }() + } + return out +} + +func (api *HttpDagServ) Add(ctx context.Context, nd format.Node) error { + c := nd.Cid() + prefix := c.Prefix() + format := cid.CodecToStr[prefix.Codec] + if prefix.Version == 0 { + format = "v0" + } + + stat, err := api.core().Block().Put(ctx, bytes.NewReader(nd.RawData()), + options.Block.Hash(prefix.MhType, prefix.MhLength), options.Block.Format(format)) + if err != nil { + return err + } + if !stat.Path().Cid().Equals(c) { + return fmt.Errorf("cids didn't match - local %s, remote %s", c.String(), stat.Path().Cid().String()) + } + return nil +} + +func (api *HttpDagServ) AddMany(ctx context.Context, nds []format.Node) error { + for _, nd := range nds { + // TODO: optimize + if err := api.Add(ctx, nd); err != nil { + return err } - req.Option("hash", mht) } + return nil +} - err = req.FileBody(src).Exec(ctx, &out) - if err != nil { - return nil, err +func (api *HttpDagServ) Remove(ctx context.Context, c cid.Cid) error { + return api.core().Block().Rm(ctx, iface.IpldPath(c)) //TODO: should we force rm? +} + +func (api *HttpDagServ) RemoveMany(ctx context.Context, cids []cid.Cid) error { + for _, c := range cids { + // TODO: optimize + if err := api.Remove(ctx, c); err != nil { + return err + } } - - return iface.IpldPath(out.Cid), nil + return nil } -func (api *DagAPI) Get(ctx context.Context, path iface.Path) (format.Node, error) { - panic("implement me") -} - -func (api *DagAPI) Tree(ctx context.Context, path iface.Path, opts ...caopts.DagTreeOption) ([]iface.Path, error) { - panic("implement me") -} - -func (api *DagAPI) Batch(ctx context.Context) iface.DagBatch { - panic("implement me") -} - -func (api *DagAPI) core() *HttpApi { +func (api *HttpDagServ) core() *HttpApi { return (*HttpApi)(api) }