diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 3fb47aa0b..d2f068acc 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -23,34 +23,52 @@ var ErrNotFound = errors.New("blockservice: key not found") // BlockService is a hybrid block datastore. It stores data in a local // datastore and may retrieve data from a remote Exchange. // It uses an internal `datastore.Datastore` instance to store values. -type BlockService struct { - // TODO don't expose underlying impl details - Blockstore blockstore.Blockstore - Exchange exchange.Interface +type BlockService interface { + Blockstore() blockstore.Blockstore + Exchange() exchange.Interface + AddBlock(o blocks.Block) (*cid.Cid, error) + AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) + GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) + GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block + DeleteBlock(o blocks.Block) error + Close() error +} + +type blockService struct { + blockstore blockstore.Blockstore + exchange exchange.Interface } // NewBlockService creates a BlockService with given datastore instance. -func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService { +func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService { if rem == nil { log.Warning("blockservice running in local (offline) mode.") } - return &BlockService{ - Blockstore: bs, - Exchange: rem, + return &blockService{ + blockstore: bs, + exchange: rem, } } +func (bs *blockService) Blockstore() blockstore.Blockstore { + return bs.blockstore +} + +func (bs *blockService) Exchange() exchange.Interface { + return bs.exchange +} + // AddBlock adds a particular block to the service, Putting it into the datastore. // TODO pass a context into this if the remote.HasBlock is going to remain here. -func (s *BlockService) AddBlock(o blocks.Block) (*cid.Cid, error) { +func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) { // TODO: while this is a great optimization, we should think about the // possibility of streaming writes directly to disk. If we can pass this object // all the way down to the datastore without having to 'buffer' its data, // we could implement a `WriteTo` method on it that could do a streaming write // of the content, saving us (probably) considerable memory. c := o.Cid() - has, err := s.Blockstore.Has(c) + has, err := s.blockstore.Has(c) if err != nil { return nil, err } @@ -59,22 +77,22 @@ func (s *BlockService) AddBlock(o blocks.Block) (*cid.Cid, error) { return c, nil } - err = s.Blockstore.Put(o) + err = s.blockstore.Put(o) if err != nil { return nil, err } - if err := s.Exchange.HasBlock(o); err != nil { + if err := s.exchange.HasBlock(o); err != nil { return nil, errors.New("blockservice is closed") } return c, nil } -func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { +func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { var toput []blocks.Block for _, b := range bs { - has, err := s.Blockstore.Has(b.Cid()) + has, err := s.blockstore.Has(b.Cid()) if err != nil { return nil, err } @@ -86,14 +104,14 @@ func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { toput = append(toput, b) } - err := s.Blockstore.PutMany(toput) + err := s.blockstore.PutMany(toput) if err != nil { return nil, err } var ks []*cid.Cid for _, o := range toput { - if err := s.Exchange.HasBlock(o); err != nil { + if err := s.exchange.HasBlock(o); err != nil { return nil, fmt.Errorf("blockservice is closed (%s)", err) } @@ -104,19 +122,19 @@ func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { // GetBlock retrieves a particular block from the service, // Getting it from the datastore using the key (hash). -func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) { +func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) { log.Debugf("BlockService GetBlock: '%s'", c) - block, err := s.Blockstore.Get(c) + block, err := s.blockstore.Get(c) if err == nil { return block, nil } - if err == blockstore.ErrNotFound && s.Exchange != nil { + if err == blockstore.ErrNotFound && s.exchange != nil { // TODO be careful checking ErrNotFound. If the underlying // implementation changes, this will break. log.Debug("Blockservice: Searching bitswap") - blk, err := s.Exchange.GetBlock(ctx, c) + blk, err := s.exchange.GetBlock(ctx, c) if err != nil { if err == blockstore.ErrNotFound { return nil, ErrNotFound @@ -137,13 +155,13 @@ func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, // GetBlocks gets a list of blocks asynchronously and returns through // the returned channel. // NB: No guarantees are made about order. -func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block { +func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block { out := make(chan blocks.Block, 0) go func() { defer close(out) var misses []*cid.Cid for _, c := range ks { - hit, err := s.Blockstore.Get(c) + hit, err := s.blockstore.Get(c) if err != nil { misses = append(misses, c) continue @@ -160,7 +178,7 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc return } - rblocks, err := s.Exchange.GetBlocks(ctx, misses) + rblocks, err := s.exchange.GetBlocks(ctx, misses) if err != nil { log.Debugf("Error with GetBlocks: %s", err) return @@ -178,11 +196,11 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc } // DeleteBlock deletes a block in the blockservice from the datastore -func (s *BlockService) DeleteBlock(o blocks.Block) error { - return s.Blockstore.DeleteBlock(o.Cid()) +func (s *blockService) DeleteBlock(o blocks.Block) error { + return s.blockstore.DeleteBlock(o.Cid()) } -func (s *BlockService) Close() error { +func (s *blockService) Close() error { log.Debug("blockservice is shutting down...") - return s.Exchange.Close() + return s.exchange.Close() } diff --git a/blockservice/test/mock.go b/blockservice/test/mock.go index 28e3a4e99..622d1c8d6 100644 --- a/blockservice/test/mock.go +++ b/blockservice/test/mock.go @@ -9,13 +9,13 @@ import ( ) // Mocks returns |n| connected mock Blockservices -func Mocks(n int) []*BlockService { +func Mocks(n int) []BlockService { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) sg := bitswap.NewTestSessionGenerator(net) instances := sg.Instances(n) - var servs []*BlockService + var servs []BlockService for _, i := range instances { servs = append(servs, New(i.Blockstore(), i.Exchange)) } diff --git a/core/core.go b/core/core.go index b4ee113c9..6c1507abd 100644 --- a/core/core.go +++ b/core/core.go @@ -96,7 +96,7 @@ type IpfsNode struct { // Services Peerstore pstore.Peerstore // storage for other Peer instances Blockstore bstore.GCBlockstore // the block store (lower level) - Blocks *bserv.BlockService // the block service, get/add blocks. + Blocks bserv.BlockService // the block service, get/add blocks. DAG merkledag.DAGService // the merkle dag service, get/add objects. Resolver *path.Resolver // the path resolution system Reporter metrics.Reporter diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index b9431522b..5d2cd36dc 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -41,7 +41,7 @@ type LinkService interface { GetOfflineLinkService() LinkService } -func NewDAGService(bs *bserv.BlockService) *dagService { +func NewDAGService(bs bserv.BlockService) *dagService { return &dagService{Blocks: bs} } @@ -51,7 +51,7 @@ func NewDAGService(bs *bserv.BlockService) *dagService { // TODO: should cache Nodes that are in memory, and be // able to free some of them when vm pressure is high type dagService struct { - Blocks *bserv.BlockService + Blocks bserv.BlockService } // Add adds a node to the dagService, storing the block in the BlockService @@ -113,8 +113,8 @@ func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) } func (n *dagService) GetOfflineLinkService() LinkService { - if n.Blocks.Exchange.IsOnline() { - bsrv := bserv.New(n.Blocks.Blockstore, offline.Exchange(n.Blocks.Blockstore)) + if n.Blocks.Exchange().IsOnline() { + bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore())) return NewDAGService(bsrv) } else { return n diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index 006c8b5ca..bc9093532 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -237,7 +237,7 @@ func TestFetchGraph(t *testing.T) { } // create an offline dagstore and ensure all blocks were fetched - bs := bserv.New(bsis[1].Blockstore, offline.Exchange(bsis[1].Blockstore)) + bs := bserv.New(bsis[1].Blockstore(), offline.Exchange(bsis[1].Blockstore())) offline_ds := NewDAGService(bs)