Merge pull request ipfs/go-ipfs-http-client#1 from ipfs/feat/implement

Initial implementation

This commit was moved from ipfs/go-ipfs-http-client@449b614d69
This commit is contained in:
Łukasz Magiera 2019-02-23 03:45:29 +01:00 committed by GitHub
commit 4b30924309
17 changed files with 2673 additions and 0 deletions

183
client/httpapi/api.go Normal file
View File

@ -0,0 +1,183 @@
package httpapi
import (
"fmt"
"io/ioutil"
gohttp "net/http"
"os"
"path"
"strings"
iface "github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
homedir "github.com/mitchellh/go-homedir"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
const (
DefaultPathName = ".ipfs"
DefaultPathRoot = "~/" + DefaultPathName
DefaultApiFile = "api"
EnvDir = "IPFS_PATH"
)
// HttpApi implements github.com/ipfs/interface-go-ipfs-core/CoreAPI using
// IPFS HTTP API.
//
// For interface docs see
// https://godoc.org/github.com/ipfs/interface-go-ipfs-core#CoreAPI
type HttpApi struct {
url string
httpcli gohttp.Client
applyGlobal func(*RequestBuilder)
}
// NewLocalApi tries to construct new HttpApi instance communicating with local
// IPFS daemon
//
// Daemon api address is pulled from the $IPFS_PATH/api file.
// If $IPFS_PATH env var is not present, it defaults to ~/.ipfs
func NewLocalApi() (iface.CoreAPI, error) {
baseDir := os.Getenv(EnvDir)
if baseDir == "" {
baseDir = DefaultPathRoot
}
return NewPathApi(baseDir)
}
// NewPathApi constructs new HttpApi by pulling api address from specified
// ipfspath. Api file should be located at $ipfspath/api
func NewPathApi(ipfspath string) (iface.CoreAPI, error) {
a, err := ApiAddr(ipfspath)
if err != nil {
if os.IsNotExist(err) {
err = nil
}
return nil, err
}
return NewApi(a)
}
// ApiAddr reads api file in specified ipfs path
func ApiAddr(ipfspath string) (ma.Multiaddr, error) {
baseDir, err := homedir.Expand(ipfspath)
if err != nil {
return nil, err
}
apiFile := path.Join(baseDir, DefaultApiFile)
api, err := ioutil.ReadFile(apiFile)
if err != nil {
return nil, err
}
return ma.NewMultiaddr(strings.TrimSpace(string(api)))
}
// NewApi constructs HttpApi with specified endpoint
func NewApi(a ma.Multiaddr) (*HttpApi, error) {
c := &gohttp.Client{
Transport: &gohttp.Transport{
Proxy: gohttp.ProxyFromEnvironment,
DisableKeepAlives: true,
},
}
return NewApiWithClient(a, c)
}
// NewApiWithClient constructs HttpApi with specified endpoint and custom http client
func NewApiWithClient(a ma.Multiaddr, c *gohttp.Client) (*HttpApi, error) {
_, url, err := manet.DialArgs(a)
if err != nil {
return nil, err
}
if a, err := ma.NewMultiaddr(url); err == nil {
_, host, err := manet.DialArgs(a)
if err == nil {
url = host
}
}
api := &HttpApi{
url: url,
httpcli: *c,
applyGlobal: func(*RequestBuilder) {},
}
// We don't support redirects.
api.httpcli.CheckRedirect = func(_ *gohttp.Request, _ []*gohttp.Request) error {
return fmt.Errorf("unexpected redirect")
}
return api, nil
}
func (api *HttpApi) WithOptions(opts ...caopts.ApiOption) (iface.CoreAPI, error) {
options, err := caopts.ApiOptions(opts...)
if err != nil {
return nil, err
}
subApi := *api
subApi.applyGlobal = func(req *RequestBuilder) {
if options.Offline {
req.Option("offline", options.Offline)
}
}
return &subApi, nil
}
func (api *HttpApi) request(command string, args ...string) *RequestBuilder {
return &RequestBuilder{
command: command,
args: args,
shell: api,
}
}
func (api *HttpApi) Unixfs() iface.UnixfsAPI {
return (*UnixfsAPI)(api)
}
func (api *HttpApi) Block() iface.BlockAPI {
return (*BlockAPI)(api)
}
func (api *HttpApi) Dag() iface.APIDagService {
return (*HttpDagServ)(api)
}
func (api *HttpApi) Name() iface.NameAPI {
return (*NameAPI)(api)
}
func (api *HttpApi) Key() iface.KeyAPI {
return (*KeyAPI)(api)
}
func (api *HttpApi) Pin() iface.PinAPI {
return (*PinAPI)(api)
}
func (api *HttpApi) Object() iface.ObjectAPI {
return (*ObjectAPI)(api)
}
func (api *HttpApi) Dht() iface.DhtAPI {
return (*DhtAPI)(api)
}
func (api *HttpApi) Swarm() iface.SwarmAPI {
return (*SwarmAPI)(api)
}
func (api *HttpApi) PubSub() iface.PubSubAPI {
return (*PubsubAPI)(api)
}

213
client/httpapi/api_test.go Normal file
View File

@ -0,0 +1,213 @@
package httpapi
import (
"context"
"io/ioutil"
gohttp "net/http"
"os"
"strconv"
"sync"
"testing"
"github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/tests"
local "github.com/ipfs/iptb-plugins/local"
"github.com/ipfs/iptb/testbed"
"github.com/ipfs/iptb/testbed/interfaces"
ma "github.com/multiformats/go-multiaddr"
)
const parallelSpeculativeNodes = 15 // 15 seems to work best
func init() {
_, err := testbed.RegisterPlugin(testbed.IptbPlugin{
From: "<builtin>",
NewNode: local.NewNode,
GetAttrList: local.GetAttrList,
GetAttrDesc: local.GetAttrDesc,
PluginName: local.PluginName,
BuiltIn: true,
}, false)
if err != nil {
panic(err)
}
}
type NodeProvider struct {
simple <-chan func(context.Context) ([]iface.CoreAPI, error)
}
func newNodeProvider(ctx context.Context) *NodeProvider {
simpleNodes := make(chan func(context.Context) ([]iface.CoreAPI, error), parallelSpeculativeNodes)
np := &NodeProvider{
simple: simpleNodes,
}
// start basic nodes speculatively in parallel
for i := 0; i < parallelSpeculativeNodes; i++ {
go func() {
for {
ctx, cancel := context.WithCancel(ctx)
snd, err := np.makeAPISwarm(ctx, false, 1)
res := func(ctx context.Context) ([]iface.CoreAPI, error) {
if err != nil {
return nil, err
}
go func() {
<-ctx.Done()
cancel()
}()
return snd, nil
}
select {
case simpleNodes <- res:
case <-ctx.Done():
return
}
}
}()
}
return np
}
func (np *NodeProvider) MakeAPISwarm(ctx context.Context, fullIdentity bool, n int) ([]iface.CoreAPI, error) {
if !fullIdentity && n == 1 {
return (<-np.simple)(ctx)
}
return np.makeAPISwarm(ctx, fullIdentity, n)
}
func (NodeProvider) makeAPISwarm(ctx context.Context, fullIdentity bool, n int) ([]iface.CoreAPI, error) {
dir, err := ioutil.TempDir("", "httpapi-tb-")
if err != nil {
return nil, err
}
tb := testbed.NewTestbed(dir)
specs, err := testbed.BuildSpecs(tb.Dir(), n, "localipfs", nil)
if err != nil {
return nil, err
}
if err := testbed.WriteNodeSpecs(tb.Dir(), specs); err != nil {
return nil, err
}
nodes, err := tb.Nodes()
if err != nil {
return nil, err
}
apis := make([]iface.CoreAPI, n)
wg := sync.WaitGroup{}
zero := sync.WaitGroup{}
wg.Add(len(nodes))
zero.Add(1)
errs := make(chan error, len(nodes))
for i, nd := range nodes {
go func(i int, nd testbedi.Core) {
defer wg.Done()
if _, err := nd.Init(ctx, "--empty-repo"); err != nil {
errs <- err
return
}
if _, err := nd.RunCmd(ctx, nil, "ipfs", "config", "--json", "Experimental.FilestoreEnabled", "true"); err != nil {
errs <- err
return
}
if _, err := nd.Start(ctx, true, "--enable-pubsub-experiment", "--offline="+strconv.FormatBool(n == 1)); err != nil {
errs <- err
return
}
if i > 0 {
zero.Wait()
if err := nd.Connect(ctx, nodes[0]); err != nil {
errs <- err
return
}
} else {
zero.Done()
}
addr, err := nd.APIAddr()
if err != nil {
errs <- err
return
}
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
errs <- err
return
}
c := &gohttp.Client{
Transport: &gohttp.Transport{
Proxy: gohttp.ProxyFromEnvironment,
DisableKeepAlives: true,
DisableCompression: true,
},
}
apis[i], err = NewApiWithClient(maddr, c)
if err != nil {
errs <- err
return
}
// empty node is pinned even with --empty-repo, we don't want that
emptyNode, err := iface.ParsePath("/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn")
if err != nil {
errs <- err
return
}
if err := apis[i].Pin().Rm(ctx, emptyNode); err != nil {
errs <- err
return
}
}(i, nd)
}
wg.Wait()
go func() {
<-ctx.Done()
defer os.Remove(dir)
defer func() {
for _, nd := range nodes {
_ = nd.Stop(context.Background())
}
}()
}()
select {
case err = <-errs:
default:
}
return apis, err
}
func TestHttpApi(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tests.TestApi(newNodeProvider(ctx))(t)
}

254
client/httpapi/apifile.go Normal file
View File

@ -0,0 +1,254 @@
package httpapi
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-files"
"github.com/ipfs/interface-go-ipfs-core"
)
const forwardSeekLimit = 1 << 14 //16k
func (api *UnixfsAPI) Get(ctx context.Context, p iface.Path) (files.Node, error) {
if p.Mutable() { // use resolved path in case we are dealing with IPNS / MFS
var err error
p, err = api.core().ResolvePath(ctx, p)
if err != nil {
return nil, err
}
}
var stat struct {
Hash string
Type string
Size int64 // unixfs size
}
err := api.core().request("files/stat", p.String()).Exec(ctx, &stat)
if err != nil {
return nil, err
}
switch stat.Type {
case "file":
return api.getFile(ctx, p, stat.Size)
case "directory":
return api.getDir(ctx, p, stat.Size)
default:
return nil, fmt.Errorf("unsupported file type '%s'", stat.Type)
}
}
type apiFile struct {
ctx context.Context
core *HttpApi
size int64
path iface.Path
r *Response
at int64
}
func (f *apiFile) reset() error {
if f.r != nil {
f.r.Cancel()
}
req := f.core.request("cat", f.path.String())
if f.at != 0 {
req.Option("offset", f.at)
}
resp, err := req.Send(f.ctx)
if err != nil {
return err
}
if resp.Error != nil {
return resp.Error
}
f.r = resp
return nil
}
func (f *apiFile) Read(p []byte) (int, error) {
n, err := f.r.Output.Read(p)
if n > 0 {
f.at += int64(n)
}
return n, err
}
func (f *apiFile) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekEnd:
offset = f.size + offset
case io.SeekCurrent:
offset = f.at + offset
}
if f.at == offset { //noop
return offset, nil
}
if f.at < offset && offset-f.at < forwardSeekLimit { //forward skip
r, err := io.CopyN(ioutil.Discard, f.r.Output, offset-f.at)
f.at += r
return f.at, err
}
f.at = offset
return f.at, f.reset()
}
func (f *apiFile) Close() error {
if f.r != nil {
return f.r.Cancel()
}
return nil
}
func (f *apiFile) Size() (int64, error) {
return f.size, nil
}
func (api *UnixfsAPI) getFile(ctx context.Context, p iface.Path, size int64) (files.Node, error) {
f := &apiFile{
ctx: ctx,
core: api.core(),
size: size,
path: p,
}
return f, f.reset()
}
type apiIter struct {
ctx context.Context
core *UnixfsAPI
err error
dec *json.Decoder
curFile files.Node
cur lsLink
}
func (it *apiIter) Err() error {
return it.err
}
func (it *apiIter) Name() string {
return it.cur.Name
}
func (it *apiIter) Next() bool {
if it.ctx.Err() != nil {
it.err = it.ctx.Err()
return false
}
var out lsOutput
if err := it.dec.Decode(&out); err != nil {
if err != io.EOF {
it.err = err
}
return false
}
if len(out.Objects) != 1 {
it.err = fmt.Errorf("ls returned more objects than expected (%d)", len(out.Objects))
return false
}
if len(out.Objects[0].Links) != 1 {
it.err = fmt.Errorf("ls returned more links than expected (%d)", len(out.Objects[0].Links))
return false
}
it.cur = out.Objects[0].Links[0]
c, err := cid.Parse(it.cur.Hash)
if err != nil {
it.err = err
return false
}
switch it.cur.Type {
case iface.THAMTShard:
fallthrough
case iface.TMetadata:
fallthrough
case iface.TDirectory:
it.curFile, err = it.core.getDir(it.ctx, iface.IpfsPath(c), int64(it.cur.Size))
if err != nil {
it.err = err
return false
}
case iface.TFile:
it.curFile, err = it.core.getFile(it.ctx, iface.IpfsPath(c), int64(it.cur.Size))
if err != nil {
it.err = err
return false
}
default:
it.err = fmt.Errorf("file type %d not supported", it.cur.Type)
return false
}
return true
}
func (it *apiIter) Node() files.Node {
return it.curFile
}
type apiDir struct {
ctx context.Context
core *UnixfsAPI
size int64
path iface.Path
dec *json.Decoder
}
func (d *apiDir) Close() error {
return nil
}
func (d *apiDir) Size() (int64, error) {
return d.size, nil
}
func (d *apiDir) Entries() files.DirIterator {
return &apiIter{
ctx: d.ctx,
core: d.core,
dec: d.dec,
}
}
func (api *UnixfsAPI) getDir(ctx context.Context, p iface.Path, size int64) (files.Node, error) {
resp, err := api.core().request("ls", p.String()).
Option("resolve-size", true).
Option("stream", true).Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
d := &apiDir{
ctx: ctx,
core: api,
size: size,
path: p,
dec: json.NewDecoder(resp.Output),
}
return d, nil
}
var _ files.File = &apiFile{}
var _ files.Directory = &apiDir{}

124
client/httpapi/block.go Normal file
View File

@ -0,0 +1,124 @@
package httpapi
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"github.com/ipfs/go-cid"
"github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
mh "github.com/multiformats/go-multihash"
)
type BlockAPI HttpApi
type blockStat struct {
Key string
BSize int `json:"Size"`
cid cid.Cid
}
func (s *blockStat) Size() int {
return s.BSize
}
func (s *blockStat) Path() iface.ResolvedPath {
return iface.IpldPath(s.cid)
}
func (api *BlockAPI) Put(ctx context.Context, r io.Reader, opts ...caopts.BlockPutOption) (iface.BlockStat, error) {
options, _, err := caopts.BlockPutOptions(opts...)
if err != nil {
return nil, err
}
mht, ok := mh.Codes[options.MhType]
if !ok {
return nil, fmt.Errorf("unknowm mhType %d", options.MhType)
}
req := api.core().request("block/put").
Option("mhtype", mht).
Option("mhlen", options.MhLength).
Option("format", options.Codec).
Option("pin", options.Pin).
FileBody(r)
var out blockStat
if err := req.Exec(ctx, &out); err != nil {
return nil, err
}
out.cid, err = cid.Parse(out.Key)
if err != nil {
return nil, err
}
return &out, nil
}
func (api *BlockAPI) Get(ctx context.Context, p iface.Path) (io.Reader, error) {
resp, err := api.core().request("block/get", p.String()).Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
//TODO: make get return ReadCloser to avoid copying
defer resp.Close()
b := new(bytes.Buffer)
if _, err := io.Copy(b, resp.Output); err != nil {
return nil, err
}
return b, nil
}
func (api *BlockAPI) Rm(ctx context.Context, p iface.Path, opts ...caopts.BlockRmOption) error {
options, err := caopts.BlockRmOptions(opts...)
if err != nil {
return err
}
removedBlock := struct {
Hash string `json:",omitempty"`
Error string `json:",omitempty"`
}{}
req := api.core().request("block/rm").
Option("force", options.Force).
Arguments(p.String())
if err := req.Exec(ctx, &removedBlock); err != nil {
return err
}
if removedBlock.Error != "" {
return errors.New(removedBlock.Error)
}
return nil
}
func (api *BlockAPI) Stat(ctx context.Context, p iface.Path) (iface.BlockStat, error) {
var out blockStat
err := api.core().request("block/stat", p.String()).Exec(ctx, &out)
if err != nil {
return nil, err
}
out.cid, err = cid.Parse(out.Key)
if err != nil {
return nil, err
}
return &out, nil
}
func (api *BlockAPI) core() *HttpApi {
return (*HttpApi)(api)
}

127
client/httpapi/dag.go Normal file
View File

@ -0,0 +1,127 @@
package httpapi
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipld-format"
"github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
)
type httpNodeAdder HttpApi
type HttpDagServ httpNodeAdder
type pinningHttpNodeAdder httpNodeAdder
func (api *HttpDagServ) Get(ctx context.Context, c cid.Cid) (format.Node, error) {
r, err := api.core().Block().Get(ctx, iface.IpldPath(c))
if err != nil {
return nil, err
}
data, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
blk, err := blocks.NewBlockWithCid(data, c)
if err != nil {
return nil, err
}
return format.DefaultBlockDecoder.Decode(blk)
}
func (api *HttpDagServ) GetMany(ctx context.Context, cids []cid.Cid) <-chan *format.NodeOption {
out := make(chan *format.NodeOption)
for _, c := range cids {
// TODO: Consider limiting concurrency of this somehow
go func(c cid.Cid) {
n, err := api.Get(ctx, c)
select {
case out <- &format.NodeOption{Node: n, Err: err}:
case <-ctx.Done():
}
}(c)
}
return out
}
func (api *httpNodeAdder) add(ctx context.Context, nd format.Node, pin bool) error {
c := nd.Cid()
prefix := c.Prefix()
format := cid.CodecToStr[prefix.Codec]
if prefix.Version == 0 {
format = "v0"
}
stat, err := api.core().Block().Put(ctx, bytes.NewReader(nd.RawData()),
options.Block.Hash(prefix.MhType, prefix.MhLength),
options.Block.Format(format),
options.Block.Pin(pin))
if err != nil {
return err
}
if !stat.Path().Cid().Equals(c) {
return fmt.Errorf("cids didn't match - local %s, remote %s", c.String(), stat.Path().Cid().String())
}
return nil
}
func (api *httpNodeAdder) addMany(ctx context.Context, nds []format.Node, pin bool) error {
for _, nd := range nds {
// TODO: optimize
if err := api.add(ctx, nd, pin); err != nil {
return err
}
}
return nil
}
func (api *HttpDagServ) AddMany(ctx context.Context, nds []format.Node) error {
return (*httpNodeAdder)(api).addMany(ctx, nds, false)
}
func (api *HttpDagServ) Add(ctx context.Context, nd format.Node) error {
return (*httpNodeAdder)(api).add(ctx, nd, false)
}
func (api *pinningHttpNodeAdder) Add(ctx context.Context, nd format.Node) error {
return (*httpNodeAdder)(api).add(ctx, nd, true)
}
func (api *pinningHttpNodeAdder) AddMany(ctx context.Context, nds []format.Node) error {
return (*httpNodeAdder)(api).addMany(ctx, nds, true)
}
func (api *HttpDagServ) Pinning() format.NodeAdder {
return (*pinningHttpNodeAdder)(api)
}
func (api *HttpDagServ) Remove(ctx context.Context, c cid.Cid) error {
return api.core().Block().Rm(ctx, iface.IpldPath(c)) //TODO: should we force rm?
}
func (api *HttpDagServ) RemoveMany(ctx context.Context, cids []cid.Cid) error {
for _, c := range cids {
// TODO: optimize
if err := api.Remove(ctx, c); err != nil {
return err
}
}
return nil
}
func (api *httpNodeAdder) core() *HttpApi {
return (*HttpApi)(api)
}
func (api *HttpDagServ) core() *HttpApi {
return (*HttpApi)(api)
}

114
client/httpapi/dht.go Normal file
View File

@ -0,0 +1,114 @@
package httpapi
import (
"context"
"encoding/json"
"github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
"github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-peerstore"
notif "github.com/libp2p/go-libp2p-routing/notifications"
)
type DhtAPI HttpApi
func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (peerstore.PeerInfo, error) {
var out struct {
Type notif.QueryEventType
Responses []peerstore.PeerInfo
}
resp, err := api.core().request("dht/findpeer", p.Pretty()).Send(ctx)
if err != nil {
return peerstore.PeerInfo{}, err
}
if resp.Error != nil {
return peerstore.PeerInfo{}, resp.Error
}
defer resp.Close()
dec := json.NewDecoder(resp.Output)
for {
if err := dec.Decode(&out); err != nil {
return peerstore.PeerInfo{}, err
}
if out.Type == notif.FinalPeer {
return out.Responses[0], nil
}
}
}
func (api *DhtAPI) FindProviders(ctx context.Context, p iface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peerstore.PeerInfo, error) {
options, err := caopts.DhtFindProvidersOptions(opts...)
if err != nil {
return nil, err
}
rp, err := api.core().ResolvePath(ctx, p)
if err != nil {
return nil, err
}
resp, err := api.core().request("dht/findprovs", rp.Cid().String()).
Option("num-providers", options.NumProviders).
Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
res := make(chan peerstore.PeerInfo)
go func() {
defer resp.Close()
defer close(res)
dec := json.NewDecoder(resp.Output)
for {
var out struct {
Extra string
Type notif.QueryEventType
Responses []peerstore.PeerInfo
}
if err := dec.Decode(&out); err != nil {
return // todo: handle this somehow
}
if out.Type == notif.QueryError {
return // usually a 'not found' error
// todo: handle other errors
}
if out.Type == notif.Provider {
for _, pi := range out.Responses {
select {
case res <- pi:
case <-ctx.Done():
return
}
}
}
}
}()
return res, nil
}
func (api *DhtAPI) Provide(ctx context.Context, p iface.Path, opts ...caopts.DhtProvideOption) error {
options, err := caopts.DhtProvideOptions(opts...)
if err != nil {
return err
}
rp, err := api.core().ResolvePath(ctx, p)
if err != nil {
return err
}
return api.core().request("dht/provide", rp.Cid().String()).
Option("recursive", options.Recursive).
Exec(ctx, nil)
}
func (api *DhtAPI) core() *HttpApi {
return (*HttpApi)(api)
}

123
client/httpapi/key.go Normal file
View File

@ -0,0 +1,123 @@
package httpapi
import (
"context"
"errors"
"github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
"github.com/libp2p/go-libp2p-peer"
)
type KeyAPI HttpApi
type keyOutput struct {
JName string `json:"Name"`
Id string
pid peer.ID
}
func (k *keyOutput) Name() string {
return k.JName
}
func (k *keyOutput) Path() iface.Path {
p, _ := iface.ParsePath("/ipns/" + k.Id)
return p
}
func (k *keyOutput) ID() peer.ID {
return k.pid
}
func (api *KeyAPI) Generate(ctx context.Context, name string, opts ...caopts.KeyGenerateOption) (iface.Key, error) {
options, err := caopts.KeyGenerateOptions(opts...)
if err != nil {
return nil, err
}
var out keyOutput
err = api.core().request("key/gen", name).
Option("type", options.Algorithm).
Option("size", options.Size).
Exec(ctx, &out)
if err != nil {
return nil, err
}
out.pid, err = peer.IDB58Decode(out.Id)
return &out, err
}
func (api *KeyAPI) Rename(ctx context.Context, oldName string, newName string, opts ...caopts.KeyRenameOption) (iface.Key, bool, error) {
options, err := caopts.KeyRenameOptions(opts...)
if err != nil {
return nil, false, err
}
var out struct {
Was string
Now string
Id string
Overwrite bool
}
err = api.core().request("key/rename", oldName, newName).
Option("force", options.Force).
Exec(ctx, &out)
if err != nil {
return nil, false, err
}
id := &keyOutput{JName: out.Now, Id: out.Id}
id.pid, err = peer.IDB58Decode(id.Id)
return id, out.Overwrite, err
}
func (api *KeyAPI) List(ctx context.Context) ([]iface.Key, error) {
var out struct{ Keys []*keyOutput }
if err := api.core().request("key/list").Exec(ctx, &out); err != nil {
return nil, err
}
res := make([]iface.Key, len(out.Keys))
for i, k := range out.Keys {
var err error
k.pid, err = peer.IDB58Decode(k.Id)
if err != nil {
return nil, err
}
res[i] = k
}
return res, nil
}
func (api *KeyAPI) Self(ctx context.Context) (iface.Key, error) {
var id struct{ ID string }
if err := api.core().request("id").Exec(ctx, &id); err != nil {
return nil, err
}
var err error
out := keyOutput{JName: "self", Id: id.ID}
out.pid, err = peer.IDB58Decode(out.Id)
return &out, err
}
func (api *KeyAPI) Remove(ctx context.Context, name string) (iface.Key, error) {
var out struct{ Keys []keyOutput }
if err := api.core().request("key/rm", name).Exec(ctx, &out); err != nil {
return nil, err
}
if len(out.Keys) != 1 {
return nil, errors.New("got unexpected number of keys back")
}
var err error
out.Keys[0].pid, err = peer.IDB58Decode(out.Keys[0].Id)
return &out.Keys[0], err
}
func (api *KeyAPI) core() *HttpApi {
return (*HttpApi)(api)
}

139
client/httpapi/name.go Normal file
View File

@ -0,0 +1,139 @@
package httpapi
import (
"context"
"encoding/json"
"fmt"
"io"
"github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
"github.com/ipfs/interface-go-ipfs-core/options/namesys"
)
type NameAPI HttpApi
type ipnsEntry struct {
JName string `json:"Name"`
JValue string `json:"Value"`
path iface.Path
}
func (e *ipnsEntry) Name() string {
return e.JName
}
func (e *ipnsEntry) Value() iface.Path {
return e.path
}
func (api *NameAPI) Publish(ctx context.Context, p iface.Path, opts ...caopts.NamePublishOption) (iface.IpnsEntry, error) {
options, err := caopts.NamePublishOptions(opts...)
if err != nil {
return nil, err
}
req := api.core().request("name/publish", p.String()).
Option("key", options.Key).
Option("allow-offline", options.AllowOffline).
Option("lifetime", options.ValidTime).
Option("resolve", false)
if options.TTL != nil {
req.Option("ttl", options.TTL)
}
var out ipnsEntry
if err := req.Exec(ctx, &out); err != nil {
return nil, err
}
out.path, err = iface.ParsePath(out.JValue)
return &out, err
}
func (api *NameAPI) Search(ctx context.Context, name string, opts ...caopts.NameResolveOption) (<-chan iface.IpnsResult, error) {
options, err := caopts.NameResolveOptions(opts...)
if err != nil {
return nil, err
}
ropts := nsopts.ProcessOpts(options.ResolveOpts)
if ropts.Depth != nsopts.DefaultDepthLimit && ropts.Depth != 1 {
return nil, fmt.Errorf("Name.Resolve: depth other than 1 or %d not supported", nsopts.DefaultDepthLimit)
}
req := api.core().request("name/resolve", name).
Option("nocache", !options.Cache).
Option("recursive", ropts.Depth != 1).
Option("dht-record-count", ropts.DhtRecordCount).
Option("dht-timeout", ropts.DhtTimeout).
Option("stream", true)
resp, err := req.Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
res := make(chan iface.IpnsResult)
go func() {
defer close(res)
defer resp.Close()
dec := json.NewDecoder(resp.Output)
for {
var out struct{ Path string }
err := dec.Decode(&out)
if err == io.EOF {
return
}
var ires iface.IpnsResult
if err == nil {
ires.Path, err = iface.ParsePath(out.Path)
}
select {
case res <- ires:
case <-ctx.Done():
}
if err != nil {
return
}
}
}()
return res, nil
}
func (api *NameAPI) Resolve(ctx context.Context, name string, opts ...caopts.NameResolveOption) (iface.Path, error) {
options, err := caopts.NameResolveOptions(opts...)
if err != nil {
return nil, err
}
ropts := nsopts.ProcessOpts(options.ResolveOpts)
if ropts.Depth != nsopts.DefaultDepthLimit && ropts.Depth != 1 {
return nil, fmt.Errorf("Name.Resolve: depth other than 1 or %d not supported", nsopts.DefaultDepthLimit)
}
req := api.core().request("name/resolve", name).
Option("nocache", !options.Cache).
Option("recursive", ropts.Depth != 1).
Option("dht-record-count", ropts.DhtRecordCount).
Option("dht-timeout", ropts.DhtTimeout)
var out struct{ Path string }
if err := req.Exec(ctx, &out); err != nil {
return nil, err
}
return iface.ParsePath(out.Path)
}
func (api *NameAPI) core() *HttpApi {
return (*HttpApi)(api)
}

261
client/httpapi/object.go Normal file
View File

@ -0,0 +1,261 @@
package httpapi
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
dag "github.com/ipfs/go-merkledag"
ft "github.com/ipfs/go-unixfs"
"github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
)
type ObjectAPI HttpApi
type objectOut struct {
Hash string
}
func (api *ObjectAPI) New(ctx context.Context, opts ...caopts.ObjectNewOption) (ipld.Node, error) {
options, err := caopts.ObjectNewOptions(opts...)
if err != nil {
return nil, err
}
var n ipld.Node
switch options.Type {
case "empty":
n = new(dag.ProtoNode)
case "unixfs-dir":
n = ft.EmptyDirNode()
default:
return nil, fmt.Errorf("unknown object type: %s", options.Type)
}
return n, nil
}
func (api *ObjectAPI) Put(ctx context.Context, r io.Reader, opts ...caopts.ObjectPutOption) (iface.ResolvedPath, error) {
options, err := caopts.ObjectPutOptions(opts...)
if err != nil {
return nil, err
}
var out objectOut
err = api.core().request("object/put").
Option("inputenc", options.InputEnc).
Option("datafieldenc", options.DataType).
Option("pin", options.Pin).
FileBody(r).
Exec(ctx, &out)
if err != nil {
return nil, err
}
c, err := cid.Parse(out.Hash)
if err != nil {
return nil, err
}
return iface.IpfsPath(c), nil
}
func (api *ObjectAPI) Get(ctx context.Context, p iface.Path) (ipld.Node, error) {
r, err := api.core().Block().Get(ctx, p)
if err != nil {
return nil, err
}
b, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
return merkledag.DecodeProtobuf(b)
}
func (api *ObjectAPI) Data(ctx context.Context, p iface.Path) (io.Reader, error) {
resp, err := api.core().request("object/data", p.String()).Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
//TODO: make Data return ReadCloser to avoid copying
defer resp.Close()
b := new(bytes.Buffer)
if _, err := io.Copy(b, resp.Output); err != nil {
return nil, err
}
return b, nil
}
func (api *ObjectAPI) Links(ctx context.Context, p iface.Path) ([]*ipld.Link, error) {
var out struct {
Links []struct {
Name string
Hash string
Size uint64
}
}
if err := api.core().request("object/links", p.String()).Exec(ctx, &out); err != nil {
return nil, err
}
res := make([]*ipld.Link, len(out.Links))
for i, l := range out.Links {
c, err := cid.Parse(l.Hash)
if err != nil {
return nil, err
}
res[i] = &ipld.Link{
Cid: c,
Name: l.Name,
Size: l.Size,
}
}
return res, nil
}
func (api *ObjectAPI) Stat(ctx context.Context, p iface.Path) (*iface.ObjectStat, error) {
var out struct {
Hash string
NumLinks int
BlockSize int
LinksSize int
DataSize int
CumulativeSize int
}
if err := api.core().request("object/stat", p.String()).Exec(ctx, &out); err != nil {
return nil, err
}
c, err := cid.Parse(out.Hash)
if err != nil {
return nil, err
}
return &iface.ObjectStat{
Cid: c,
NumLinks: out.NumLinks,
BlockSize: out.BlockSize,
LinksSize: out.LinksSize,
DataSize: out.DataSize,
CumulativeSize: out.CumulativeSize,
}, nil
}
func (api *ObjectAPI) AddLink(ctx context.Context, base iface.Path, name string, child iface.Path, opts ...caopts.ObjectAddLinkOption) (iface.ResolvedPath, error) {
options, err := caopts.ObjectAddLinkOptions(opts...)
if err != nil {
return nil, err
}
var out objectOut
err = api.core().request("object/patch/add-link", base.String(), name, child.String()).
Option("create", options.Create).
Exec(ctx, &out)
if err != nil {
return nil, err
}
c, err := cid.Parse(out.Hash)
if err != nil {
return nil, err
}
return iface.IpfsPath(c), nil
}
func (api *ObjectAPI) RmLink(ctx context.Context, base iface.Path, link string) (iface.ResolvedPath, error) {
var out objectOut
err := api.core().request("object/patch/rm-link", base.String(), link).
Exec(ctx, &out)
if err != nil {
return nil, err
}
c, err := cid.Parse(out.Hash)
if err != nil {
return nil, err
}
return iface.IpfsPath(c), nil
}
func (api *ObjectAPI) AppendData(ctx context.Context, p iface.Path, r io.Reader) (iface.ResolvedPath, error) {
var out objectOut
err := api.core().request("object/patch/append-data", p.String()).
FileBody(r).
Exec(ctx, &out)
if err != nil {
return nil, err
}
c, err := cid.Parse(out.Hash)
if err != nil {
return nil, err
}
return iface.IpfsPath(c), nil
}
func (api *ObjectAPI) SetData(ctx context.Context, p iface.Path, r io.Reader) (iface.ResolvedPath, error) {
var out objectOut
err := api.core().request("object/patch/set-data", p.String()).
FileBody(r).
Exec(ctx, &out)
if err != nil {
return nil, err
}
c, err := cid.Parse(out.Hash)
if err != nil {
return nil, err
}
return iface.IpfsPath(c), nil
}
type change struct {
Type iface.ChangeType
Path string
Before cid.Cid
After cid.Cid
}
func (api *ObjectAPI) Diff(ctx context.Context, a iface.Path, b iface.Path) ([]iface.ObjectChange, error) {
var out struct {
Changes []change
}
if err := api.core().request("object/diff", a.String(), b.String()).Exec(ctx, &out); err != nil {
return nil, err
}
res := make([]iface.ObjectChange, len(out.Changes))
for i, ch := range out.Changes {
res[i] = iface.ObjectChange{
Type: ch.Type,
Path: ch.Path,
}
if ch.Before != cid.Undef {
res[i].Before = iface.IpfsPath(ch.Before)
}
if ch.After != cid.Undef {
res[i].After = iface.IpfsPath(ch.After)
}
}
return res, nil
}
func (api *ObjectAPI) core() *HttpApi {
return (*HttpApi)(api)
}

52
client/httpapi/path.go Normal file
View File

@ -0,0 +1,52 @@
package httpapi
import (
"context"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
ipfspath "github.com/ipfs/go-path"
"github.com/ipfs/interface-go-ipfs-core"
)
func (api *HttpApi) ResolvePath(ctx context.Context, path iface.Path) (iface.ResolvedPath, error) {
var out struct {
Cid cid.Cid
RemPath string
}
//TODO: this is hacky, fixing https://github.com/ipfs/go-ipfs/issues/5703 would help
var err error
if path.Namespace() == "ipns" {
if path, err = api.Name().Resolve(ctx, path.String()); err != nil {
return nil, err
}
}
if err := api.request("dag/resolve", path.String()).Exec(ctx, &out); err != nil {
return nil, err
}
// TODO:
ipath, err := ipfspath.FromSegments("/"+path.Namespace()+"/", out.Cid.String(), out.RemPath)
if err != nil {
return nil, err
}
root, err := cid.Parse(ipfspath.Path(path.String()).Segments()[1])
if err != nil {
return nil, err
}
return iface.NewResolvedPath(ipath, out.Cid, root, out.RemPath), nil
}
func (api *HttpApi) ResolveNode(ctx context.Context, p iface.Path) (ipld.Node, error) {
rp, err := api.ResolvePath(ctx, p)
if err != nil {
return nil, err
}
return api.Dag().Get(ctx, rp.Cid())
}

183
client/httpapi/pin.go Normal file
View File

@ -0,0 +1,183 @@
package httpapi
import (
"context"
"encoding/json"
"github.com/ipfs/go-cid"
"github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
"github.com/pkg/errors"
)
type PinAPI HttpApi
type pinRefKeyObject struct {
Type string
}
type pinRefKeyList struct {
Keys map[string]pinRefKeyObject
}
type pin struct {
path iface.ResolvedPath
typ string
}
func (p *pin) Path() iface.ResolvedPath {
return p.path
}
func (p *pin) Type() string {
return p.typ
}
func (api *PinAPI) Add(ctx context.Context, p iface.Path, opts ...caopts.PinAddOption) error {
options, err := caopts.PinAddOptions(opts...)
if err != nil {
return err
}
return api.core().request("pin/add", p.String()).
Option("recursive", options.Recursive).Exec(ctx, nil)
}
func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]iface.Pin, error) {
options, err := caopts.PinLsOptions(opts...)
if err != nil {
return nil, err
}
var out pinRefKeyList
err = api.core().request("pin/ls").
Option("type", options.Type).Exec(ctx, &out)
if err != nil {
return nil, err
}
pins := make([]iface.Pin, 0, len(out.Keys))
for hash, p := range out.Keys {
c, err := cid.Parse(hash)
if err != nil {
return nil, err
}
pins = append(pins, &pin{typ: p.Type, path: iface.IpldPath(c)})
}
return pins, nil
}
func (api *PinAPI) Rm(ctx context.Context, p iface.Path, opts ...caopts.PinRmOption) error {
options, err := caopts.PinRmOptions(opts...)
if err != nil {
return err
}
return api.core().request("pin/rm", p.String()).
Option("recursive", options.Recursive).
Exec(ctx, nil)
}
func (api *PinAPI) Update(ctx context.Context, from iface.Path, to iface.Path, opts ...caopts.PinUpdateOption) error {
options, err := caopts.PinUpdateOptions(opts...)
if err != nil {
return err
}
return api.core().request("pin/update").
Option("unpin", options.Unpin).Exec(ctx, nil)
}
type pinVerifyRes struct {
ok bool
badNodes []iface.BadPinNode
}
func (r *pinVerifyRes) Ok() bool {
return r.ok
}
func (r *pinVerifyRes) BadNodes() []iface.BadPinNode {
return r.badNodes
}
type badNode struct {
err error
cid cid.Cid
}
func (n *badNode) Path() iface.ResolvedPath {
return iface.IpldPath(n.cid)
}
func (n *badNode) Err() error {
return n.err
}
func (api *PinAPI) Verify(ctx context.Context) (<-chan iface.PinStatus, error) {
resp, err := api.core().request("pin/verify").Option("verbose", true).Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
res := make(chan iface.PinStatus)
go func() {
defer resp.Close()
defer close(res)
dec := json.NewDecoder(resp.Output)
for {
var out struct {
Cid string
Ok bool
BadNodes []struct {
Cid string
Err string
}
}
if err := dec.Decode(&out); err != nil {
return // todo: handle non io.EOF somehow
}
badNodes := make([]iface.BadPinNode, len(out.BadNodes))
for i, n := range out.BadNodes {
c, err := cid.Decode(n.Cid)
if err != nil {
badNodes[i] = &badNode{
cid: c,
err: err,
}
continue
}
if n.Err != "" {
err = errors.New(n.Err)
}
badNodes[i] = &badNode{
cid: c,
err: err,
}
}
select {
case res <- &pinVerifyRes{
ok: out.Ok,
badNodes: badNodes,
}:
case <-ctx.Done():
return
}
}
}()
return res, nil
}
func (api *PinAPI) core() *HttpApi {
return (*HttpApi)(api)
}

170
client/httpapi/pubsub.go Normal file
View File

@ -0,0 +1,170 @@
package httpapi
import (
"bytes"
"context"
"encoding/json"
"io"
"github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
"github.com/libp2p/go-libp2p-peer"
)
type PubsubAPI HttpApi
func (api *PubsubAPI) Ls(ctx context.Context) ([]string, error) {
var out struct {
Strings []string
}
if err := api.core().request("pubsub/ls").Exec(ctx, &out); err != nil {
return nil, err
}
return out.Strings, nil
}
func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) {
options, err := caopts.PubSubPeersOptions(opts...)
if err != nil {
return nil, err
}
var out struct {
Strings []string
}
if err := api.core().request("pubsub/peers", options.Topic).Exec(ctx, &out); err != nil {
return nil, err
}
res := make([]peer.ID, len(out.Strings))
for i, sid := range out.Strings {
id, err := peer.IDB58Decode(sid)
if err != nil {
return nil, err
}
res[i] = id
}
return res, nil
}
func (api *PubsubAPI) Publish(ctx context.Context, topic string, message []byte) error {
return api.core().request("pubsub/pub", topic).
FileBody(bytes.NewReader(message)).
Exec(ctx, nil)
}
type pubsubSub struct {
messages chan pubsubMessage
done chan struct{}
rcloser func() error
}
type pubsubMessage struct {
JFrom []byte `json:"from,omitempty"`
JData []byte `json:"data,omitempty"`
JSeqno []byte `json:"seqno,omitempty"`
JTopicIDs []string `json:"topicIDs,omitempty"`
from peer.ID
err error
}
func (msg *pubsubMessage) From() peer.ID {
return msg.from
}
func (msg *pubsubMessage) Data() []byte {
return msg.JData
}
func (msg *pubsubMessage) Seq() []byte {
return msg.JSeqno
}
func (msg *pubsubMessage) Topics() []string {
return msg.JTopicIDs
}
func (s *pubsubSub) Next(ctx context.Context) (iface.PubSubMessage, error) {
select {
case msg, ok := <-s.messages:
if !ok {
return nil, io.EOF
}
if msg.err != nil {
return nil, msg.err
}
var err error
msg.from, err = peer.IDFromBytes(msg.JFrom)
return &msg, err
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (api *PubsubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (iface.PubSubSubscription, error) {
options, err := caopts.PubSubSubscribeOptions(opts...)
if err != nil {
return nil, err
}
resp, err := api.core().request("pubsub/sub", topic).
Option("discover", options.Discover).Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
sub := &pubsubSub{
messages: make(chan pubsubMessage),
done: make(chan struct{}),
rcloser: func() error {
return resp.Cancel()
},
}
dec := json.NewDecoder(resp.Output)
go func() {
defer close(sub.messages)
for {
var msg pubsubMessage
if err := dec.Decode(&msg); err != nil {
if err == io.EOF {
return
}
msg.err = err
}
select {
case sub.messages <- msg:
case <-sub.done:
return
case <-ctx.Done():
return
}
}
}()
return sub, nil
}
func (s *pubsubSub) Close() error {
if s.done != nil {
close(s.done)
s.done = nil
}
return s.rcloser()
}
func (api *PubsubAPI) core() *HttpApi {
return (*HttpApi)(api)
}

34
client/httpapi/request.go Normal file
View File

@ -0,0 +1,34 @@
package httpapi
import (
"context"
"io"
"strings"
)
type Request struct {
ApiBase string
Command string
Args []string
Opts map[string]string
Body io.Reader
Headers map[string]string
}
func NewRequest(ctx context.Context, url, command string, args ...string) *Request {
if !strings.HasPrefix(url, "http") {
url = "http://" + url
}
opts := map[string]string{
"encoding": "json",
"stream-channels": "true",
}
return &Request{
ApiBase: url + "/api/v0",
Command: command,
Args: args,
Opts: opts,
Headers: make(map[string]string),
}
}

View File

@ -0,0 +1,114 @@
package httpapi
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"strconv"
"strings"
"github.com/ipfs/go-ipfs-files"
)
// RequestBuilder is an IPFS commands request builder.
type RequestBuilder struct {
command string
args []string
opts map[string]string
headers map[string]string
body io.Reader
shell *HttpApi
}
// Arguments adds the arguments to the args.
func (r *RequestBuilder) Arguments(args ...string) *RequestBuilder {
r.args = append(r.args, args...)
return r
}
// BodyString sets the request body to the given string.
func (r *RequestBuilder) BodyString(body string) *RequestBuilder {
return r.Body(strings.NewReader(body))
}
// BodyBytes sets the request body to the given buffer.
func (r *RequestBuilder) BodyBytes(body []byte) *RequestBuilder {
return r.Body(bytes.NewReader(body))
}
// Body sets the request body to the given reader.
func (r *RequestBuilder) Body(body io.Reader) *RequestBuilder {
r.body = body
return r
}
// FileBody sets the request body to the given reader wrapped into multipartreader.
func (r *RequestBuilder) FileBody(body io.Reader) *RequestBuilder {
pr, _ := files.NewReaderPathFile("/dev/stdin", ioutil.NopCloser(body), nil)
d := files.NewMapDirectory(map[string]files.Node{"": pr})
r.body = files.NewMultiFileReader(d, false)
return r
}
// Option sets the given option.
func (r *RequestBuilder) Option(key string, value interface{}) *RequestBuilder {
var s string
switch v := value.(type) {
case bool:
s = strconv.FormatBool(v)
case string:
s = v
case []byte:
s = string(v)
default:
// slow case.
s = fmt.Sprint(value)
}
if r.opts == nil {
r.opts = make(map[string]string, 1)
}
r.opts[key] = s
return r
}
// Header sets the given header.
func (r *RequestBuilder) Header(name, value string) *RequestBuilder {
if r.headers == nil {
r.headers = make(map[string]string, 1)
}
r.headers[name] = value
return r
}
// Send sends the request and return the response.
func (r *RequestBuilder) Send(ctx context.Context) (*Response, error) {
r.shell.applyGlobal(r)
req := NewRequest(ctx, r.shell.url, r.command, r.args...)
req.Opts = r.opts
req.Headers = r.headers
req.Body = r.body
return req.Send(&r.shell.httpcli)
}
// Exec sends the request a request and decodes the response.
func (r *RequestBuilder) Exec(ctx context.Context, res interface{}) error {
httpRes, err := r.Send(ctx)
if err != nil {
return err
}
if res == nil {
lateErr := httpRes.Close()
if httpRes.Error != nil {
return httpRes.Error
}
return lateErr
}
return httpRes.decode(res)
}

168
client/httpapi/response.go Normal file
View File

@ -0,0 +1,168 @@
package httpapi
import (
"encoding/json"
"errors"
"fmt"
"github.com/ipfs/go-ipfs-files"
"io"
"io/ioutil"
"mime"
"net/http"
"net/url"
"os"
)
type trailerReader struct {
resp *http.Response
}
func (r *trailerReader) Read(b []byte) (int, error) {
n, err := r.resp.Body.Read(b)
if err != nil {
if e := r.resp.Trailer.Get("X-Stream-Error"); e != "" {
err = errors.New(e)
}
}
return n, err
}
func (r *trailerReader) Close() error {
return r.resp.Body.Close()
}
type Response struct {
Output io.ReadCloser
Error *Error
}
func (r *Response) Close() error {
if r.Output != nil {
// drain output (response body)
_, err1 := io.Copy(ioutil.Discard, r.Output)
err2 := r.Output.Close()
if err1 != nil {
return err1
}
return err2
}
return nil
}
// Cancel aborts running request (without draining request body)
func (r *Response) Cancel() error {
if r.Output != nil {
return r.Output.Close()
}
return nil
}
// Decode reads request body and decodes it as json
func (r *Response) decode(dec interface{}) error {
if r.Error != nil {
return r.Error
}
err := json.NewDecoder(r.Output).Decode(dec)
err2 := r.Close()
if err != nil {
return err
}
return err2
}
type Error struct {
Command string
Message string
Code int
}
func (e *Error) Error() string {
var out string
if e.Code != 0 {
out = fmt.Sprintf("%s%d: ", out, e.Code)
}
return out + e.Message
}
func (r *Request) Send(c *http.Client) (*Response, error) {
url := r.getURL()
req, err := http.NewRequest("POST", url, r.Body)
if err != nil {
return nil, err
}
// Add any headers that were supplied via the RequestBuilder.
for k, v := range r.Headers {
req.Header.Add(k, v)
}
if fr, ok := r.Body.(*files.MultiFileReader); ok {
req.Header.Set("Content-Type", "multipart/form-data; boundary="+fr.Boundary())
req.Header.Set("Content-Disposition", "form-data; name=\"files\"")
}
resp, err := c.Do(req)
if err != nil {
return nil, err
}
contentType, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
return nil, err
}
nresp := new(Response)
nresp.Output = &trailerReader{resp}
if resp.StatusCode >= http.StatusBadRequest {
e := &Error{
Command: r.Command,
}
switch {
case resp.StatusCode == http.StatusNotFound:
e.Message = "command not found"
case contentType == "text/plain":
out, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Fprintf(os.Stderr, "ipfs-shell: warning! response (%d) read error: %s\n", resp.StatusCode, err)
}
e.Message = string(out)
case contentType == "application/json":
if err = json.NewDecoder(resp.Body).Decode(e); err != nil {
fmt.Fprintf(os.Stderr, "ipfs-shell: warning! response (%d) unmarshall error: %s\n", resp.StatusCode, err)
}
default:
fmt.Fprintf(os.Stderr, "ipfs-shell: warning! unhandled response (%d) encoding: %s", resp.StatusCode, contentType)
out, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Fprintf(os.Stderr, "ipfs-shell: response (%d) read error: %s\n", resp.StatusCode, err)
}
e.Message = fmt.Sprintf("unknown ipfs-shell error encoding: %q - %q", contentType, out)
}
nresp.Error = e
nresp.Output = nil
// drain body and close
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}
return nresp, nil
}
func (r *Request) getURL() string {
values := make(url.Values)
for _, arg := range r.Args {
values.Add("arg", arg)
}
for k, v := range r.Opts {
values.Add(k, v)
}
return fmt.Sprintf("%s/%s?%s", r.ApiBase, r.Command, values.Encode())
}

187
client/httpapi/swarm.go Normal file
View File

@ -0,0 +1,187 @@
package httpapi
import (
"context"
"time"
"github.com/ipfs/interface-go-ipfs-core"
inet "github.com/libp2p/go-libp2p-net"
"github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p-protocol"
"github.com/multiformats/go-multiaddr"
)
type SwarmAPI HttpApi
func (api *SwarmAPI) Connect(ctx context.Context, pi peerstore.PeerInfo) error {
pidma, err := multiaddr.NewComponent("p2p", pi.ID.Pretty())
if err != nil {
return err
}
saddrs := make([]string, len(pi.Addrs))
for i, addr := range pi.Addrs {
saddrs[i] = addr.Encapsulate(pidma).String()
}
return api.core().request("swarm/connect", saddrs...).Exec(ctx, nil)
}
func (api *SwarmAPI) Disconnect(ctx context.Context, addr multiaddr.Multiaddr) error {
return api.core().request("swarm/disconnect", addr.String()).Exec(ctx, nil)
}
type connInfo struct {
addr multiaddr.Multiaddr
peer peer.ID
latency time.Duration
muxer string
direction inet.Direction
streams []protocol.ID
}
func (c *connInfo) ID() peer.ID {
return c.peer
}
func (c *connInfo) Address() multiaddr.Multiaddr {
return c.addr
}
func (c *connInfo) Direction() inet.Direction {
return c.direction
}
func (c *connInfo) Latency() (time.Duration, error) {
return c.latency, nil
}
func (c *connInfo) Streams() ([]protocol.ID, error) {
return c.streams, nil
}
func (api *SwarmAPI) Peers(ctx context.Context) ([]iface.ConnectionInfo, error) {
var resp struct {
Peers []struct {
Addr string
Peer string
Latency time.Duration
Muxer string
Direction inet.Direction
Streams []struct {
Protocol string
}
}
}
err := api.core().request("swarm/peers").
Option("streams", true).
Option("latency", true).
Exec(ctx, &resp)
if err != nil {
return nil, err
}
res := make([]iface.ConnectionInfo, len(resp.Peers))
for i, conn := range resp.Peers {
out := &connInfo{
latency: conn.Latency,
muxer: conn.Muxer,
direction: conn.Direction,
}
out.peer, err = peer.IDB58Decode(conn.Peer)
if err != nil {
return nil, err
}
out.addr, err = multiaddr.NewMultiaddr(conn.Addr)
if err != nil {
return nil, err
}
out.streams = make([]protocol.ID, len(conn.Streams))
for i, p := range conn.Streams {
out.streams[i] = protocol.ID(p.Protocol)
}
res[i] = out
}
return res, nil
}
func (api *SwarmAPI) KnownAddrs(ctx context.Context) (map[peer.ID][]multiaddr.Multiaddr, error) {
var out struct {
Addrs map[string][]string
}
if err := api.core().request("swarm/addrs").Exec(ctx, &out); err != nil {
return nil, err
}
res := map[peer.ID][]multiaddr.Multiaddr{}
for spid, saddrs := range out.Addrs {
addrs := make([]multiaddr.Multiaddr, len(saddrs))
for i, addr := range saddrs {
a, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return nil, err
}
addrs[i] = a
}
pid, err := peer.IDB58Decode(spid)
if err != nil {
return nil, err
}
res[pid] = addrs
}
return res, nil
}
func (api *SwarmAPI) LocalAddrs(ctx context.Context) ([]multiaddr.Multiaddr, error) {
var out struct {
Strings []string
}
if err := api.core().request("swarm/addrs/local").Exec(ctx, &out); err != nil {
return nil, err
}
res := make([]multiaddr.Multiaddr, len(out.Strings))
for i, addr := range out.Strings {
ma, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return nil, err
}
res[i] = ma
}
return res, nil
}
func (api *SwarmAPI) ListenAddrs(ctx context.Context) ([]multiaddr.Multiaddr, error) {
var out struct {
Strings []string
}
if err := api.core().request("swarm/addrs/listen").Exec(ctx, &out); err != nil {
return nil, err
}
res := make([]multiaddr.Multiaddr, len(out.Strings))
for i, addr := range out.Strings {
ma, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return nil, err
}
res[i] = ma
}
return res, nil
}
func (api *SwarmAPI) core() *HttpApi {
return (*HttpApi)(api)
}

227
client/httpapi/unixfs.go Normal file
View File

@ -0,0 +1,227 @@
package httpapi
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-files"
"github.com/ipfs/go-ipld-format"
"github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
mh "github.com/multiformats/go-multihash"
)
type addEvent struct {
Name string
Hash string `json:",omitempty"`
Bytes int64 `json:",omitempty"`
Size string `json:",omitempty"`
}
type UnixfsAPI HttpApi
func (api *UnixfsAPI) Add(ctx context.Context, f files.Node, opts ...caopts.UnixfsAddOption) (iface.ResolvedPath, error) {
options, _, err := caopts.UnixfsAddOptions(opts...)
if err != nil {
return nil, err
}
mht, ok := mh.Codes[options.MhType]
if !ok {
return nil, fmt.Errorf("unknowm mhType %d", options.MhType)
}
req := api.core().request("add").
Option("hash", mht).
Option("chunker", options.Chunker).
Option("cid-version", options.CidVersion).
Option("fscache", options.FsCache).
Option("hidden", options.Hidden).
Option("inline", options.Inline).
Option("inline-limit", options.InlineLimit).
Option("nocopy", options.NoCopy).
Option("only-hash", options.OnlyHash).
Option("pin", options.Pin).
Option("silent", options.Silent).
Option("stdin-name", options.StdinName).
Option("wrap-with-directory", options.Wrap).
Option("progress", options.Progress)
if options.RawLeavesSet {
req.Option("raw-leaves", options.RawLeaves)
}
switch options.Layout {
case caopts.BalancedLayout:
// noop, default
case caopts.TrickleLayout:
req.Option("trickle", true)
}
switch c := f.(type) {
case files.Directory:
req.Body(files.NewMultiFileReader(c, false))
case files.File:
d := files.NewMapDirectory(map[string]files.Node{"": c}) // unwrapped on the other side
req.Body(files.NewMultiFileReader(d, false))
}
var out addEvent
resp, err := req.Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
defer resp.Output.Close()
dec := json.NewDecoder(resp.Output)
loop:
for {
var evt addEvent
switch err := dec.Decode(&evt); err {
case nil:
case io.EOF:
break loop
default:
return nil, err
}
out = evt
if options.Events != nil {
ifevt := &iface.AddEvent{
Name: out.Name,
Size: out.Size,
Bytes: out.Bytes,
}
if out.Hash != "" {
c, err := cid.Parse(out.Hash)
if err != nil {
return nil, err
}
ifevt.Path = iface.IpfsPath(c)
}
select {
case options.Events <- ifevt:
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
c, err := cid.Parse(out.Hash)
if err != nil {
return nil, err
}
return iface.IpfsPath(c), nil
}
type lsLink struct {
Name, Hash string
Size uint64
Type iface.FileType
}
type lsObject struct {
Hash string
Links []lsLink
}
type lsOutput struct {
Objects []lsObject
}
func (api *UnixfsAPI) Ls(ctx context.Context, p iface.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.LsLink, error) {
options, err := caopts.UnixfsLsOptions(opts...)
if err != nil {
return nil, err
}
resp, err := api.core().request("ls", p.String()).
Option("resolve-type", options.ResolveChildren).
Option("size", options.ResolveChildren).
Option("stream", true).
Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
dec := json.NewDecoder(resp.Output)
out := make(chan iface.LsLink)
go func() {
defer resp.Close()
defer close(out)
for {
var link lsOutput
if err := dec.Decode(&link); err != nil {
if err == io.EOF {
return
}
select {
case out <- iface.LsLink{Err: err}:
case <-ctx.Done():
}
return
}
if len(link.Objects) != 1 {
select {
case out <- iface.LsLink{Err: errors.New("unexpected Objects len")}:
case <-ctx.Done():
}
return
}
if len(link.Objects[0].Links) != 1 {
select {
case out <- iface.LsLink{Err: errors.New("unexpected Links len")}:
case <-ctx.Done():
}
return
}
l0 := link.Objects[0].Links[0]
c, err := cid.Decode(l0.Hash)
if err != nil {
select {
case out <- iface.LsLink{Err: err}:
case <-ctx.Done():
}
return
}
select {
case out <- iface.LsLink{
Link: &format.Link{
Cid: c,
Name: l0.Name,
Size: l0.Size,
},
Size: l0.Size,
Type: l0.Type,
}:
case <-ctx.Done():
}
}
}()
return out, nil
}
func (api *UnixfsAPI) core() *HttpApi {
return (*HttpApi)(api)
}