From c832a32a4c5b6633df9efe4793edb837317ed9fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 6 Dec 2018 22:25:48 +0100 Subject: [PATCH] coreapi: global offline option MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- commands/context.go | 5 +- core/coreapi/coreapi.go | 123 +++++++++++++++------- core/coreapi/dht.go | 24 +++-- core/coreapi/name.go | 9 +- core/coreapi/pubsub.go | 29 ++--- core/coreapi/unixfs_test.go | 5 +- core/corehttp/gateway.go | 7 +- fuse/readonly/ipfs_test.go | 5 +- test/integration/addcat_test.go | 5 +- test/integration/bench_cat_test.go | 5 +- test/integration/three_legged_cat_test.go | 5 +- 11 files changed, 153 insertions(+), 69 deletions(-) diff --git a/commands/context.go b/commands/context.go index 41d6b05d0..29fe652f1 100644 --- a/commands/context.go +++ b/commands/context.go @@ -68,7 +68,10 @@ func (c *Context) GetAPI() (coreiface.CoreAPI, error) { if err != nil { return nil, err } - c.api = coreapi.NewCoreAPI(n) + c.api, err = coreapi.NewCoreAPI(n) + if err != nil { + return nil, err + } } return c.api, nil } diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index ee21bdef8..e9055677f 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -16,26 +16,27 @@ package coreapi import ( "context" "errors" - - core "github.com/ipfs/go-ipfs/core" + "fmt" + "github.com/ipfs/go-ipfs/core" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" - options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" - namesys "github.com/ipfs/go-ipfs/namesys" - pin "github.com/ipfs/go-ipfs/pin" - repo "github.com/ipfs/go-ipfs/repo" + "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + "github.com/ipfs/go-ipfs/namesys" + "github.com/ipfs/go-ipfs/pin" + "github.com/ipfs/go-ipfs/repo" ci "gx/ipfs/QmNiJiXwWE3kRhZrC5ej3kSjWHm337pYfhjLGSCDNKJP2s/go-libp2p-crypto" - exchange "gx/ipfs/QmP2g3VxmC7g7fyRJDj1VJ72KHZbJ9UW24YjSWEj1XTb4H/go-ipfs-exchange-interface" + "gx/ipfs/QmP2g3VxmC7g7fyRJDj1VJ72KHZbJ9UW24YjSWEj1XTb4H/go-ipfs-exchange-interface" bserv "gx/ipfs/QmPoh3SrQzFBWtdGK6qmHDV4EanKR6kYPj4DD3J2NLoEmZ/go-blockservice" - routing "gx/ipfs/QmRASJXJUFygM5qU4YrH7k7jD6S4Hg8nJmgqJ4bYJvLatd/go-libp2p-routing" - blockstore "gx/ipfs/QmS2aqUZLJp8kF1ihE5rvDGE5LvmKDPnx32w9Z1BW9xLV5/go-ipfs-blockstore" - peer "gx/ipfs/QmY5Grm8pJdiSSVsYxx4uNRgweY72EmYwuSDbRnbFok3iY/go-libp2p-peer" + "gx/ipfs/QmRASJXJUFygM5qU4YrH7k7jD6S4Hg8nJmgqJ4bYJvLatd/go-libp2p-routing" + "gx/ipfs/QmS2aqUZLJp8kF1ihE5rvDGE5LvmKDPnx32w9Z1BW9xLV5/go-ipfs-blockstore" + "gx/ipfs/QmY5Grm8pJdiSSVsYxx4uNRgweY72EmYwuSDbRnbFok3iY/go-libp2p-peer" pstore "gx/ipfs/QmZ9zH2FnLcxv1xyzFeUpDUeo55xEhZQHgveZijcxr7TLj/go-libp2p-peerstore" pubsub "gx/ipfs/QmaqGyUhWLsJbVo1QAujSu13mxNjFJ98Kt2VWGSnShGE1Q/go-libp2p-pubsub" ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format" logging "gx/ipfs/QmcuXC5cxs79ro2cUuHs4HQ2bkDLJUYokwL8aivcX6HW3C/go-log" dag "gx/ipfs/QmdV35UHnL1FM52baPkeUo6u7Fxm2CRUkPTLRPxeF8a4Ap/go-merkledag" - record "gx/ipfs/QmfARXVCzpwFXQdepAJZuqyNDgV9doEsMnVCo1ssmuSe1U/go-libp2p-record" + offlineroute "gx/ipfs/QmdmWkx54g7VfVyxeG8ic84uf4G6Eq1GohuyKA3XDuJ8oC/go-ipfs-routing/offline" + "gx/ipfs/QmfARXVCzpwFXQdepAJZuqyNDgV9doEsMnVCo1ssmuSe1U/go-libp2p-record" p2phost "gx/ipfs/QmfD51tKgJiTMnW9JEiDiPwsCY4mqUoxkhKhBfyW12spTC/go-libp2p-host" ) @@ -50,20 +51,20 @@ type CoreAPI struct { repo repo.Repo blockstore blockstore.GCBlockstore baseBlocks blockstore.Blockstore - blocks bserv.BlockService - dag ipld.DAGService pinning pin.Pinner + blocks bserv.BlockService + dag ipld.DAGService + peerstore pstore.Peerstore peerHost p2phost.Host - namesys namesys.NameSystem recordValidator record.Validator exchange exchange.Interface - routing routing.IpfsRouting - pubSub *pubsub.PubSub + namesys namesys.NameSystem + routing func(bool) (routing.IpfsRouting, error) - checkRouting func(bool) error + pubSub *pubsub.PubSub // TODO: this can be generalized to all functions when we implement some // api based security mechanism @@ -71,7 +72,12 @@ type CoreAPI struct { } // NewCoreAPI creates new instance of IPFS CoreAPI backed by go-ipfs Node. -func NewCoreAPI(n *core.IpfsNode, opts ...options.ApiOption) coreiface.CoreAPI { +func NewCoreAPI(n *core.IpfsNode, opts ...options.ApiOption) (coreiface.CoreAPI, error) { + settings, err := options.ApiOptions(opts...) + if err != nil { + return nil, err + } + api := &CoreAPI{ nctx: n.Context(), @@ -81,38 +87,75 @@ func NewCoreAPI(n *core.IpfsNode, opts ...options.ApiOption) coreiface.CoreAPI { repo: n.Repo, blockstore: n.Blockstore, baseBlocks: n.BaseBlocks, - blocks: n.Blocks, - dag: n.DAG, pinning: n.Pinning, + blocks: n.Blocks, + dag: n.DAG, + peerstore: n.Peerstore, peerHost: n.PeerHost, namesys: n.Namesys, recordValidator: n.RecordValidator, exchange: n.Exchange, - routing: n.Routing, - pubSub: n.PubSub, - - checkRouting: func(allowOffline bool) error { - if !n.OnlineMode() { - if !allowOffline { - return coreiface.ErrOffline - } - return n.SetupOfflineRouting() - } - return nil - }, - - isPublishAllowed: func() error { - if n.Mounts.Ipns != nil && n.Mounts.Ipns.IsActive() { - return errors.New("cannot manually publish while IPNS is mounted") - } - return nil - }, + pubSub: n.PubSub, } - return api + api.routing = func(allowOffline bool) (routing.IpfsRouting, error) { + if !n.OnlineMode() { + if !allowOffline { + return nil, coreiface.ErrOffline + } + if err := n.SetupOfflineRouting(); err != nil { + return nil, err + } + api.privateKey = n.PrivateKey + api.namesys = n.Namesys + return n.Routing, nil + } + if !settings.Offline { + return n.Routing, nil + } + if !allowOffline { + return nil, coreiface.ErrOffline + } + + //todo: might want to cache this + cfg, err := n.Repo.Config() + if err != nil { + return nil, err + } + + cs := cfg.Ipns.ResolveCacheSize + if cs == 0 { + cs = 128 + } + if cs < 0 { + return nil, fmt.Errorf("cannot specify negative resolve cache size") + } + + offroute := offlineroute.NewOfflineRouter(api.repo.Datastore(), api.recordValidator) + api.namesys = namesys.NewNameSystem(offroute, api.repo.Datastore(), cs) + + return offroute, nil + } + + api.isPublishAllowed = func() error { + if n.Mounts.Ipns != nil && n.Mounts.Ipns.IsActive() { + return errors.New("cannot manually publish while IPNS is mounted") + } + return nil + } + + if settings.Offline { + api.peerstore = nil + api.peerHost = nil + api.namesys = nil + api.recordValidator = nil + api.exchange = nil + } + + return api, nil } // Unixfs returns the UnixfsAPI interface implementation backed by the go-ipfs node diff --git a/core/coreapi/dht.go b/core/coreapi/dht.go index 7825fe6ee..cc5447b3c 100644 --- a/core/coreapi/dht.go +++ b/core/coreapi/dht.go @@ -2,7 +2,6 @@ package coreapi import ( "context" - "errors" "fmt" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" @@ -22,7 +21,12 @@ import ( type DhtAPI CoreAPI func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) { - pi, err := api.routing.FindPeer(ctx, peer.ID(p)) + r, err := api.routing(false) + if err != nil { + return pstore.PeerInfo{}, err + } + + pi, err := r.FindPeer(ctx, peer.ID(p)) if err != nil { return pstore.PeerInfo{}, err } @@ -36,6 +40,11 @@ func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ... return nil, err } + r, err := api.routing(false) + if err != nil { + return nil, err + } + rp, err := api.core().ResolvePath(ctx, p) if err != nil { return nil, err @@ -46,7 +55,7 @@ func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ... return nil, fmt.Errorf("number of providers must be greater than 0") } - pchan := api.routing.FindProvidersAsync(ctx, rp.Cid(), numProviders) + pchan := r.FindProvidersAsync(ctx, rp.Cid(), numProviders) return pchan, nil } @@ -56,8 +65,9 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao return err } - if api.routing == nil { - return errors.New("cannot provide in offline mode") + r, err := api.routing(false) + if err != nil { + return err } rp, err := api.core().ResolvePath(ctx, path) @@ -77,9 +87,9 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao } if settings.Recursive { - err = provideKeysRec(ctx, api.routing, api.blockstore, []cid.Cid{c}) + err = provideKeysRec(ctx, r, api.blockstore, []cid.Cid{c}) } else { - err = provideKeys(ctx, api.routing, []cid.Cid{c}) + err = provideKeys(ctx, r, []cid.Cid{c}) } if err != nil { return err diff --git a/core/coreapi/name.go b/core/coreapi/name.go index 130361f68..c912bf185 100644 --- a/core/coreapi/name.go +++ b/core/coreapi/name.go @@ -47,7 +47,8 @@ func (api *NameAPI) Publish(ctx context.Context, p coreiface.Path, opts ...caopt return nil, err } - if err := api.checkRouting(options.AllowOffline); err != nil { + _, err = api.routing(options.AllowOffline) + if err != nil { return nil, err } @@ -88,7 +89,8 @@ func (api *NameAPI) Search(ctx context.Context, name string, opts ...caopts.Name return nil, err } - if err := api.checkRouting(true); err != nil { + r, err := api.routing(true) + if err != nil { return nil, err } @@ -98,13 +100,14 @@ func (api *NameAPI) Search(ctx context.Context, name string, opts ...caopts.Name return nil, errors.New("cannot specify both local and nocache") } + //TODO: can replaced with api.WithOpt(opts.Api.Offline(true)) if options.Local { offroute := offline.NewOfflineRouter(api.repo.Datastore(), api.recordValidator) resolver = namesys.NewIpnsResolver(offroute) } if !options.Cache { - resolver = namesys.NewNameSystem(api.routing, api.repo.Datastore(), 0) + resolver = namesys.NewNameSystem(r, api.repo.Datastore(), 0) } if !strings.HasPrefix(name, "/ipns/") { diff --git a/core/coreapi/pubsub.go b/core/coreapi/pubsub.go index 5dbc6e228..378e3dbc6 100644 --- a/core/coreapi/pubsub.go +++ b/core/coreapi/pubsub.go @@ -30,7 +30,8 @@ type pubSubMessage struct { } func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) { - if err := api.checkNode(); err != nil { + _, err := api.checkNode() + if err != nil { return nil, err } @@ -38,7 +39,8 @@ func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) { } func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) { - if err := api.checkNode(); err != nil { + _, err := api.checkNode() + if err != nil { return nil, err } @@ -58,7 +60,8 @@ func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOptio } func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) error { - if err := api.checkNode(); err != nil { + _, err := api.checkNode() + if err != nil { return err } @@ -68,7 +71,8 @@ func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) er func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) { options, err := caopts.PubSubSubscribeOptions(opts...) - if err := api.checkNode(); err != nil { + r, err := api.checkNode() + if err != nil { return nil, err } @@ -87,7 +91,7 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt return } - connectToPubSubPeers(pubctx, api.routing, api.peerHost, blk.Path().Cid()) + connectToPubSubPeers(pubctx, r, api.peerHost, blk.Path().Cid()) }() } @@ -118,16 +122,17 @@ func connectToPubSubPeers(ctx context.Context, r routing.IpfsRouting, ph p2phost wg.Wait() } -func (api *PubSubAPI) checkNode() error { - if err := api.checkRouting(false); err != nil { - return err - } - +func (api *PubSubAPI) checkNode() (routing.IpfsRouting, error) { if api.pubSub == nil { - return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.") + return nil, errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.") } - return nil + r, err := api.routing(false) + if err != nil { + return nil, err + } + + return r, nil } func (sub *pubSubSubscription) Close() error { diff --git a/core/coreapi/unixfs_test.go b/core/coreapi/unixfs_test.go index f6a1e48d6..cf30969f2 100644 --- a/core/coreapi/unixfs_test.go +++ b/core/coreapi/unixfs_test.go @@ -102,7 +102,10 @@ func makeAPISwarm(ctx context.Context, fullIdentity bool, n int) ([]*core.IpfsNo return nil, nil, err } nodes[i] = node - apis[i] = coreapi.NewCoreAPI(node) + apis[i], err = coreapi.NewCoreAPI(node) + if err != nil { + return nil, nil, err + } } err := mn.LinkAll() diff --git a/core/corehttp/gateway.go b/core/corehttp/gateway.go index 5fad3acf3..52560750a 100644 --- a/core/corehttp/gateway.go +++ b/core/corehttp/gateway.go @@ -25,11 +25,16 @@ func GatewayOption(writable bool, paths ...string) ServeOption { return nil, err } + api, err := coreapi.NewCoreAPI(n) + if err != nil { + return nil, err + } + gateway := newGatewayHandler(n, GatewayConfig{ Headers: cfg.Gateway.HTTPHeaders, Writable: writable, PathPrefixes: cfg.Gateway.PathPrefixes, - }, coreapi.NewCoreAPI(n)) + }, api) for _, p := range paths { mux.Handle(p+"/", gateway) diff --git a/fuse/readonly/ipfs_test.go b/fuse/readonly/ipfs_test.go index 8a02d68d1..34749db95 100644 --- a/fuse/readonly/ipfs_test.go +++ b/fuse/readonly/ipfs_test.go @@ -118,7 +118,10 @@ func TestIpfsStressRead(t *testing.T) { nd, mnt := setupIpfsTest(t, nil) defer mnt.Close() - api := coreapi.NewCoreAPI(nd) + api, err := coreapi.NewCoreAPI(nd) + if err != nil { + t.Fatal(err) + } var nodes []ipld.Node var paths []string diff --git a/test/integration/addcat_test.go b/test/integration/addcat_test.go index 0d8a20073..099116445 100644 --- a/test/integration/addcat_test.go +++ b/test/integration/addcat_test.go @@ -120,7 +120,10 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error { } defer catter.Close() - catterApi := coreapi.NewCoreAPI(catter) + catterApi, err := coreapi.NewCoreAPI(catter) + if err != nil { + return err + } err = mn.LinkAll() if err != nil { diff --git a/test/integration/bench_cat_test.go b/test/integration/bench_cat_test.go index d228e45a1..25264fd45 100644 --- a/test/integration/bench_cat_test.go +++ b/test/integration/bench_cat_test.go @@ -66,7 +66,10 @@ func benchCat(b *testing.B, data []byte, conf testutil.LatencyConfig) error { } defer catter.Close() - catterApi := coreapi.NewCoreAPI(catter) + catterApi, err := coreapi.NewCoreAPI(catter) + if err != nil { + return err + } err = mn.LinkAll() if err != nil { diff --git a/test/integration/three_legged_cat_test.go b/test/integration/three_legged_cat_test.go index 200c584e3..b118c3579 100644 --- a/test/integration/three_legged_cat_test.go +++ b/test/integration/three_legged_cat_test.go @@ -103,7 +103,10 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error { } defer catter.Close() - catterApi := coreapi.NewCoreAPI(catter) + catterApi, err := coreapi.NewCoreAPI(catter) + if err != nil { + return err + } mn.LinkAll()