mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-11 03:09:41 +08:00
Reimplement DAG as DAGService
This commit was moved from ipfs/go-ipfs-http-client@f34a5f6d25
This commit is contained in:
parent
9b24cf0aaf
commit
b7e258cc10
@ -10,6 +10,8 @@ import (
|
||||
|
||||
"github.com/ipfs/go-ipfs/core/coreapi/interface"
|
||||
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
|
||||
|
||||
"github.com/ipfs/go-ipld-format"
|
||||
homedir "github.com/mitchellh/go-homedir"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
manet "github.com/multiformats/go-multiaddr-net"
|
||||
@ -137,8 +139,8 @@ func (api *HttpApi) Block() iface.BlockAPI {
|
||||
return (*BlockAPI)(api)
|
||||
}
|
||||
|
||||
func (api *HttpApi) Dag() iface.DagAPI {
|
||||
return (*DagAPI)(api)
|
||||
func (api *HttpApi) Dag() format.DAGService {
|
||||
return (*HttpDagServ)(api)
|
||||
}
|
||||
|
||||
func (api *HttpApi) Name() iface.NameAPI {
|
||||
|
||||
@ -125,6 +125,11 @@ func (it *apiIter) Name() string {
|
||||
}
|
||||
|
||||
func (it *apiIter) Next() bool {
|
||||
if it.ctx.Err() != nil {
|
||||
it.err = it.ctx.Err()
|
||||
return false
|
||||
}
|
||||
|
||||
var out lsOutput
|
||||
if err := it.dec.Decode(&out); err != nil {
|
||||
if err != io.EOF {
|
||||
|
||||
@ -1,71 +1,104 @@
|
||||
package httpapi
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
|
||||
"github.com/ipfs/go-ipfs/core/coreapi/interface"
|
||||
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
|
||||
"github.com/ipfs/go-ipfs/core/coreapi/interface/options"
|
||||
|
||||
"github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-ipld-format"
|
||||
mh "github.com/multiformats/go-multihash"
|
||||
)
|
||||
|
||||
type DagAPI HttpApi
|
||||
type HttpDagServ HttpApi
|
||||
|
||||
func (api *DagAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.DagPutOption) (iface.ResolvedPath, error) {
|
||||
options, err := caopts.DagPutOptions(opts...)
|
||||
func (api *HttpDagServ) Get(ctx context.Context, c cid.Cid) (format.Node, error) {
|
||||
r, err := api.core().Block().Get(ctx, iface.IpldPath(c))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
codec, ok := cid.CodecToStr[options.Codec]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknowm codec %d", options.MhType)
|
||||
data, err := ioutil.ReadAll(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if options.MhLength != -1 {
|
||||
return nil, fmt.Errorf("setting hash len is not supported yet")
|
||||
blk, err := blocks.NewBlockWithCid(data, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var out struct {
|
||||
Cid cid.Cid
|
||||
}
|
||||
req := api.core().request("dag/put").
|
||||
Option("format", codec).
|
||||
Option("input-enc", options.InputEnc)
|
||||
return format.DefaultBlockDecoder.Decode(blk)
|
||||
}
|
||||
|
||||
if options.MhType != math.MaxUint64 {
|
||||
mht, ok := mh.Codes[options.MhType]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknowm mhType %d", options.MhType)
|
||||
func (api *HttpDagServ) GetMany(ctx context.Context, cids []cid.Cid) <-chan *format.NodeOption {
|
||||
out := make(chan *format.NodeOption)
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(cids))
|
||||
|
||||
for _, c := range cids {
|
||||
// TODO: Consider limiting concurrency of this somehow
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
n, err := api.Get(ctx, c)
|
||||
|
||||
select {
|
||||
case out <- &format.NodeOption{Node: n, Err: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}()
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (api *HttpDagServ) Add(ctx context.Context, nd format.Node) error {
|
||||
c := nd.Cid()
|
||||
prefix := c.Prefix()
|
||||
format := cid.CodecToStr[prefix.Codec]
|
||||
if prefix.Version == 0 {
|
||||
format = "v0"
|
||||
}
|
||||
|
||||
stat, err := api.core().Block().Put(ctx, bytes.NewReader(nd.RawData()),
|
||||
options.Block.Hash(prefix.MhType, prefix.MhLength), options.Block.Format(format))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !stat.Path().Cid().Equals(c) {
|
||||
return fmt.Errorf("cids didn't match - local %s, remote %s", c.String(), stat.Path().Cid().String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (api *HttpDagServ) AddMany(ctx context.Context, nds []format.Node) error {
|
||||
for _, nd := range nds {
|
||||
// TODO: optimize
|
||||
if err := api.Add(ctx, nd); err != nil {
|
||||
return err
|
||||
}
|
||||
req.Option("hash", mht)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
err = req.FileBody(src).Exec(ctx, &out)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
func (api *HttpDagServ) Remove(ctx context.Context, c cid.Cid) error {
|
||||
return api.core().Block().Rm(ctx, iface.IpldPath(c)) //TODO: should we force rm?
|
||||
}
|
||||
|
||||
func (api *HttpDagServ) RemoveMany(ctx context.Context, cids []cid.Cid) error {
|
||||
for _, c := range cids {
|
||||
// TODO: optimize
|
||||
if err := api.Remove(ctx, c); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return iface.IpldPath(out.Cid), nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (api *DagAPI) Get(ctx context.Context, path iface.Path) (format.Node, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (api *DagAPI) Tree(ctx context.Context, path iface.Path, opts ...caopts.DagTreeOption) ([]iface.Path, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (api *DagAPI) Batch(ctx context.Context) iface.DagBatch {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (api *DagAPI) core() *HttpApi {
|
||||
func (api *HttpDagServ) core() *HttpApi {
|
||||
return (*HttpApi)(api)
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user