coreapi: global offline option

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera 2018-12-06 22:25:48 +01:00
parent 3183b1cb8e
commit c832a32a4c
11 changed files with 153 additions and 69 deletions

View File

@ -68,7 +68,10 @@ func (c *Context) GetAPI() (coreiface.CoreAPI, error) {
if err != nil {
return nil, err
}
c.api = coreapi.NewCoreAPI(n)
c.api, err = coreapi.NewCoreAPI(n)
if err != nil {
return nil, err
}
}
return c.api, nil
}

View File

@ -16,26 +16,27 @@ package coreapi
import (
"context"
"errors"
core "github.com/ipfs/go-ipfs/core"
"fmt"
"github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
namesys "github.com/ipfs/go-ipfs/namesys"
pin "github.com/ipfs/go-ipfs/pin"
repo "github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/core/coreapi/interface/options"
"github.com/ipfs/go-ipfs/namesys"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/repo"
ci "gx/ipfs/QmNiJiXwWE3kRhZrC5ej3kSjWHm337pYfhjLGSCDNKJP2s/go-libp2p-crypto"
exchange "gx/ipfs/QmP2g3VxmC7g7fyRJDj1VJ72KHZbJ9UW24YjSWEj1XTb4H/go-ipfs-exchange-interface"
"gx/ipfs/QmP2g3VxmC7g7fyRJDj1VJ72KHZbJ9UW24YjSWEj1XTb4H/go-ipfs-exchange-interface"
bserv "gx/ipfs/QmPoh3SrQzFBWtdGK6qmHDV4EanKR6kYPj4DD3J2NLoEmZ/go-blockservice"
routing "gx/ipfs/QmRASJXJUFygM5qU4YrH7k7jD6S4Hg8nJmgqJ4bYJvLatd/go-libp2p-routing"
blockstore "gx/ipfs/QmS2aqUZLJp8kF1ihE5rvDGE5LvmKDPnx32w9Z1BW9xLV5/go-ipfs-blockstore"
peer "gx/ipfs/QmY5Grm8pJdiSSVsYxx4uNRgweY72EmYwuSDbRnbFok3iY/go-libp2p-peer"
"gx/ipfs/QmRASJXJUFygM5qU4YrH7k7jD6S4Hg8nJmgqJ4bYJvLatd/go-libp2p-routing"
"gx/ipfs/QmS2aqUZLJp8kF1ihE5rvDGE5LvmKDPnx32w9Z1BW9xLV5/go-ipfs-blockstore"
"gx/ipfs/QmY5Grm8pJdiSSVsYxx4uNRgweY72EmYwuSDbRnbFok3iY/go-libp2p-peer"
pstore "gx/ipfs/QmZ9zH2FnLcxv1xyzFeUpDUeo55xEhZQHgveZijcxr7TLj/go-libp2p-peerstore"
pubsub "gx/ipfs/QmaqGyUhWLsJbVo1QAujSu13mxNjFJ98Kt2VWGSnShGE1Q/go-libp2p-pubsub"
ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format"
logging "gx/ipfs/QmcuXC5cxs79ro2cUuHs4HQ2bkDLJUYokwL8aivcX6HW3C/go-log"
dag "gx/ipfs/QmdV35UHnL1FM52baPkeUo6u7Fxm2CRUkPTLRPxeF8a4Ap/go-merkledag"
record "gx/ipfs/QmfARXVCzpwFXQdepAJZuqyNDgV9doEsMnVCo1ssmuSe1U/go-libp2p-record"
offlineroute "gx/ipfs/QmdmWkx54g7VfVyxeG8ic84uf4G6Eq1GohuyKA3XDuJ8oC/go-ipfs-routing/offline"
"gx/ipfs/QmfARXVCzpwFXQdepAJZuqyNDgV9doEsMnVCo1ssmuSe1U/go-libp2p-record"
p2phost "gx/ipfs/QmfD51tKgJiTMnW9JEiDiPwsCY4mqUoxkhKhBfyW12spTC/go-libp2p-host"
)
@ -50,20 +51,20 @@ type CoreAPI struct {
repo repo.Repo
blockstore blockstore.GCBlockstore
baseBlocks blockstore.Blockstore
blocks bserv.BlockService
dag ipld.DAGService
pinning pin.Pinner
blocks bserv.BlockService
dag ipld.DAGService
peerstore pstore.Peerstore
peerHost p2phost.Host
namesys namesys.NameSystem
recordValidator record.Validator
exchange exchange.Interface
routing routing.IpfsRouting
pubSub *pubsub.PubSub
namesys namesys.NameSystem
routing func(bool) (routing.IpfsRouting, error)
checkRouting func(bool) error
pubSub *pubsub.PubSub
// TODO: this can be generalized to all functions when we implement some
// api based security mechanism
@ -71,7 +72,12 @@ type CoreAPI struct {
}
// NewCoreAPI creates new instance of IPFS CoreAPI backed by go-ipfs Node.
func NewCoreAPI(n *core.IpfsNode, opts ...options.ApiOption) coreiface.CoreAPI {
func NewCoreAPI(n *core.IpfsNode, opts ...options.ApiOption) (coreiface.CoreAPI, error) {
settings, err := options.ApiOptions(opts...)
if err != nil {
return nil, err
}
api := &CoreAPI{
nctx: n.Context(),
@ -81,38 +87,75 @@ func NewCoreAPI(n *core.IpfsNode, opts ...options.ApiOption) coreiface.CoreAPI {
repo: n.Repo,
blockstore: n.Blockstore,
baseBlocks: n.BaseBlocks,
blocks: n.Blocks,
dag: n.DAG,
pinning: n.Pinning,
blocks: n.Blocks,
dag: n.DAG,
peerstore: n.Peerstore,
peerHost: n.PeerHost,
namesys: n.Namesys,
recordValidator: n.RecordValidator,
exchange: n.Exchange,
routing: n.Routing,
pubSub: n.PubSub,
checkRouting: func(allowOffline bool) error {
if !n.OnlineMode() {
if !allowOffline {
return coreiface.ErrOffline
}
return n.SetupOfflineRouting()
}
return nil
},
isPublishAllowed: func() error {
if n.Mounts.Ipns != nil && n.Mounts.Ipns.IsActive() {
return errors.New("cannot manually publish while IPNS is mounted")
}
return nil
},
pubSub: n.PubSub,
}
return api
api.routing = func(allowOffline bool) (routing.IpfsRouting, error) {
if !n.OnlineMode() {
if !allowOffline {
return nil, coreiface.ErrOffline
}
if err := n.SetupOfflineRouting(); err != nil {
return nil, err
}
api.privateKey = n.PrivateKey
api.namesys = n.Namesys
return n.Routing, nil
}
if !settings.Offline {
return n.Routing, nil
}
if !allowOffline {
return nil, coreiface.ErrOffline
}
//todo: might want to cache this
cfg, err := n.Repo.Config()
if err != nil {
return nil, err
}
cs := cfg.Ipns.ResolveCacheSize
if cs == 0 {
cs = 128
}
if cs < 0 {
return nil, fmt.Errorf("cannot specify negative resolve cache size")
}
offroute := offlineroute.NewOfflineRouter(api.repo.Datastore(), api.recordValidator)
api.namesys = namesys.NewNameSystem(offroute, api.repo.Datastore(), cs)
return offroute, nil
}
api.isPublishAllowed = func() error {
if n.Mounts.Ipns != nil && n.Mounts.Ipns.IsActive() {
return errors.New("cannot manually publish while IPNS is mounted")
}
return nil
}
if settings.Offline {
api.peerstore = nil
api.peerHost = nil
api.namesys = nil
api.recordValidator = nil
api.exchange = nil
}
return api, nil
}
// Unixfs returns the UnixfsAPI interface implementation backed by the go-ipfs node

View File

@ -2,7 +2,6 @@ package coreapi
import (
"context"
"errors"
"fmt"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
@ -22,7 +21,12 @@ import (
type DhtAPI CoreAPI
func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) {
pi, err := api.routing.FindPeer(ctx, peer.ID(p))
r, err := api.routing(false)
if err != nil {
return pstore.PeerInfo{}, err
}
pi, err := r.FindPeer(ctx, peer.ID(p))
if err != nil {
return pstore.PeerInfo{}, err
}
@ -36,6 +40,11 @@ func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...
return nil, err
}
r, err := api.routing(false)
if err != nil {
return nil, err
}
rp, err := api.core().ResolvePath(ctx, p)
if err != nil {
return nil, err
@ -46,7 +55,7 @@ func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...
return nil, fmt.Errorf("number of providers must be greater than 0")
}
pchan := api.routing.FindProvidersAsync(ctx, rp.Cid(), numProviders)
pchan := r.FindProvidersAsync(ctx, rp.Cid(), numProviders)
return pchan, nil
}
@ -56,8 +65,9 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao
return err
}
if api.routing == nil {
return errors.New("cannot provide in offline mode")
r, err := api.routing(false)
if err != nil {
return err
}
rp, err := api.core().ResolvePath(ctx, path)
@ -77,9 +87,9 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao
}
if settings.Recursive {
err = provideKeysRec(ctx, api.routing, api.blockstore, []cid.Cid{c})
err = provideKeysRec(ctx, r, api.blockstore, []cid.Cid{c})
} else {
err = provideKeys(ctx, api.routing, []cid.Cid{c})
err = provideKeys(ctx, r, []cid.Cid{c})
}
if err != nil {
return err

View File

@ -47,7 +47,8 @@ func (api *NameAPI) Publish(ctx context.Context, p coreiface.Path, opts ...caopt
return nil, err
}
if err := api.checkRouting(options.AllowOffline); err != nil {
_, err = api.routing(options.AllowOffline)
if err != nil {
return nil, err
}
@ -88,7 +89,8 @@ func (api *NameAPI) Search(ctx context.Context, name string, opts ...caopts.Name
return nil, err
}
if err := api.checkRouting(true); err != nil {
r, err := api.routing(true)
if err != nil {
return nil, err
}
@ -98,13 +100,14 @@ func (api *NameAPI) Search(ctx context.Context, name string, opts ...caopts.Name
return nil, errors.New("cannot specify both local and nocache")
}
//TODO: can replaced with api.WithOpt(opts.Api.Offline(true))
if options.Local {
offroute := offline.NewOfflineRouter(api.repo.Datastore(), api.recordValidator)
resolver = namesys.NewIpnsResolver(offroute)
}
if !options.Cache {
resolver = namesys.NewNameSystem(api.routing, api.repo.Datastore(), 0)
resolver = namesys.NewNameSystem(r, api.repo.Datastore(), 0)
}
if !strings.HasPrefix(name, "/ipns/") {

View File

@ -30,7 +30,8 @@ type pubSubMessage struct {
}
func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) {
if err := api.checkNode(); err != nil {
_, err := api.checkNode()
if err != nil {
return nil, err
}
@ -38,7 +39,8 @@ func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) {
}
func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) {
if err := api.checkNode(); err != nil {
_, err := api.checkNode()
if err != nil {
return nil, err
}
@ -58,7 +60,8 @@ func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOptio
}
func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) error {
if err := api.checkNode(); err != nil {
_, err := api.checkNode()
if err != nil {
return err
}
@ -68,7 +71,8 @@ func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) er
func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) {
options, err := caopts.PubSubSubscribeOptions(opts...)
if err := api.checkNode(); err != nil {
r, err := api.checkNode()
if err != nil {
return nil, err
}
@ -87,7 +91,7 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt
return
}
connectToPubSubPeers(pubctx, api.routing, api.peerHost, blk.Path().Cid())
connectToPubSubPeers(pubctx, r, api.peerHost, blk.Path().Cid())
}()
}
@ -118,16 +122,17 @@ func connectToPubSubPeers(ctx context.Context, r routing.IpfsRouting, ph p2phost
wg.Wait()
}
func (api *PubSubAPI) checkNode() error {
if err := api.checkRouting(false); err != nil {
return err
}
func (api *PubSubAPI) checkNode() (routing.IpfsRouting, error) {
if api.pubSub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
return nil, errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
}
return nil
r, err := api.routing(false)
if err != nil {
return nil, err
}
return r, nil
}
func (sub *pubSubSubscription) Close() error {

View File

@ -102,7 +102,10 @@ func makeAPISwarm(ctx context.Context, fullIdentity bool, n int) ([]*core.IpfsNo
return nil, nil, err
}
nodes[i] = node
apis[i] = coreapi.NewCoreAPI(node)
apis[i], err = coreapi.NewCoreAPI(node)
if err != nil {
return nil, nil, err
}
}
err := mn.LinkAll()

View File

@ -25,11 +25,16 @@ func GatewayOption(writable bool, paths ...string) ServeOption {
return nil, err
}
api, err := coreapi.NewCoreAPI(n)
if err != nil {
return nil, err
}
gateway := newGatewayHandler(n, GatewayConfig{
Headers: cfg.Gateway.HTTPHeaders,
Writable: writable,
PathPrefixes: cfg.Gateway.PathPrefixes,
}, coreapi.NewCoreAPI(n))
}, api)
for _, p := range paths {
mux.Handle(p+"/", gateway)

View File

@ -118,7 +118,10 @@ func TestIpfsStressRead(t *testing.T) {
nd, mnt := setupIpfsTest(t, nil)
defer mnt.Close()
api := coreapi.NewCoreAPI(nd)
api, err := coreapi.NewCoreAPI(nd)
if err != nil {
t.Fatal(err)
}
var nodes []ipld.Node
var paths []string

View File

@ -120,7 +120,10 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error {
}
defer catter.Close()
catterApi := coreapi.NewCoreAPI(catter)
catterApi, err := coreapi.NewCoreAPI(catter)
if err != nil {
return err
}
err = mn.LinkAll()
if err != nil {

View File

@ -66,7 +66,10 @@ func benchCat(b *testing.B, data []byte, conf testutil.LatencyConfig) error {
}
defer catter.Close()
catterApi := coreapi.NewCoreAPI(catter)
catterApi, err := coreapi.NewCoreAPI(catter)
if err != nil {
return err
}
err = mn.LinkAll()
if err != nil {

View File

@ -103,7 +103,10 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error {
}
defer catter.Close()
catterApi := coreapi.NewCoreAPI(catter)
catterApi, err := coreapi.NewCoreAPI(catter)
if err != nil {
return err
}
mn.LinkAll()