From d5b59a8445b158664e2d8de6df1bfd9a2940a4e9 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 4 Feb 2021 10:26:29 -0500 Subject: [PATCH] bigfilestore implements blockstore. bigfilestore moved to avoid import cycle and used in go-ipfs --- core/{coreunix => bigfilestore}/bigfile.go | 4 +- core/bigfilestore/bigfilestore.go | 297 ++++++++++++++++++ .../bigfilestore_test.go | 11 +- core/core.go | 22 +- core/coreapi/coreapi.go | 6 +- core/coreunix/add.go | 25 +- core/coreunix/bigfilestore.go | 41 --- core/node/groups.go | 1 + core/node/storage.go | 15 + 9 files changed, 355 insertions(+), 67 deletions(-) rename core/{coreunix => bigfilestore}/bigfile.go (97%) create mode 100644 core/bigfilestore/bigfilestore.go rename core/{coreunix => bigfilestore}/bigfilestore_test.go (77%) delete mode 100644 core/coreunix/bigfilestore.go diff --git a/core/coreunix/bigfile.go b/core/bigfilestore/bigfile.go similarity index 97% rename from core/coreunix/bigfile.go rename to core/bigfilestore/bigfile.go index 03ff5e313..d6aa218f7 100644 --- a/core/coreunix/bigfile.go +++ b/core/bigfilestore/bigfile.go @@ -1,4 +1,4 @@ -package coreunix +package bigfilestore import ( "bytes" @@ -25,7 +25,7 @@ type ChunkingManifestChunk struct { Size uint64 } -func extractChunkingManifest(ctx context.Context, dagSvc ipld.DAGService, chunkedFileCid cid.Cid) (*ChunkingManifest, error) { +func ExtractChunkingManifest(ctx context.Context, dagSvc ipld.DAGService, chunkedFileCid cid.Cid) (*ChunkingManifest, error) { getLinks := dag.GetLinksWithDAG(dagSvc) chunking := &ChunkingManifest{ ChunkedCid: chunkedFileCid, diff --git a/core/bigfilestore/bigfilestore.go b/core/bigfilestore/bigfilestore.go new file mode 100644 index 000000000..276985e32 --- /dev/null +++ b/core/bigfilestore/bigfilestore.go @@ -0,0 +1,297 @@
+package bigfilestore
+
+import (
+	"context"
+	blocks "github.com/ipfs/go-block-format"
+	"github.com/ipfs/go-cid"
+	ds "github.com/ipfs/go-datastore"
+	dsns "github.com/ipfs/go-datastore/namespace"
+	dsq "github.com/ipfs/go-datastore/query"
+	blockstore "github.com/ipfs/go-ipfs-blockstore"
+	dshelp "github.com/ipfs/go-ipfs-ds-help"
+	logging "github.com/ipfs/go-log"
+)
+
+var log = logging.Logger("bigfilestore")
+
+type BigFileStore struct {
+	bs         blockstore.Blockstore // wrapped blockstore holding ordinary blocks and chunk blocks
+	dstore     ds.Batching           // namespaced datastore mapping a CID to its serialized chunk manifest
+	hashOnRead bool                  // when set, Get re-hashes blocks it reassembles from chunks
+}
+
+// bigFilePrefix namespaces big file datastores
+var bigFilePrefix = ds.NewKey("bigfiles")
+
+// NewBigFileStore creates a BigFileStore that wraps bstore and keeps chunk manifests in dstore under bigFilePrefix.
+func NewBigFileStore(bstore blockstore.Blockstore, dstore ds.Batching) *BigFileStore {
+	return &BigFileStore{
+		bs:     bstore,
+		dstore: dsns.Wrap(dstore, bigFilePrefix),
+	}
+}
+
+func (b *BigFileStore) PutBigBlock(streamCid cid.Cid, chunks []*ChunkingManifestChunk) error {
+	chunkData, err := serializeChunks(chunks)
+	if err != nil {
+		return err
+	}
+
+	dsk := dshelp.CidToDsKey(streamCid)
+	return b.dstore.Put(dsk, chunkData)
+}
+
+func (b *BigFileStore) GetBigBlock(streamCid cid.Cid) ([]*ChunkingManifestChunk, error) {
+	data, err := b.dstore.Get(dshelp.CidToDsKey(streamCid))
+	if err != nil {
+		return nil, err
+	}
+
+	return deserializeChunks(data)
+}
+
+// AllKeysChan returns a channel that yields every key of the wrapped blockstore and then
+// every key of the bigfile datastore. It is closed once both are drained or ctx is cancelled.
+func (b *BigFileStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
+	ctx, cancel := context.WithCancel(ctx)
+
+	baseKeys, err := b.bs.AllKeysChan(ctx)
+	if err != nil {
+		cancel()
+		return nil, err
+	}
+
+	out := make(chan cid.Cid, dsq.KeysOnlyBufSize)
+	go func() {
+		defer cancel()
+		defer close(out)
+
+		var done bool
+		for !done {
+			select {
+			case c, ok := <-baseKeys:
+				if !ok {
+					done = true
+					continue
+				}
+				select {
+				case out <- c:
+				case <-ctx.Done():
+					return
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+
+		// Can't do these at the same time because the abstractions around
+		// leveldb make us query leveldb for both operations. We apparently
+		// can't query leveldb concurrently
+		manifestKeys, err := b.bsAllKeysChan(ctx)
+		if err != nil {
+			log.Error("error querying bigfilestore: ", err)
+			return
+		}
+
+		done = false
+		for !done {
+			select {
+			case c, ok := <-manifestKeys:
+				if !ok {
+					done = true
+					continue
+				}
+				select {
+				case out <- c:
+				case <-ctx.Done():
+					return
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+	return out, nil
+}
+
+func (b *BigFileStore) bsAllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
+	// KeysOnly, because that would be _a lot_ of data.
+	q := dsq.Query{KeysOnly: true}
+	res, err := b.dstore.Query(q)
+	if err != nil {
+		return nil, err
+	}
+
+	output := make(chan cid.Cid, dsq.KeysOnlyBufSize)
+	go func() {
+		defer func() {
+			res.Close() // ensure exit (signals early exit, too)
+			close(output)
+		}()
+
+		for {
+			e, ok := res.NextSync()
+			if !ok {
+				return
+			}
+			if e.Error != nil {
+				log.Errorf("bigfilestore.AllKeysChan got err: %s", e.Error)
+				return
+			}
+
+			// convert the datastore key back into a CID; unparsable keys are skipped.
+			k, err := dshelp.DsKeyToCid(ds.RawKey(e.Key))
+			if err != nil {
+				log.Warningf("error parsing key from DsKey: %s", err)
+				continue
+			}
+
+			select {
+			case <-ctx.Done():
+				return
+			case output <- k:
+			}
+		}
+	}()
+
+	return output, nil
+}
+
+// DeleteBlock deletes the block with the given key from the wrapped
+// blockstore and deletes any chunk manifest stored under the same key in
+// the bigfile datastore; the chunk blocks themselves are left in place.
+// It may return ErrNotFound when the block is not stored.
+func (b *BigFileStore) DeleteBlock(c cid.Cid) error {
+	err1 := b.bs.DeleteBlock(c)
+	if err1 != nil && err1 != blockstore.ErrNotFound {
+		return err1
+	}
+
+	err2 := b.dstore.Delete(dshelp.CidToDsKey(c))
+	// if we successfully removed something from the blockstore, but the
+	// bigfilestore didn't have it, return success
+
+	switch err2 {
+	case nil:
+		return nil
+	case ds.ErrNotFound: // b.dstore is a datastore, so a miss is ds.ErrNotFound, never blockstore.ErrNotFound
+		if err1 == blockstore.ErrNotFound {
+			return blockstore.ErrNotFound
+		}
+		return nil
+	default:
+		return err2
+	}
+}
+
+// Get retrieves the block with the given Cid, rebuilding it from its chunks when only
+// a chunk manifest is stored. It may return ErrNotFound when the block is not stored.
+func (b *BigFileStore) Get(c cid.Cid) (blocks.Block, error) {
+	blk, err := b.bs.Get(c)
+	switch err {
+	case nil:
+		return blk, nil
+	case blockstore.ErrNotFound: // not a plain block; look for a chunk manifest instead
+		chunks, err := b.GetBigBlock(c)
+		if err == ds.ErrNotFound {
+			return nil, blockstore.ErrNotFound
+		}
+		if err != nil {
+			return nil, err
+		}
+		var data []byte
+		for _, chunk := range chunks {
+			blk, err := b.bs.Get(chunk.ChunkCid)
+			if err != nil {
+				return nil, err
+			}
+			data = append(data, blk.RawData()...)
+		}
+
+		if b.hashOnRead {
+			rbcid, err := c.Prefix().Sum(data)
+			if err != nil {
+				return nil, err
+			}
+
+			if !rbcid.Equals(c) {
+				return nil, blockstore.ErrHashMismatch
+			}
+		}
+
+		return blocks.NewBlockWithCid(data, c)
+	default:
+		return nil, err
+	}
+}
+
+// GetSize returns the size of the requested block (for a manifest, the sum of its chunk
+// sizes), or -1 with an error. It may return ErrNotFound when the block is not stored.
+func (b *BigFileStore) GetSize(c cid.Cid) (int, error) {
+	size, err := b.bs.GetSize(c)
+	switch err {
+	case nil:
+		return size, nil
+	case blockstore.ErrNotFound: // not a plain block; look for a chunk manifest instead
+		chunks, err := b.GetBigBlock(c)
+		if err == ds.ErrNotFound {
+			return -1, blockstore.ErrNotFound
+		}
+		if err != nil {
+			return -1, err
+		}
+		sz := 0
+		for _, chunk := range chunks {
+			sz += int(chunk.Size)
+		}
+		return sz, nil
+	default:
+		return -1, err
+	}
+}
+
+// Has returns true if the block with the given Cid is stored in the
+// wrapped blockstore or has a chunk manifest in the bigfile datastore.
+func (b *BigFileStore) Has(c cid.Cid) (bool, error) {
+	has, err := b.bs.Has(c)
+	if err != nil {
+		return false, err
+	}
+
+	if has {
+		return true, nil
+	}
+
+	return b.dstore.Has(dshelp.CidToDsKey(c))
+}
+
+// Put stores a block in the wrapped blockstore. It is a
+// no-op when Has already reports the Cid, whether as a
+// regular block or as a chunk manifest in the bigfile
+// datastore.
+func (b *BigFileStore) Put(blk blocks.Block) error {
+	has, err := b.Has(blk.Cid())
+	if err != nil {
+		return err
+	}
+
+	if has {
+		return nil
+	}
+
+	return b.bs.Put(blk)
+}
+
+// PutMany passes the blocks straight to the wrapped blockstore's PutMany;
+// unlike Put, it does not first check Has for each block.
+func (b *BigFileStore) PutMany(bs []blocks.Block) error {
+	return b.bs.PutMany(bs)
+}
+
+// HashOnRead toggles hash verification on the wrapped blockstore and on blocks Get reassembles from chunks.
+func (b *BigFileStore) HashOnRead(enabled bool) {
+	b.bs.HashOnRead(enabled)
+	b.hashOnRead = enabled
+}
+
+var _ blockstore.Blockstore = (*BigFileStore)(nil)
diff --git a/core/coreunix/bigfilestore_test.go b/core/bigfilestore/bigfilestore_test.go similarity index 77% rename from core/coreunix/bigfilestore_test.go rename to core/bigfilestore/bigfilestore_test.go index bebe8162f..6ee1b7ac3 100644 --- a/core/coreunix/bigfilestore_test.go +++ b/core/bigfilestore/bigfilestore_test.go @@ -1,6 +1,7 @@ -package coreunix +package bigfilestore import ( + blockstore "github.com/ipfs/go-ipfs-blockstore" "testing" "github.com/ipfs/go-cid" @@ -9,7 +10,9 @@ import ( ) func TestBigFileStorePutThenGet(t *testing.T) { - bfs := NewBigFileStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + dstore := ds_sync.MutexWrap(ds.NewMapDatastore()) + bstore := blockstore.NewBlockstore(dstore) + bfs := NewBigFileStore(bstore, dstore) streamCid, err := cid.Parse("QmWQadpxHe1UgAMdkZ5tm7znzqiixwo5u9XLKCtPGLtdDs") if err != nil { @@ -35,12 +38,12 @@ func TestBigFileStorePutThenGet(t *testing.T) { } } - err = bfs.Put(streamCid, chunks) + err =
bfs.PutBigBlock(streamCid, chunks) if err != nil { t.Fatal(err) } - chunks2, err := bfs.Get(streamCid) + chunks2, err := bfs.GetBigBlock(streamCid) if err != nil { t.Fatal(err) } diff --git a/core/core.go b/core/core.go index d422a1aa8..c3241e0d1 100644 --- a/core/core.go +++ b/core/core.go @@ -11,6 +11,7 @@ package core import ( "context" + "github.com/ipfs/go-ipfs/core/bigfilestore" "io" "github.com/ipfs/go-filestore" @@ -69,16 +70,17 @@ type IpfsNode struct { PNetFingerprint libp2p.PNetFingerprint `optional:"true"` // fingerprint of private network // Services - Peerstore pstore.Peerstore `optional:"true"` // storage for other Peer instances - Blockstore bstore.GCBlockstore // the block store (lower level) - Filestore *filestore.Filestore `optional:"true"` // the filestore blockstore - BaseBlocks node.BaseBlocks // the raw blockstore, no filestore wrapping - GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc - Blocks bserv.BlockService // the block service, get/add blocks. - DAG ipld.DAGService // the merkle dag service, get/add objects. - Resolver *resolver.Resolver // the path resolution system - Reporter *metrics.BandwidthCounter `optional:"true"` - Discovery discovery.Service `optional:"true"` + Peerstore pstore.Peerstore `optional:"true"` // storage for other Peer instances + Blockstore bstore.GCBlockstore // the block store (lower level) + Filestore *filestore.Filestore `optional:"true"` // the filestore blockstore + BigBlockStore *bigfilestore.BigFileStore `optional:"true"` // the bigfile blockstore + BaseBlocks node.BaseBlocks // the raw blockstore, no filestore wrapping + GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc + Blocks bserv.BlockService // the block service, get/add blocks. + DAG ipld.DAGService // the merkle dag service, get/add objects. 
+ Resolver *resolver.Resolver // the path resolution system + Reporter *metrics.BandwidthCounter `optional:"true"` + Discovery discovery.Service `optional:"true"` FilesRoot *mfs.Root RecordValidator record.Validator diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index 8bad01c27..ca1198a52 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "github.com/ipfs/go-ipfs/core/bigfilestore" bserv "github.com/ipfs/go-blockservice" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -38,7 +39,6 @@ import ( record "github.com/libp2p/go-libp2p-record" "github.com/ipfs/go-ipfs/core" - "github.com/ipfs/go-ipfs/core/coreunix" "github.com/ipfs/go-ipfs/core/node" "github.com/ipfs/go-ipfs/namesys" "github.com/ipfs/go-ipfs/repo" @@ -57,7 +57,7 @@ type CoreAPI struct { blocks bserv.BlockService dag ipld.DAGService - bfs *coreunix.BigFileStore //XXX + bfs *bigfilestore.BigFileStore peerstore pstore.Peerstore peerHost p2phost.Host @@ -169,7 +169,7 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e blocks: n.Blocks, dag: n.DAG, - bfs: nil, // XXX: initialize big file store, e.g. coreunix.NewBigFileStore(???), + bfs: n.BigBlockStore, peerstore: n.Peerstore, peerHost: n.PeerHost, diff --git a/core/coreunix/add.go b/core/coreunix/add.go index 62c465ea3..91a2f9193 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "errors" "fmt" + "github.com/ipfs/go-ipfs/core/bigfilestore" "io" gopath "path" "strconv" @@ -26,7 +27,7 @@ import ( ihelper "github.com/ipfs/go-unixfs/importer/helpers" "github.com/ipfs/go-unixfs/importer/trickle" coreiface "github.com/ipfs/interface-go-ipfs-core" - "github.com/ipfs/interface-go-ipfs-core/path" + ipfsPath "github.com/ipfs/interface-go-ipfs-core/path" ) var log = logging.Logger("coreunix") @@ -46,7 +47,7 @@ type syncer interface { } // NewAdder Returns a new Adder used for a file add operation. 
-func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCLocker, ds ipld.DAGService, bfs *BigFileStore) (*Adder, error) { +func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCLocker, ds ipld.DAGService, bfs *bigfilestore.BigFileStore) (*Adder, error) { bufferedDS := ipld.NewBufferedDAG(ctx, ds) return &Adder{ @@ -70,7 +71,7 @@ type Adder struct { gcLocker bstore.GCLocker dagService ipld.DAGService bufferedDS *ipld.BufferedDAG - bfs *BigFileStore + bfs *bigfilestore.BigFileStore Out chan<- interface{} Progress bool Pin bool @@ -407,8 +408,8 @@ func (adder *Adder) addFile(path string, file files.File) error { return err } - if adder.RawFileHash != nil && adder.bfs != nil { - chunkingManifest, err := extractChunkingManifest(adder.ctx, adder.bufferedDS, dagnode.Cid()) + if adder.RawFileHash != nil { + chunkingManifest, err := bigfilestore.ExtractChunkingManifest(adder.ctx, adder.bufferedDS, dagnode.Cid()) if err != nil { return err } @@ -431,9 +432,19 @@ func (adder *Adder) addFile(path string, file files.File) error { return err } sha256StreamCid := cid.NewCidV1(cid.Raw, mh) - if err = adder.bfs.Put(sha256StreamCid, chunkingManifest.Chunks); err != nil { + if err = adder.bfs.PutBigBlock(sha256StreamCid, chunkingManifest.Chunks); err != nil { return err } + + sz, err := adder.bfs.GetSize(sha256StreamCid) + if err != nil { + return err + } + + adder.Out <- &coreiface.AddEvent{ + Path: ipfsPath.IpfsPath(sha256StreamCid), + Bytes: int64(sz), + } } } @@ -519,7 +530,7 @@ func getOutput(dagnode ipld.Node) (*coreiface.AddEvent, error) { } output := &coreiface.AddEvent{ - Path: path.IpfsPath(c), + Path: ipfsPath.IpfsPath(c), Size: strconv.FormatUint(s, 10), } diff --git a/core/coreunix/bigfilestore.go b/core/coreunix/bigfilestore.go deleted file mode 100644 index 7cee4ec9c..000000000 --- a/core/coreunix/bigfilestore.go +++ /dev/null @@ -1,41 +0,0 @@ -package coreunix - -import ( - "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - dsns
"github.com/ipfs/go-datastore/namespace" - dshelp "github.com/ipfs/go-ipfs-ds-help" -) - -type BigFileStore struct { - dstore ds.Datastore -} - -// bigFilePrefix namespaces big file datastores -var bigFilePrefix = ds.NewKey("bigfiles") - -// NewBigFileStore creates a new bifFileStore -func NewBigFileStore(dstore ds.Datastore) *BigFileStore { - return &BigFileStore{ - dstore: dsns.Wrap(dstore, bigFilePrefix), - } -} - -func (b *BigFileStore) Put(streamCid cid.Cid, chunks []*ChunkingManifestChunk) error { - chunkData, err := serializeChunks(chunks) - if err != nil { - return err - } - - dsk := dshelp.CidToDsKey(streamCid) - return b.dstore.Put(dsk, chunkData) -} - -func (b *BigFileStore) Get(streamCid cid.Cid) ([]*ChunkingManifestChunk, error) { - data, err := b.dstore.Get(dshelp.CidToDsKey(streamCid)) - if err != nil { - return nil, err - } - - return deserializeChunks(data) -} diff --git a/core/node/groups.go b/core/node/groups.go index 90245ba05..93bfb27cb 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -165,6 +165,7 @@ func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option { if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled { finalBstore = fx.Provide(FilestoreBlockstoreCtor) } + finalBstore = fx.Provide(BigFileBlockstoreCtor) return fx.Options( fx.Provide(RepoConfig), diff --git a/core/node/storage.go b/core/node/storage.go index 92d475339..37bfeaf04 100644 --- a/core/node/storage.go +++ b/core/node/storage.go @@ -4,6 +4,7 @@ import ( "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" config "github.com/ipfs/go-ipfs-config" + "github.com/ipfs/go-ipfs/core/bigfilestore" "go.uber.org/fx" "github.com/ipfs/go-filestore" @@ -72,3 +73,17 @@ func FilestoreBlockstoreCtor(repo repo.Repo, bb BaseBlocks) (gclocker blockstore bs = gcbs return } + +// BigFileBlockstoreCtor wraps the base blockstore in a BigFileStore, then a GCBlockstore and the verifbs.VerifBSGC wrapper +// TODO: Filestore support (Storage assigns this ctor unconditionally, overriding FilestoreBlockstoreCtor) +func BigFileBlockstoreCtor(repo repo.Repo, bb
BaseBlocks) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, bbstore *bigfilestore.BigFileStore) { + gclocker = blockstore.NewGCLocker() + + // hash security + bbstore = bigfilestore.NewBigFileStore(bb, repo.Datastore()) + gcbs = blockstore.NewGCBlockstore(bbstore, gclocker) + gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs} + + bs = gcbs + return +}