bigfilestore implements the blockstore interface. bigfilestore was moved to avoid an import cycle and is used in go-ipfs

This commit is contained in:
Adin Schmahmann 2021-02-04 10:26:29 -05:00
parent 58a062367e
commit d5b59a8445
9 changed files with 355 additions and 67 deletions

View File

@ -1,4 +1,4 @@
package coreunix
package bigfilestore
import (
"bytes"
@ -25,7 +25,7 @@ type ChunkingManifestChunk struct {
Size uint64
}
func extractChunkingManifest(ctx context.Context, dagSvc ipld.DAGService, chunkedFileCid cid.Cid) (*ChunkingManifest, error) {
func ExtractChunkingManifest(ctx context.Context, dagSvc ipld.DAGService, chunkedFileCid cid.Cid) (*ChunkingManifest, error) {
getLinks := dag.GetLinksWithDAG(dagSvc)
chunking := &ChunkingManifest{
ChunkedCid: chunkedFileCid,

View File

@ -0,0 +1,297 @@
package bigfilestore
import (
"context"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dsns "github.com/ipfs/go-datastore/namespace"
dsq "github.com/ipfs/go-datastore/query"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("bigfilestore")

// BigFileStore wraps a regular blockstore and additionally persists
// "big blocks": chunking manifests that describe a large file as an
// ordered list of chunk CIDs whose data lives in the wrapped blockstore.
type BigFileStore struct {
	bs         blockstore.Blockstore // backing blockstore holding the actual block/chunk data
	dstore     ds.Batching           // namespaced datastore holding serialized chunking manifests
	hashOnRead bool                  // when set, Get re-hashes reassembled big blocks and verifies the CID
}

// bigFilePrefix namespaces big file datastores
var bigFilePrefix = ds.NewKey("bigfiles")

// NewBigFileStore creates a new BigFileStore wrapping bstore, keeping
// chunking manifests in dstore under the "bigfiles" namespace.
func NewBigFileStore(bstore blockstore.Blockstore, dstore ds.Batching) *BigFileStore {
	return &BigFileStore{
		bs:     bstore,
		dstore: dsns.Wrap(dstore, bigFilePrefix),
	}
}
// PutBigBlock serializes the chunking manifest for streamCid and stores it
// in the big-file datastore, keyed by the stream CID.
func (b *BigFileStore) PutBigBlock(streamCid cid.Cid, chunks []*ChunkingManifestChunk) error {
	serialized, err := serializeChunks(chunks)
	if err != nil {
		return err
	}
	return b.dstore.Put(dshelp.CidToDsKey(streamCid), serialized)
}
// GetBigBlock loads and deserializes the chunking manifest previously stored
// for streamCid via PutBigBlock.
func (b *BigFileStore) GetBigBlock(streamCid cid.Cid) ([]*ChunkingManifestChunk, error) {
	raw, err := b.dstore.Get(dshelp.CidToDsKey(streamCid))
	if err != nil {
		return nil, err
	}
	return deserializeChunks(raw)
}
// AllKeysChan returns a channel from which to read the keys stored in
// the blockstore, followed by the keys of the big-block manifests. If the
// given context is cancelled the channel will be closed.
func (b *BigFileStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
	ctx, cancel := context.WithCancel(ctx)
	blockKeys, err := b.bs.AllKeysChan(ctx)
	if err != nil {
		cancel()
		return nil, err
	}
	out := make(chan cid.Cid, dsq.KeysOnlyBufSize)
	go func() {
		defer cancel()
		defer close(out)
		if !forwardKeys(ctx, blockKeys, out) {
			return
		}
		// Can't do these at the same time because the abstractions around
		// leveldb make us query leveldb for both operations. We apparently
		// can't query leveldb concurrently.
		bigKeys, err := b.bsAllKeysChan(ctx)
		if err != nil {
			// FIX: message said "filestore"; this is the bigfilestore.
			log.Error("error querying bigfilestore: ", err)
			return
		}
		forwardKeys(ctx, bigKeys, out)
	}()
	return out, nil
}

// forwardKeys copies CIDs from in to out until in is closed, returning true.
// It returns false if ctx is cancelled first. Extracted to remove the
// duplicated drain loop and the confusing `b, err := b.bsAllKeysChan(ctx)`
// declaration that shadowed the method receiver.
func forwardKeys(ctx context.Context, in <-chan cid.Cid, out chan<- cid.Cid) bool {
	for {
		select {
		case c, ok := <-in:
			if !ok {
				return true
			}
			select {
			case out <- c:
			case <-ctx.Done():
				return false
			}
		case <-ctx.Done():
			return false
		}
	}
}
// bsAllKeysChan streams the CIDs of all stored big-block manifests by
// querying the (namespaced) manifest datastore.
// NOTE(review): despite the "bs" prefix this queries b.dstore, not b.bs —
// consider renaming (e.g. bigBlockKeysChan) in a follow-up.
func (b *BigFileStore) bsAllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
	// KeysOnly, because that would be _a lot_ of data.
	q := dsq.Query{KeysOnly: true}
	res, err := b.dstore.Query(q)
	if err != nil {
		return nil, err
	}
	output := make(chan cid.Cid, dsq.KeysOnlyBufSize)
	go func() {
		defer func() {
			res.Close() // ensure exit (signals early exit, too)
			close(output)
		}()
		for {
			e, ok := res.NextSync()
			if !ok {
				// Result stream exhausted.
				return
			}
			if e.Error != nil {
				log.Errorf("blockstore.AllKeysChan got err: %s", e.Error)
				return
			}
			// need to convert to key.Key using key.KeyFromDsKey.
			k, err := dshelp.DsKeyToCid(ds.RawKey(e.Key))
			if err != nil {
				// Skip entries whose key does not decode to a CID rather
				// than aborting the whole enumeration.
				log.Warningf("error parsing key from DsKey: %s", err)
				continue
			}
			select {
			case <-ctx.Done():
				return
			case output <- k:
			}
		}
	}()
	return output, nil
}
// DeleteBlock deletes the block with the given key from the
// blockstore and removes any big-block manifest stored for it. It may
// return ErrNotFound when the block is not stored in either place.
func (b *BigFileStore) DeleteBlock(c cid.Cid) error {
	err1 := b.bs.DeleteBlock(c)
	if err1 != nil && err1 != blockstore.ErrNotFound {
		return err1
	}
	err2 := b.dstore.Delete(dshelp.CidToDsKey(c))
	// if we successfully removed something from the blockstore, but the
	// bigfilestore didnt have it, return success
	switch err2 {
	case nil:
		return nil
	case ds.ErrNotFound:
		// FIX: the datastore reports a missing key as ds.ErrNotFound, not
		// blockstore.ErrNotFound — the previous comparison could never match,
		// so a CID absent from both stores leaked the raw datastore error.
		if err1 == blockstore.ErrNotFound {
			return blockstore.ErrNotFound
		}
		return nil
	default:
		return err2
	}
}
// Get retrieves the block with the given Cid. It may return
// ErrNotFound when the block is not stored.
//
// If the CID is not in the wrapped blockstore it is treated as a possible
// big block: its chunking manifest is looked up and the payload is
// reassembled by concatenating the raw data of every chunk in manifest order.
func (b *BigFileStore) Get(c cid.Cid) (blocks.Block, error) {
	blk, err := b.bs.Get(c)
	switch err {
	case nil:
		return blk, nil
	case blockstore.ErrNotFound:
		chunks, err := b.GetBigBlock(c)
		if err == ds.ErrNotFound {
			// No manifest either: the CID is simply not stored.
			return nil, blockstore.ErrNotFound
		}
		if err != nil {
			return nil, err
		}
		// Reassemble the full payload from the individual chunk blocks.
		// NOTE(review): this buffers the entire (potentially very large)
		// file in memory — confirm that is acceptable for expected sizes.
		var data []byte
		for _, chunk := range chunks {
			blk, err := b.bs.Get(chunk.ChunkCid)
			if err != nil {
				return nil, err
			}
			data = append(data, blk.RawData()...)
		}
		if b.hashOnRead {
			// Re-hash the reassembled bytes with the requested CID's prefix
			// (same codec/multihash) and verify the digests match.
			rbcid, err := c.Prefix().Sum(data)
			if err != nil {
				return nil, err
			}
			if !rbcid.Equals(c) {
				return nil, blockstore.ErrHashMismatch
			}
		}
		return blocks.NewBlockWithCid(data, c)
	default:
		return nil, err
	}
}
// GetSize returns the size of the requested block. It may return ErrNotFound
// when the block is not stored. For big blocks the size is the sum of the
// chunk sizes recorded in the chunking manifest.
func (b *BigFileStore) GetSize(c cid.Cid) (int, error) {
	size, err := b.bs.GetSize(c)
	if err == nil {
		return size, nil
	}
	if err != blockstore.ErrNotFound {
		return -1, err
	}
	// Not a plain block; fall back to the big-block manifest.
	chunks, err := b.GetBigBlock(c)
	if err == ds.ErrNotFound {
		return 0, blockstore.ErrNotFound
	}
	if err != nil {
		return 0, err
	}
	total := 0
	for _, chunk := range chunks {
		total += int(chunk.Size)
	}
	return total, nil
}
// Has reports whether the block with the given Cid is stored, either as a
// plain block in the wrapped blockstore or as a big-block manifest.
func (b *BigFileStore) Has(c cid.Cid) (bool, error) {
	switch has, err := b.bs.Has(c); {
	case err != nil:
		return false, err
	case has:
		return true, nil
	}
	return b.dstore.Has(dshelp.CidToDsKey(c))
}
// Put stores a block in the wrapped blockstore. The write is skipped when
// the CID is already present — either as a plain block or as a big-block
// manifest (see Has).
// NOTE(review): the previous comment referenced FilestoreNode/FileManager,
// a copy-paste from the Filestore implementation; there is no such
// delegation here.
func (b *BigFileStore) Put(blk blocks.Block) error {
	has, err := b.Has(blk.Cid())
	if err != nil {
		return err
	}
	if has {
		return nil
	}
	return b.bs.Put(blk)
}
// PutMany is like Put(), but takes a slice of blocks, allowing
// the underlying blockstore to perform batch transactions.
// NOTE(review): unlike Put, this does not skip blocks already present via a
// big-block manifest — it delegates directly to the blockstore; confirm the
// asymmetry is intentional.
func (b *BigFileStore) PutMany(bs []blocks.Block) error {
	return b.bs.PutMany(bs)
}
// HashOnRead enables or disables hash-on-read verification, both on the
// wrapped blockstore and for big blocks reassembled by Get.
func (b *BigFileStore) HashOnRead(enabled bool) {
	b.bs.HashOnRead(enabled)
	// FIX: previously this always assigned true, so verification of
	// reassembled big blocks could never be switched off once requested.
	b.hashOnRead = enabled
}

var _ blockstore.Blockstore = (*BigFileStore)(nil)

View File

@ -1,6 +1,7 @@
package coreunix
package bigfilestore
import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
"testing"
"github.com/ipfs/go-cid"
@ -9,7 +10,9 @@ import (
)
func TestBigFileStorePutThenGet(t *testing.T) {
bfs := NewBigFileStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
dstore := ds_sync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bfs := NewBigFileStore(bstore, dstore)
streamCid, err := cid.Parse("QmWQadpxHe1UgAMdkZ5tm7znzqiixwo5u9XLKCtPGLtdDs")
if err != nil {
@ -35,12 +38,12 @@ func TestBigFileStorePutThenGet(t *testing.T) {
}
}
err = bfs.Put(streamCid, chunks)
err = bfs.PutBigBlock(streamCid, chunks)
if err != nil {
t.Fatal(err)
}
chunks2, err := bfs.Get(streamCid)
chunks2, err := bfs.GetBigBlock(streamCid)
if err != nil {
t.Fatal(err)
}

View File

@ -11,6 +11,7 @@ package core
import (
"context"
"github.com/ipfs/go-ipfs/core/bigfilestore"
"io"
"github.com/ipfs/go-filestore"
@ -69,16 +70,17 @@ type IpfsNode struct {
PNetFingerprint libp2p.PNetFingerprint `optional:"true"` // fingerprint of private network
// Services
Peerstore pstore.Peerstore `optional:"true"` // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Filestore *filestore.Filestore `optional:"true"` // the filestore blockstore
BaseBlocks node.BaseBlocks // the raw blockstore, no filestore wrapping
GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc
Blocks bserv.BlockService // the block service, get/add blocks.
DAG ipld.DAGService // the merkle dag service, get/add objects.
Resolver *resolver.Resolver // the path resolution system
Reporter *metrics.BandwidthCounter `optional:"true"`
Discovery discovery.Service `optional:"true"`
Peerstore pstore.Peerstore `optional:"true"` // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Filestore *filestore.Filestore `optional:"true"` // the filestore blockstore
BigBlockStore *bigfilestore.BigFileStore `optional:"true"` // the bigfile blockstore
BaseBlocks node.BaseBlocks // the raw blockstore, no filestore wrapping
GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc
Blocks bserv.BlockService // the block service, get/add blocks.
DAG ipld.DAGService // the merkle dag service, get/add objects.
Resolver *resolver.Resolver // the path resolution system
Reporter *metrics.BandwidthCounter `optional:"true"`
Discovery discovery.Service `optional:"true"`
FilesRoot *mfs.Root
RecordValidator record.Validator

View File

@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"github.com/ipfs/go-ipfs/core/bigfilestore"
bserv "github.com/ipfs/go-blockservice"
blockstore "github.com/ipfs/go-ipfs-blockstore"
@ -38,7 +39,6 @@ import (
record "github.com/libp2p/go-libp2p-record"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/coreunix"
"github.com/ipfs/go-ipfs/core/node"
"github.com/ipfs/go-ipfs/namesys"
"github.com/ipfs/go-ipfs/repo"
@ -57,7 +57,7 @@ type CoreAPI struct {
blocks bserv.BlockService
dag ipld.DAGService
bfs *coreunix.BigFileStore //XXX
bfs *bigfilestore.BigFileStore
peerstore pstore.Peerstore
peerHost p2phost.Host
@ -169,7 +169,7 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e
blocks: n.Blocks,
dag: n.DAG,
bfs: nil, // XXX: initialize big file store, e.g. coreunix.NewBigFileStore(???),
bfs: n.BigBlockStore,
peerstore: n.Peerstore,
peerHost: n.PeerHost,

View File

@ -5,6 +5,7 @@ import (
"crypto/sha256"
"errors"
"fmt"
"github.com/ipfs/go-ipfs/core/bigfilestore"
"io"
gopath "path"
"strconv"
@ -26,7 +27,7 @@ import (
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
"github.com/ipfs/go-unixfs/importer/trickle"
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/path"
ipfsPath "github.com/ipfs/interface-go-ipfs-core/path"
)
var log = logging.Logger("coreunix")
@ -46,7 +47,7 @@ type syncer interface {
}
// NewAdder Returns a new Adder used for a file add operation.
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCLocker, ds ipld.DAGService, bfs *BigFileStore) (*Adder, error) {
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCLocker, ds ipld.DAGService, bfs *bigfilestore.BigFileStore) (*Adder, error) {
bufferedDS := ipld.NewBufferedDAG(ctx, ds)
return &Adder{
@ -70,7 +71,7 @@ type Adder struct {
gcLocker bstore.GCLocker
dagService ipld.DAGService
bufferedDS *ipld.BufferedDAG
bfs *BigFileStore
bfs *bigfilestore.BigFileStore
Out chan<- interface{}
Progress bool
Pin bool
@ -407,8 +408,8 @@ func (adder *Adder) addFile(path string, file files.File) error {
return err
}
if adder.RawFileHash != nil && adder.bfs != nil {
chunkingManifest, err := extractChunkingManifest(adder.ctx, adder.bufferedDS, dagnode.Cid())
if adder.RawFileHash != nil {
chunkingManifest, err := bigfilestore.ExtractChunkingManifest(adder.ctx, adder.bufferedDS, dagnode.Cid())
if err != nil {
return err
}
@ -431,9 +432,19 @@ func (adder *Adder) addFile(path string, file files.File) error {
return err
}
sha256StreamCid := cid.NewCidV1(cid.Raw, mh)
if err = adder.bfs.Put(sha256StreamCid, chunkingManifest.Chunks); err != nil {
if err = adder.bfs.PutBigBlock(sha256StreamCid, chunkingManifest.Chunks); err != nil {
return err
}
sz, err := adder.bfs.GetSize(sha256StreamCid)
if err != nil {
return err
}
adder.Out <- &coreiface.AddEvent{
Path: ipfsPath.IpfsPath(sha256StreamCid),
Bytes: int64(sz),
}
}
}
@ -519,7 +530,7 @@ func getOutput(dagnode ipld.Node) (*coreiface.AddEvent, error) {
}
output := &coreiface.AddEvent{
Path: path.IpfsPath(c),
Path: ipfsPath.IpfsPath(c),
Size: strconv.FormatUint(s, 10),
}

View File

@ -1,41 +0,0 @@
package coreunix
import (
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dsns "github.com/ipfs/go-datastore/namespace"
dshelp "github.com/ipfs/go-ipfs-ds-help"
)
type BigFileStore struct {
dstore ds.Datastore
}
// bigFilePrefix namespaces big file datastores
var bigFilePrefix = ds.NewKey("bigfiles")
// NewBigFileStore creates a new bifFileStore
func NewBigFileStore(dstore ds.Datastore) *BigFileStore {
return &BigFileStore{
dstore: dsns.Wrap(dstore, bigFilePrefix),
}
}
func (b *BigFileStore) Put(streamCid cid.Cid, chunks []*ChunkingManifestChunk) error {
chunkData, err := serializeChunks(chunks)
if err != nil {
return err
}
dsk := dshelp.CidToDsKey(streamCid)
return b.dstore.Put(dsk, chunkData)
}
func (b *BigFileStore) Get(streamCid cid.Cid) ([]*ChunkingManifestChunk, error) {
data, err := b.dstore.Get(dshelp.CidToDsKey(streamCid))
if err != nil {
return nil, err
}
return deserializeChunks(data)
}

View File

@ -165,6 +165,7 @@ func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled {
finalBstore = fx.Provide(FilestoreBlockstoreCtor)
}
finalBstore = fx.Provide(BigFileBlockstoreCtor)
return fx.Options(
fx.Provide(RepoConfig),

View File

@ -4,6 +4,7 @@ import (
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
config "github.com/ipfs/go-ipfs-config"
"github.com/ipfs/go-ipfs/core/bigfilestore"
"go.uber.org/fx"
"github.com/ipfs/go-filestore"
@ -72,3 +73,17 @@ func FilestoreBlockstoreCtor(repo repo.Repo, bb BaseBlocks) (gclocker blockstore
bs = gcbs
return
}
// BigFileBlockstoreCtor wraps the base blockstore with a BigFileStore, then
// a GCBlockstore, then a hash-verifying wrapper, and returns all layers for
// dependency injection.
// TODO: Filestore support
func BigFileBlockstoreCtor(repo repo.Repo, bb BaseBlocks) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, bbstore *bigfilestore.BigFileStore) {
	gclocker = blockstore.NewGCLocker()
	// Manifests go in the repo datastore; blocks stay in the base blockstore.
	bbstore = bigfilestore.NewBigFileStore(bb, repo.Datastore())
	gcbs = blockstore.NewGCBlockstore(bbstore, gclocker)
	// hash security
	gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs}
	bs = gcbs
	return
}