Merge pull request #6511 from ipfs/extract/filestore

Extract Filestore
Steven Allen 2019-07-15 07:22:16 -07:00 committed by GitHub
commit 333c9d1613
18 changed files with 16 additions and 1473 deletions
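In practical terms, the in-repo filestore package moves out of go-ipfs into the standalone github.com/ipfs/go-filestore module, and every consumer swaps its import path, as the hunks below show. A minimal sketch of what that swap looks like for a hypothetical consumer file (assuming the extracted module keeps the exported names seen in the deleted sources below):

package example // hypothetical consumer

import (
	// Before the extraction the package lived in-repo:
	//   filestore "github.com/ipfs/go-ipfs/filestore"
	// After the extraction it comes from its own module:
	filestore "github.com/ipfs/go-filestore"
)

// ErrDisabled re-exports the moved sentinel error, just to show the new import resolving.
var ErrDisabled = filestore.ErrFilestoreNotEnabled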

View File

@@ -51,10 +51,6 @@ endif
dir := pin/internal/pb
include $(dir)/Rules.mk
dir := filestore/pb
include $(dir)/Rules.mk
# -------------------- #
# universal rules #
# -------------------- #

View File

@@ -5,10 +5,10 @@ import (
"io"
"os"
filestore "github.com/ipfs/go-filestore"
core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
filestore "github.com/ipfs/go-ipfs/filestore"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-cmds"

View File

@@ -5,8 +5,8 @@ import (
"io"
"net/url"
filestore "github.com/ipfs/go-filestore"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
filestore "github.com/ipfs/go-ipfs/filestore"
cmds "github.com/ipfs/go-ipfs-cmds"
files "github.com/ipfs/go-ipfs-files"

View File

@@ -13,11 +13,11 @@ import (
"context"
"io"
"github.com/ipfs/go-filestore"
version "github.com/ipfs/go-ipfs"
"github.com/ipfs/go-ipfs/core/bootstrap"
"github.com/ipfs/go-ipfs/core/node"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/filestore"
"github.com/ipfs/go-ipfs/fuse/mount"
"github.com/ipfs/go-ipfs/namesys"
ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"

View File

@@ -8,11 +8,11 @@ import (
"path/filepath"
"testing"
"github.com/ipfs/go-filestore"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/bootstrap"
"github.com/ipfs/go-ipfs/core/coreapi"
mock "github.com/ipfs/go-ipfs/core/mock"
"github.com/ipfs/go-ipfs/filestore"
"github.com/ipfs/go-ipfs/keystore"
"github.com/ipfs/go-ipfs/repo"

View File

@@ -11,8 +11,8 @@ import (
config "github.com/ipfs/go-ipfs-config"
"go.uber.org/fx"
"github.com/ipfs/go-filestore"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/filestore"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/thirdparty/cidv0v1"
"github.com/ipfs/go-ipfs/thirdparty/verifbs"

View File

@@ -1,251 +0,0 @@
// Package filestore implements a Blockstore which is able to read certain
// blocks of data directly from their original location in the filesystem.
//
// In a Filestore, object leaves are stored as FilestoreNodes. FilestoreNodes
// include a filesystem path and an offset, allowing a Blockstore dealing with
// such blocks to avoid storing their whole contents and to read them from
// their filesystem location instead.
package filestore
import (
"context"
"errors"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
dsq "github.com/ipfs/go-datastore/query"
blockstore "github.com/ipfs/go-ipfs-blockstore"
posinfo "github.com/ipfs/go-ipfs-posinfo"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("filestore")
var ErrFilestoreNotEnabled = errors.New("filestore is not enabled, see https://git.io/vNItf")
var ErrUrlstoreNotEnabled = errors.New("urlstore is not enabled")
// Filestore implements a Blockstore by combining a standard Blockstore
// to store regular blocks and a special Blockstore called
// FileManager to store blocks whose data exists in an external file.
type Filestore struct {
fm *FileManager
bs blockstore.Blockstore
}
// FileManager returns the FileManager in the Filestore.
func (f *Filestore) FileManager() *FileManager {
return f.fm
}
// MainBlockstore returns the standard Blockstore in the Filestore.
func (f *Filestore) MainBlockstore() blockstore.Blockstore {
return f.bs
}
// NewFilestore creates a Filestore using the given Blockstore and FileManager.
func NewFilestore(bs blockstore.Blockstore, fm *FileManager) *Filestore {
return &Filestore{fm, bs}
}
// AllKeysChan returns a channel from which to read the keys stored in
// the blockstore. If the given context is cancelled the channel will be closed.
func (f *Filestore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
ctx, cancel := context.WithCancel(ctx)
a, err := f.bs.AllKeysChan(ctx)
if err != nil {
cancel()
return nil, err
}
out := make(chan cid.Cid, dsq.KeysOnlyBufSize)
go func() {
defer cancel()
defer close(out)
var done bool
for !done {
select {
case c, ok := <-a:
if !ok {
done = true
continue
}
select {
case out <- c:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
// Can't do these at the same time because the abstractions around
// leveldb make us query leveldb for both operations. We apparently
// can't query leveldb concurrently.
b, err := f.fm.AllKeysChan(ctx)
if err != nil {
log.Error("error querying filestore: ", err)
return
}
done = false
for !done {
select {
case c, ok := <-b:
if !ok {
done = true
continue
}
select {
case out <- c:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out, nil
}
// DeleteBlock deletes the block with the given key from the
// blockstore. As expected, in the case of FileManager blocks, only the
// reference is deleted, not its contents. It may return
// ErrNotFound when the block is not stored.
func (f *Filestore) DeleteBlock(c cid.Cid) error {
err1 := f.bs.DeleteBlock(c)
if err1 != nil && err1 != blockstore.ErrNotFound {
return err1
}
err2 := f.fm.DeleteBlock(c)
// if we successfully removed something from the blockstore, but the
// filestore didn't have it, return success
switch err2 {
case nil:
return nil
case blockstore.ErrNotFound:
if err1 == blockstore.ErrNotFound {
return blockstore.ErrNotFound
}
return nil
default:
return err2
}
}
// Get retrieves the block with the given Cid. It may return
// ErrNotFound when the block is not stored.
func (f *Filestore) Get(c cid.Cid) (blocks.Block, error) {
blk, err := f.bs.Get(c)
switch err {
case nil:
return blk, nil
case blockstore.ErrNotFound:
return f.fm.Get(c)
default:
return nil, err
}
}
// GetSize returns the size of the requested block. It may return ErrNotFound
// when the block is not stored.
func (f *Filestore) GetSize(c cid.Cid) (int, error) {
size, err := f.bs.GetSize(c)
switch err {
case nil:
return size, nil
case blockstore.ErrNotFound:
return f.fm.GetSize(c)
default:
return -1, err
}
}
// Has returns true if the block with the given Cid is
// stored in the Filestore.
func (f *Filestore) Has(c cid.Cid) (bool, error) {
has, err := f.bs.Has(c)
if err != nil {
return false, err
}
if has {
return true, nil
}
return f.fm.Has(c)
}
// Put stores a block in the Filestore. For blocks of
// underlying type FilestoreNode, the operation is
// delegated to the FileManager, while the rest of the blocks
// are handled by the regular blockstore.
func (f *Filestore) Put(b blocks.Block) error {
has, err := f.Has(b.Cid())
if err != nil {
return err
}
if has {
return nil
}
switch b := b.(type) {
case *posinfo.FilestoreNode:
return f.fm.Put(b)
default:
return f.bs.Put(b)
}
}
// PutMany is like Put(), but takes a slice of blocks, allowing
// the underlying blockstore to perform batch transactions.
func (f *Filestore) PutMany(bs []blocks.Block) error {
var normals []blocks.Block
var fstores []*posinfo.FilestoreNode
for _, b := range bs {
has, err := f.Has(b.Cid())
if err != nil {
return err
}
if has {
continue
}
switch b := b.(type) {
case *posinfo.FilestoreNode:
fstores = append(fstores, b)
default:
normals = append(normals, b)
}
}
if len(normals) > 0 {
err := f.bs.PutMany(normals)
if err != nil {
return err
}
}
if len(fstores) > 0 {
err := f.fm.PutMany(fstores)
if err != nil {
return err
}
}
return nil
}
// HashOnRead calls blockstore.HashOnRead.
func (f *Filestore) HashOnRead(enabled bool) {
f.bs.HashOnRead(enabled)
}
var _ blockstore.Blockstore = (*Filestore)(nil)
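For orientation, the removed type simply wires a regular Blockstore together with a FileManager. A minimal construction sketch, mirroring the newTestFilestore helper in the test file below and assuming an in-memory MapDatastore plus a hypothetical root directory:

package main

import (
	ds "github.com/ipfs/go-datastore"
	filestore "github.com/ipfs/go-filestore"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
)

func main() {
	mds := ds.NewMapDatastore()
	// FileManager reference paths are resolved relative to this root (hypothetical path).
	fm := filestore.NewFileManager(mds, "/tmp/filestore-root")
	fm.AllowFiles = true
	bs := blockstore.NewBlockstore(mds)
	fs := filestore.NewFilestore(bs, fm)
	_ = fs // satisfies blockstore.Blockstore, per the interface assertion above
}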

View File

@@ -1,177 +0,0 @@
package filestore
import (
"bytes"
"context"
"io/ioutil"
"math/rand"
"testing"
dag "github.com/ipfs/go-merkledag"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
posinfo "github.com/ipfs/go-ipfs-posinfo"
)
func newTestFilestore(t *testing.T) (string, *Filestore) {
mds := ds.NewMapDatastore()
testdir, err := ioutil.TempDir("", "filestore-test")
if err != nil {
t.Fatal(err)
}
fm := NewFileManager(mds, testdir)
fm.AllowFiles = true
bs := blockstore.NewBlockstore(mds)
fstore := NewFilestore(bs, fm)
return testdir, fstore
}
func makeFile(dir string, data []byte) (string, error) {
f, err := ioutil.TempFile(dir, "file")
if err != nil {
return "", err
}
_, err = f.Write(data)
if err != nil {
return "", err
}
return f.Name(), nil
}
func TestBasicFilestore(t *testing.T) {
dir, fs := newTestFilestore(t)
buf := make([]byte, 1000)
rand.Read(buf)
fname, err := makeFile(dir, buf)
if err != nil {
t.Fatal(err)
}
var cids []cid.Cid
for i := 0; i < 100; i++ {
n := &posinfo.FilestoreNode{
PosInfo: &posinfo.PosInfo{
FullPath: fname,
Offset: uint64(i * 10),
},
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}
err := fs.Put(n)
if err != nil {
t.Fatal(err)
}
cids = append(cids, n.Node.Cid())
}
for i, c := range cids {
blk, err := fs.Get(c)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(blk.RawData(), buf[i*10:(i+1)*10]) {
t.Fatal("data didnt match on the way out")
}
}
kch, err := fs.AllKeysChan(context.Background())
if err != nil {
t.Fatal(err)
}
out := make(map[string]struct{})
for c := range kch {
out[c.KeyString()] = struct{}{}
}
if len(out) != len(cids) {
t.Fatal("mismatch in number of entries")
}
for _, c := range cids {
if _, ok := out[c.KeyString()]; !ok {
t.Fatal("missing cid: ", c)
}
}
}
func randomFileAdd(t *testing.T, fs *Filestore, dir string, size int) (string, []cid.Cid) {
buf := make([]byte, size)
rand.Read(buf)
fname, err := makeFile(dir, buf)
if err != nil {
t.Fatal(err)
}
var out []cid.Cid
for i := 0; i < size/10; i++ {
n := &posinfo.FilestoreNode{
PosInfo: &posinfo.PosInfo{
FullPath: fname,
Offset: uint64(i * 10),
},
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}
err := fs.Put(n)
if err != nil {
t.Fatal(err)
}
out = append(out, n.Cid())
}
return fname, out
}
func TestDeletes(t *testing.T) {
dir, fs := newTestFilestore(t)
_, cids := randomFileAdd(t, fs, dir, 100)
todelete := cids[:4]
for _, c := range todelete {
err := fs.DeleteBlock(c)
if err != nil {
t.Fatal(err)
}
}
deleted := make(map[string]bool)
for _, c := range todelete {
_, err := fs.Get(c)
if err != blockstore.ErrNotFound {
t.Fatal("expected blockstore not found error")
}
deleted[c.KeyString()] = true
}
keys, err := fs.AllKeysChan(context.Background())
if err != nil {
t.Fatal(err)
}
for c := range keys {
if deleted[c.KeyString()] {
t.Fatal("shouldnt have reference to this key anymore")
}
}
}
func TestIsURL(t *testing.T) {
if !IsURL("http://www.example.com") {
t.Fatal("IsURL failed: http://www.example.com")
}
if !IsURL("https://www.example.com") {
t.Fatal("IsURL failed: https://www.example.com")
}
if IsURL("adir/afile") || IsURL("http:/ /afile") || IsURL("http:/a/file") {
t.Fatal("IsURL recognized non-url")
}
}

View File

@@ -1,330 +0,0 @@
package filestore
import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
pb "github.com/ipfs/go-ipfs/filestore/pb"
proto "github.com/gogo/protobuf/proto"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dsns "github.com/ipfs/go-datastore/namespace"
dsq "github.com/ipfs/go-datastore/query"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
posinfo "github.com/ipfs/go-ipfs-posinfo"
)
// FilestorePrefix identifies the key prefix for FileManager blocks.
var FilestorePrefix = ds.NewKey("filestore")
// FileManager is a blockstore implementation which stores special
// blocks of the FilestoreNode type. These nodes only contain a reference
// to the actual location of the block data in the filesystem
// (a path and an offset).
type FileManager struct {
AllowFiles bool
AllowUrls bool
ds ds.Batching
root string
}
// CorruptReferenceError implements the error interface.
// It is used to indicate that the block contents pointed
// to by the referencing blocks cannot be retrieved (i.e. the
// file is not found, or the data changed as it was being read).
type CorruptReferenceError struct {
Code Status
Err error
}
// Error() returns the error message in the CorruptReferenceError
// as a string.
func (c CorruptReferenceError) Error() string {
return c.Err.Error()
}
// NewFileManager initializes a new file manager with the given
// datastore and root. All FilestoreNode paths are relative to the
// root path given here, which is prepended for any operations.
func NewFileManager(ds ds.Batching, root string) *FileManager {
return &FileManager{ds: dsns.Wrap(ds, FilestorePrefix), root: root}
}
// AllKeysChan returns a channel from which to read the keys stored in
// the FileManager. If the given context is cancelled the channel will be
// closed.
func (f *FileManager) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
q := dsq.Query{KeysOnly: true}
res, err := f.ds.Query(q)
if err != nil {
return nil, err
}
out := make(chan cid.Cid, dsq.KeysOnlyBufSize)
go func() {
defer close(out)
for {
v, ok := res.NextSync()
if !ok {
return
}
k := ds.RawKey(v.Key)
c, err := dshelp.DsKeyToCid(k)
if err != nil {
log.Errorf("decoding cid from filestore: %s", err)
continue
}
select {
case out <- c:
case <-ctx.Done():
return
}
}
}()
return out, nil
}
// DeleteBlock deletes the reference-block from the underlying
// datastore. It does not touch the referenced data.
func (f *FileManager) DeleteBlock(c cid.Cid) error {
err := f.ds.Delete(dshelp.CidToDsKey(c))
if err == ds.ErrNotFound {
return blockstore.ErrNotFound
}
return err
}
// Get reads a block from the datastore. Reading a block
// is done in two steps: the first step retrieves the reference
// block from the datastore. The second step uses the stored
// path and offset to read the raw block data directly from disk.
func (f *FileManager) Get(c cid.Cid) (blocks.Block, error) {
dobj, err := f.getDataObj(c)
if err != nil {
return nil, err
}
out, err := f.readDataObj(c, dobj)
if err != nil {
return nil, err
}
return blocks.NewBlockWithCid(out, c)
}
// GetSize gets the size of the block from the datastore.
//
// This method may successfully return the size even if returning the block
// would fail because the associated file is no longer available.
func (f *FileManager) GetSize(c cid.Cid) (int, error) {
dobj, err := f.getDataObj(c)
if err != nil {
return -1, err
}
return int(dobj.GetSize_()), nil
}
func (f *FileManager) readDataObj(c cid.Cid, d *pb.DataObj) ([]byte, error) {
if IsURL(d.GetFilePath()) {
return f.readURLDataObj(c, d)
}
return f.readFileDataObj(c, d)
}
func (f *FileManager) getDataObj(c cid.Cid) (*pb.DataObj, error) {
o, err := f.ds.Get(dshelp.CidToDsKey(c))
switch err {
case ds.ErrNotFound:
return nil, blockstore.ErrNotFound
default:
return nil, err
case nil:
//
}
return unmarshalDataObj(o)
}
func unmarshalDataObj(data []byte) (*pb.DataObj, error) {
var dobj pb.DataObj
if err := proto.Unmarshal(data, &dobj); err != nil {
return nil, err
}
return &dobj, nil
}
func (f *FileManager) readFileDataObj(c cid.Cid, d *pb.DataObj) ([]byte, error) {
if !f.AllowFiles {
return nil, ErrFilestoreNotEnabled
}
p := filepath.FromSlash(d.GetFilePath())
abspath := filepath.Join(f.root, p)
fi, err := os.Open(abspath)
if os.IsNotExist(err) {
return nil, &CorruptReferenceError{StatusFileNotFound, err}
} else if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}
defer fi.Close()
_, err = fi.Seek(int64(d.GetOffset()), io.SeekStart)
if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}
outbuf := make([]byte, d.GetSize_())
_, err = io.ReadFull(fi, outbuf)
if err == io.EOF || err == io.ErrUnexpectedEOF {
return nil, &CorruptReferenceError{StatusFileChanged, err}
} else if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}
outcid, err := c.Prefix().Sum(outbuf)
if err != nil {
return nil, err
}
if !c.Equals(outcid) {
return nil, &CorruptReferenceError{StatusFileChanged,
fmt.Errorf("data in file did not match. %s offset %d", d.GetFilePath(), d.GetOffset())}
}
return outbuf, nil
}
// readURLDataObj reads and verifies the block from its URL.
func (f *FileManager) readURLDataObj(c cid.Cid, d *pb.DataObj) ([]byte, error) {
if !f.AllowUrls {
return nil, ErrUrlstoreNotEnabled
}
req, err := http.NewRequest("GET", d.GetFilePath(), nil)
if err != nil {
return nil, err
}
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", d.GetOffset(), d.GetOffset()+d.GetSize_()-1))
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}
if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusPartialContent {
return nil, &CorruptReferenceError{StatusFileError,
fmt.Errorf("expected HTTP 200 or 206 got %d", res.StatusCode)}
}
outbuf := make([]byte, d.GetSize_())
_, err = io.ReadFull(res.Body, outbuf)
if err == io.EOF || err == io.ErrUnexpectedEOF {
return nil, &CorruptReferenceError{StatusFileChanged, err}
} else if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}
res.Body.Close()
outcid, err := c.Prefix().Sum(outbuf)
if err != nil {
return nil, err
}
if !c.Equals(outcid) {
return nil, &CorruptReferenceError{StatusFileChanged,
fmt.Errorf("data in file did not match. %s offset %d", d.GetFilePath(), d.GetOffset())}
}
return outbuf, nil
}
// Has returns whether the FileManager is storing a block reference. It does
// not validate the data, nor check whether the reference is valid.
func (f *FileManager) Has(c cid.Cid) (bool, error) {
// NOTE: interesting thing to consider. Has doesn't validate the data.
// So the data on disk could be invalid, and we could think we have it.
dsk := dshelp.CidToDsKey(c)
return f.ds.Has(dsk)
}
type putter interface {
Put(ds.Key, []byte) error
}
// Put adds a new reference block to the FileManager. It does not check
// that the reference is valid.
func (f *FileManager) Put(b *posinfo.FilestoreNode) error {
return f.putTo(b, f.ds)
}
func (f *FileManager) putTo(b *posinfo.FilestoreNode, to putter) error {
var dobj pb.DataObj
if IsURL(b.PosInfo.FullPath) {
if !f.AllowUrls {
return ErrUrlstoreNotEnabled
}
dobj.FilePath = b.PosInfo.FullPath
} else {
if !f.AllowFiles {
return ErrFilestoreNotEnabled
}
if !filepath.HasPrefix(b.PosInfo.FullPath, f.root) { //nolint:staticcheck
return fmt.Errorf("cannot add filestore references outside ipfs root (%s)", f.root)
}
p, err := filepath.Rel(f.root, b.PosInfo.FullPath)
if err != nil {
return err
}
dobj.FilePath = filepath.ToSlash(p)
}
dobj.Offset = b.PosInfo.Offset
dobj.Size_ = uint64(len(b.RawData()))
data, err := proto.Marshal(&dobj)
if err != nil {
return err
}
return to.Put(dshelp.CidToDsKey(b.Cid()), data)
}
// PutMany is like Put() but takes a slice of blocks instead,
// allowing it to create a batch transaction.
func (f *FileManager) PutMany(bs []*posinfo.FilestoreNode) error {
batch, err := f.ds.Batch()
if err != nil {
return err
}
for _, b := range bs {
if err := f.putTo(b, batch); err != nil {
return err
}
}
return batch.Commit()
}
// IsURL returns true if the string represents a valid URL that the
// urlstore can handle. More specifically it returns true if a string
// begins with 'http://' or 'https://'.
func IsURL(str string) bool {
return (len(str) > 7 && str[0] == 'h' && str[1] == 't' && str[2] == 't' && str[3] == 'p') &&
((len(str) > 8 && str[4] == 's' && str[5] == ':' && str[6] == '/' && str[7] == '/') ||
(str[4] == ':' && str[5] == '/' && str[6] == '/'))
}
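Putting a reference through the FileManager follows the same shape as the test helpers above: wrap the node in a posinfo.FilestoreNode carrying the absolute path (which must live under the configured root) and the offset of the data. A minimal sketch with hypothetical paths and data:

package main

import (
	ds "github.com/ipfs/go-datastore"
	filestore "github.com/ipfs/go-filestore"
	posinfo "github.com/ipfs/go-ipfs-posinfo"
	dag "github.com/ipfs/go-merkledag"
)

func main() {
	fm := filestore.NewFileManager(ds.NewMapDatastore(), "/tmp/filestore-root")
	fm.AllowFiles = true

	data := []byte("0123456789")
	n := &posinfo.FilestoreNode{
		PosInfo: &posinfo.PosInfo{
			FullPath: "/tmp/filestore-root/example.dat", // must be under the root
			Offset:   0,
		},
		Node: dag.NewRawNode(data),
	}
	if err := fm.Put(n); err != nil {
		panic(err)
	}
	// fm.Get(n.Cid()) would now re-read and verify these bytes from example.dat.
}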

View File

@@ -1,8 +0,0 @@
include mk/header.mk
PB_$(d) = $(wildcard $(d)/*.proto)
TGTS_$(d) = $(PB_$(d):.proto=.pb.go)
#DEPS_GO += $(TGTS_$(d))
include mk/footer.mk

View File

@@ -1,399 +0,0 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: filestore/pb/dataobj.proto
package datastore_pb
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type DataObj struct {
FilePath string `protobuf:"bytes,1,opt,name=FilePath" json:"FilePath"`
Offset uint64 `protobuf:"varint,2,opt,name=Offset" json:"Offset"`
Size_ uint64 `protobuf:"varint,3,opt,name=Size" json:"Size"`
}
func (m *DataObj) Reset() { *m = DataObj{} }
func (m *DataObj) String() string { return proto.CompactTextString(m) }
func (*DataObj) ProtoMessage() {}
func (*DataObj) Descriptor() ([]byte, []int) {
return fileDescriptor_86a3613fbaff9a6c, []int{0}
}
func (m *DataObj) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *DataObj) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_DataObj.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *DataObj) XXX_Merge(src proto.Message) {
xxx_messageInfo_DataObj.Merge(m, src)
}
func (m *DataObj) XXX_Size() int {
return m.Size()
}
func (m *DataObj) XXX_DiscardUnknown() {
xxx_messageInfo_DataObj.DiscardUnknown(m)
}
var xxx_messageInfo_DataObj proto.InternalMessageInfo
func (m *DataObj) GetFilePath() string {
if m != nil {
return m.FilePath
}
return ""
}
func (m *DataObj) GetOffset() uint64 {
if m != nil {
return m.Offset
}
return 0
}
func (m *DataObj) GetSize_() uint64 {
if m != nil {
return m.Size_
}
return 0
}
func init() {
proto.RegisterType((*DataObj)(nil), "datastore.pb.DataObj")
}
func init() { proto.RegisterFile("filestore/pb/dataobj.proto", fileDescriptor_86a3613fbaff9a6c) }
var fileDescriptor_86a3613fbaff9a6c = []byte{
// 160 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x4a, 0xcb, 0xcc, 0x49,
0x2d, 0x2e, 0xc9, 0x2f, 0x4a, 0xd5, 0x2f, 0x48, 0xd2, 0x4f, 0x49, 0x2c, 0x49, 0xcc, 0x4f, 0xca,
0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x01, 0x71, 0xc1, 0x72, 0x7a, 0x05, 0x49, 0x4a,
0xc9, 0x5c, 0xec, 0x2e, 0x89, 0x25, 0x89, 0xfe, 0x49, 0x59, 0x42, 0x0a, 0x5c, 0x1c, 0x6e, 0x99,
0x39, 0xa9, 0x01, 0x89, 0x25, 0x19, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x4e, 0x2c, 0x27, 0xee,
0xc9, 0x33, 0x04, 0xc1, 0x45, 0x85, 0x64, 0xb8, 0xd8, 0xfc, 0xd3, 0xd2, 0x8a, 0x53, 0x4b, 0x24,
0x98, 0x14, 0x18, 0x35, 0x58, 0xa0, 0xf2, 0x50, 0x31, 0x21, 0x09, 0x2e, 0x96, 0xe0, 0xcc, 0xaa,
0x54, 0x09, 0x66, 0x24, 0x39, 0xb0, 0x88, 0x93, 0xc4, 0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e, 0xc9,
0x31, 0x3e, 0x78, 0x24, 0xc7, 0x38, 0xe1, 0xb1, 0x1c, 0xc3, 0x85, 0xc7, 0x72, 0x0c, 0x37, 0x1e,
0xcb, 0x31, 0x00, 0x02, 0x00, 0x00, 0xff, 0xff, 0x7f, 0x87, 0xf5, 0x88, 0xa9, 0x00, 0x00, 0x00,
}
func (m *DataObj) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *DataObj) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
dAtA[i] = 0xa
i++
i = encodeVarintDataobj(dAtA, i, uint64(len(m.FilePath)))
i += copy(dAtA[i:], m.FilePath)
dAtA[i] = 0x10
i++
i = encodeVarintDataobj(dAtA, i, uint64(m.Offset))
dAtA[i] = 0x18
i++
i = encodeVarintDataobj(dAtA, i, uint64(m.Size_))
return i, nil
}
func encodeVarintDataobj(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return offset + 1
}
func (m *DataObj) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.FilePath)
n += 1 + l + sovDataobj(uint64(l))
n += 1 + sovDataobj(uint64(m.Offset))
n += 1 + sovDataobj(uint64(m.Size_))
return n
}
func sovDataobj(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
}
func sozDataobj(x uint64) (n int) {
return sovDataobj(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *DataObj) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowDataobj
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: DataObj: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: DataObj: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field FilePath", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowDataobj
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthDataobj
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthDataobj
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.FilePath = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Offset", wireType)
}
m.Offset = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowDataobj
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Offset |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Size_", wireType)
}
m.Size_ = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowDataobj
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Size_ |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipDataobj(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthDataobj
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthDataobj
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipDataobj(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowDataobj
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowDataobj
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowDataobj
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthDataobj
}
iNdEx += length
if iNdEx < 0 {
return 0, ErrInvalidLengthDataobj
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowDataobj
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipDataobj(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
if iNdEx < 0 {
return 0, ErrInvalidLengthDataobj
}
}
return iNdEx, nil
case 4:
return iNdEx, nil
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
}
panic("unreachable")
}
var (
ErrInvalidLengthDataobj = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowDataobj = fmt.Errorf("proto: integer overflow")
)

View File

@@ -1,9 +0,0 @@
syntax = "proto2";
package datastore.pb;
message DataObj {
optional string FilePath = 1;
optional uint64 Offset = 2;
optional uint64 Size = 3;
}
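The generated bindings above give DataObj the usual gogo-proto Marshal/Unmarshal round trip; the FileManager stores exactly this message under the block's datastore key (see putTo above). A small round-trip sketch, assuming the pb package is published with the extracted module as github.com/ipfs/go-filestore/pb:

package main

import (
	"fmt"

	proto "github.com/gogo/protobuf/proto"
	pb "github.com/ipfs/go-filestore/pb"
)

func main() {
	in := pb.DataObj{FilePath: "example.dat", Offset: 0, Size_: 10}
	raw, err := proto.Marshal(&in)
	if err != nil {
		panic(err)
	}
	var out pb.DataObj
	if err := proto.Unmarshal(raw, &out); err != nil {
		panic(err)
	}
	fmt.Println(out.GetFilePath(), out.GetOffset(), out.GetSize_())
}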

View File

@@ -1,287 +0,0 @@
package filestore
import (
"fmt"
"sort"
pb "github.com/ipfs/go-ipfs/filestore/pb"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
)
// Status is used to identify the state of the block data referenced
// by a FilestoreNode. Among other places, it is used by CorruptReferenceError.
type Status int32
// These are the supported Status codes.
const (
StatusOk Status = 0
StatusFileError Status = 10 // Backing File Error
StatusFileNotFound Status = 11 // Backing File Not Found
StatusFileChanged Status = 12 // Contents of the file changed
StatusOtherError Status = 20 // Internal Error, likely corrupt entry
StatusKeyNotFound Status = 30
)
// String provides a human-readable representation for Status codes.
func (s Status) String() string {
switch s {
case StatusOk:
return "ok"
case StatusFileError:
return "error"
case StatusFileNotFound:
return "no-file"
case StatusFileChanged:
return "changed"
case StatusOtherError:
return "ERROR"
case StatusKeyNotFound:
return "missing"
default:
return "???"
}
}
// Format returns the status formatted as a fixed-width,
// left-aligned string.
func (s Status) Format() string {
return fmt.Sprintf("%-7s", s.String())
}
// ListRes wraps the response of the List*() functions, which
// allow obtaining and verifying blocks stored by the FileManager
// of a Filestore. It includes information about the referenced
// block.
type ListRes struct {
Status Status
ErrorMsg string
Key cid.Cid
FilePath string
Offset uint64
Size uint64
}
// FormatLong returns a human-readable string for a ListRes object.
func (r *ListRes) FormatLong(enc func(cid.Cid) string) string {
if enc == nil {
enc = (cid.Cid).String
}
switch {
case !r.Key.Defined():
return "<corrupt key>"
case r.FilePath == "":
return r.Key.String()
default:
return fmt.Sprintf("%-50s %6d %s %d", enc(r.Key), r.Size, r.FilePath, r.Offset)
}
}
// List fetches the block with the given key from the FileManager
// of the given Filestore and returns a ListRes object with the information.
// List does not verify that the reference is valid or whether the
// raw data is accessible. See Verify().
func List(fs *Filestore, key cid.Cid) *ListRes {
return list(fs, false, key)
}
// ListAll returns a function as an iterator which, once invoked, returns
// one by one each block in the Filestore's FileManager.
// ListAll does not verify that the references are valid or whether
// the raw data is accessible. See VerifyAll().
func ListAll(fs *Filestore, fileOrder bool) (func() *ListRes, error) {
if fileOrder {
return listAllFileOrder(fs, false)
}
return listAll(fs, false)
}
// Verify fetches the block with the given key from the FileManager
// of the given Filestore and returns a ListRes object with the information.
// Verify makes sure that the reference is valid and the block data can be
// read.
func Verify(fs *Filestore, key cid.Cid) *ListRes {
return list(fs, true, key)
}
// VerifyAll returns a function as an iterator which, once invoked,
// returns one by one each block in the Filestore's FileManager.
// VerifyAll checks that the reference is valid and that the block data
// can be read.
func VerifyAll(fs *Filestore, fileOrder bool) (func() *ListRes, error) {
if fileOrder {
return listAllFileOrder(fs, true)
}
return listAll(fs, true)
}
func list(fs *Filestore, verify bool, key cid.Cid) *ListRes {
dobj, err := fs.fm.getDataObj(key)
if err != nil {
return mkListRes(key, nil, err)
}
if verify {
_, err = fs.fm.readDataObj(key, dobj)
}
return mkListRes(key, dobj, err)
}
func listAll(fs *Filestore, verify bool) (func() *ListRes, error) {
q := dsq.Query{}
qr, err := fs.fm.ds.Query(q)
if err != nil {
return nil, err
}
return func() *ListRes {
cid, dobj, err := next(qr)
if dobj == nil && err == nil {
return nil
} else if err == nil && verify {
_, err = fs.fm.readDataObj(cid, dobj)
}
return mkListRes(cid, dobj, err)
}, nil
}
func next(qr dsq.Results) (cid.Cid, *pb.DataObj, error) {
v, ok := qr.NextSync()
if !ok {
return cid.Cid{}, nil, nil
}
k := ds.RawKey(v.Key)
c, err := dshelp.DsKeyToCid(k)
if err != nil {
return cid.Cid{}, nil, fmt.Errorf("decoding cid from filestore: %s", err)
}
dobj, err := unmarshalDataObj(v.Value)
if err != nil {
return c, nil, err
}
return c, dobj, nil
}
func listAllFileOrder(fs *Filestore, verify bool) (func() *ListRes, error) {
q := dsq.Query{}
qr, err := fs.fm.ds.Query(q)
if err != nil {
return nil, err
}
var entries listEntries
for {
v, ok := qr.NextSync()
if !ok {
break
}
dobj, err := unmarshalDataObj(v.Value)
if err != nil {
entries = append(entries, &listEntry{
dsKey: v.Key,
err: err,
})
} else {
entries = append(entries, &listEntry{
dsKey: v.Key,
filePath: dobj.GetFilePath(),
offset: dobj.GetOffset(),
size: dobj.GetSize_(),
})
}
}
sort.Sort(entries)
i := 0
return func() *ListRes {
if i >= len(entries) {
return nil
}
v := entries[i]
i++
// attempt to convert the datastore key to a CID,
// store the error but don't use it yet
cid, keyErr := dshelp.DsKeyToCid(ds.RawKey(v.dsKey))
// first, if the entry already had an error, return that error
if v.err != nil {
return mkListRes(cid, nil, v.err)
}
// now reconstruct the DataObj
dobj := pb.DataObj{
FilePath: v.filePath,
Offset: v.offset,
Size_: v.size,
}
// now if we could not convert the datastore key return that
// error
if keyErr != nil {
return mkListRes(cid, &dobj, keyErr)
}
// finally verify the dataobj if requested
var err error
if verify {
_, err = fs.fm.readDataObj(cid, &dobj)
}
return mkListRes(cid, &dobj, err)
}, nil
}
type listEntry struct {
filePath string
offset uint64
dsKey string
size uint64
err error
}
type listEntries []*listEntry
func (l listEntries) Len() int { return len(l) }
func (l listEntries) Swap(i, j int) { l[i], l[j] = l[j], l[i] }
func (l listEntries) Less(i, j int) bool {
if l[i].filePath == l[j].filePath {
if l[i].offset == l[j].offset {
return l[i].dsKey < l[j].dsKey
}
return l[i].offset < l[j].offset
}
return l[i].filePath < l[j].filePath
}
func mkListRes(c cid.Cid, d *pb.DataObj, err error) *ListRes {
status := StatusOk
errorMsg := ""
if err != nil {
if err == ds.ErrNotFound || err == blockstore.ErrNotFound {
status = StatusKeyNotFound
} else if err, ok := err.(*CorruptReferenceError); ok {
status = err.Code
} else {
status = StatusOtherError
}
errorMsg = err.Error()
}
if d == nil {
return &ListRes{
Status: status,
ErrorMsg: errorMsg,
Key: c,
}
}
return &ListRes{
Status: status,
ErrorMsg: errorMsg,
Key: c,
FilePath: d.FilePath,
Size: d.Size_,
Offset: d.Offset,
}
}
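Consumers drive ListAll/VerifyAll as pull-style iterators: the returned function yields one ListRes per reference and nil once the listing is exhausted. A brief usage sketch in a hypothetical helper package, printing each reference with its status:

package fsutil // hypothetical helper package

import (
	"fmt"

	filestore "github.com/ipfs/go-filestore"
)

// listReferences prints every FileManager reference in file order,
// verifying each one when verify is true.
func listReferences(fs *filestore.Filestore, verify bool) error {
	iter := filestore.ListAll
	if verify {
		iter = filestore.VerifyAll
	}
	next, err := iter(fs, true /* fileOrder */)
	if err != nil {
		return err
	}
	for res := next(); res != nil; res = next() {
		fmt.Println(res.Status.Format(), res.FormatLong(nil))
	}
	return nil
}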

go.mod (2 changes)
View File

@@ -26,6 +26,7 @@ require (
github.com/ipfs/go-ds-flatfs v0.0.2
github.com/ipfs/go-ds-leveldb v0.0.2
github.com/ipfs/go-ds-measure v0.0.1
github.com/ipfs/go-filestore v0.0.2
github.com/ipfs/go-fs-lock v0.0.1
github.com/ipfs/go-ipfs-blockstore v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.1
@@ -110,6 +111,7 @@ require (
go4.org v0.0.0-20190313082347-94abd6928b1d // indirect
golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb
google.golang.org/appengine v1.4.0 // indirect
gopkg.in/cheggaaa/pb.v1 v1.0.28
gotest.tools/gotestsum v0.3.4
)

go.sum (6 changes)
View File

@@ -239,6 +239,10 @@ github.com/ipfs/go-ds-leveldb v0.0.2 h1:P5HB59Zblym0B5XYOeEyw3YtPtbpIqQCavCSWaWE
github.com/ipfs/go-ds-leveldb v0.0.2/go.mod h1:CWFeBh5IAAscWyG/QRH+lJaAlnLWjsfPSNs4teyPUp0=
github.com/ipfs/go-ds-measure v0.0.1 h1:PrCueug+yZLkDCOthZTXKinuoCal/GvlAT7cNxzr03g=
github.com/ipfs/go-ds-measure v0.0.1/go.mod h1:wiH6bepKsgyNKpz3nyb4erwhhIVpIxnZbsjN1QpVbbE=
github.com/ipfs/go-filestore v0.0.1 h1:LJkPZvA31kAXV/FIOnkoOvD8UsDTdPC8WOb5VkgBDiU=
github.com/ipfs/go-filestore v0.0.1/go.mod h1:rTVr5W+01wWKB5wuM0Rvy49vtBoAbw5+DBF2kochxVQ=
github.com/ipfs/go-filestore v0.0.2 h1:pcYwpjtXXwirtbjBXKVJM9CTa9F7/8v1EkfnDaHTO3s=
github.com/ipfs/go-filestore v0.0.2/go.mod h1:KnZ41qJsCt2OX2mxZS0xsK3Psr0/oB93HMMssLujjVc=
github.com/ipfs/go-fs-lock v0.0.1 h1:XHX8uW4jQBYWHj59XXcjg7BHlHxV9ZOYs6Y43yb7/l0=
github.com/ipfs/go-fs-lock v0.0.1/go.mod h1:DNBekbboPKcxs1aukPSaOtFA3QfSdi5C855v0i9XJ8Y=
github.com/ipfs/go-ipfs-blockstore v0.0.1 h1:O9n3PbmTYZoNhkgkEyrXTznbmktIXif62xLX+8dPHzc=
@@ -294,6 +298,8 @@ github.com/ipfs/go-merkledag v0.0.6 h1:rYZc0yzhO7y1cKi3Rw425a2HhEJDdLvNOWsqtmO3P
github.com/ipfs/go-merkledag v0.0.6/go.mod h1:QYPdnlvkOg7GnQRofu9XZimC5ZW5Wi3bKys/4GQQfto=
github.com/ipfs/go-merkledag v0.1.0 h1:CAEXjRFEDPvealQj3TgEjV1IJckwjvmxAqtq5QSXJrg=
github.com/ipfs/go-merkledag v0.1.0/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
github.com/ipfs/go-merkledag v0.2.0 h1:EAjIQCgZ6/DnOAlKY3+59j72FD9BsYtNaCRSmN0xIbU=
github.com/ipfs/go-merkledag v0.2.0/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-metrics-prometheus v0.0.2 h1:9i2iljLg12S78OhC6UAiXi176xvQGiZaGVF1CUVdE+s=

View File

@@ -11,7 +11,7 @@ import (
"strings"
"sync"
filestore "github.com/ipfs/go-ipfs/filestore"
filestore "github.com/ipfs/go-filestore"
keystore "github.com/ipfs/go-ipfs/keystore"
repo "github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/repo/common"

View File

@@ -3,7 +3,7 @@ package repo
import (
"errors"
filestore "github.com/ipfs/go-ipfs/filestore"
filestore "github.com/ipfs/go-filestore"
keystore "github.com/ipfs/go-ipfs/keystore"
config "github.com/ipfs/go-ipfs-config"

View File

@@ -4,7 +4,7 @@ import (
"errors"
"io"
filestore "github.com/ipfs/go-ipfs/filestore"
filestore "github.com/ipfs/go-filestore"
keystore "github.com/ipfs/go-ipfs/keystore"
ds "github.com/ipfs/go-datastore"