Merge pull request #3290 from ipfs/feat/el-cid-2

The conquest of El Cid, Pt. 2
This commit is contained in:
Jeromy Johnson 2016-10-10 06:37:00 -07:00 committed by GitHub
commit 7962903a2a
52 changed files with 544 additions and 535 deletions

View File

@ -6,8 +6,6 @@ import (
"errors"
"fmt"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
mh "gx/ipfs/QmYDds3421prZgqKbLpEK7T9Aa2eVdQ7o3YarX1LVLdP2J/go-multihash"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
@ -18,37 +16,39 @@ var ErrWrongHash = errors.New("data did not match given hash!")
type Block interface {
Multihash() mh.Multihash
RawData() []byte
Key() key.Key
Cid() *cid.Cid
String() string
Loggable() map[string]interface{}
}
// Block is a singular block of data in ipfs
type BasicBlock struct {
multihash mh.Multihash
data []byte
cid *cid.Cid
data []byte
}
// NewBlock creates a Block object from opaque data. It will hash the data.
func NewBlock(data []byte) *BasicBlock {
return &BasicBlock{data: data, multihash: u.Hash(data)}
// TODO: fix assumptions
return &BasicBlock{data: data, cid: cid.NewCidV0(u.Hash(data))}
}
// NewBlockWithHash creates a new block when the hash of the data
// is already known, this is used to save time in situations where
// we are able to be confident that the data is correct
func NewBlockWithHash(data []byte, h mh.Multihash) (*BasicBlock, error) {
func NewBlockWithCid(data []byte, c *cid.Cid) (*BasicBlock, error) {
if u.Debug {
chk := u.Hash(data)
if string(chk) != string(h) {
// TODO: fix assumptions
chkc := cid.NewCidV0(u.Hash(data))
if !chkc.Equals(c) {
return nil, ErrWrongHash
}
}
return &BasicBlock{data: data, multihash: h}, nil
return &BasicBlock{data: data, cid: c}, nil
}
func (b *BasicBlock) Multihash() mh.Multihash {
return b.multihash
return b.cid.Hash()
}
func (b *BasicBlock) RawData() []byte {
@ -56,20 +56,15 @@ func (b *BasicBlock) RawData() []byte {
}
func (b *BasicBlock) Cid() *cid.Cid {
return cid.NewCidV0(b.multihash)
}
// Key returns the block's Multihash as a Key value.
func (b *BasicBlock) Key() key.Key {
return key.Key(b.multihash)
return b.cid
}
func (b *BasicBlock) String() string {
return fmt.Sprintf("[Block %s]", b.Key())
return fmt.Sprintf("[Block %s]", b.Cid())
}
func (b *BasicBlock) Loggable() map[string]interface{} {
return map[string]interface{}{
"block": b.Key().String(),
"block": b.Cid().String(),
}
}

View File

@ -5,6 +5,7 @@ import (
"testing"
mh "gx/ipfs/QmYDds3421prZgqKbLpEK7T9Aa2eVdQ7o3YarX1LVLdP2J/go-multihash"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
)
@ -44,12 +45,12 @@ func TestHash(t *testing.T) {
}
}
func TestKey(t *testing.T) {
func TestCid(t *testing.T) {
data := []byte("yet another data")
block := NewBlock(data)
key := block.Key()
c := block.Cid()
if !bytes.Equal(block.Multihash(), key.ToMultihash()) {
if !bytes.Equal(block.Multihash(), c.Hash()) {
t.Error("key contains wrong data")
}
}
@ -66,8 +67,10 @@ func TestManualHash(t *testing.T) {
t.Fatal(err)
}
c := cid.NewCidV0(hash)
u.Debug = false
block, err := NewBlockWithHash(data, hash)
block, err := NewBlockWithCid(data, c)
if err != nil {
t.Fatal(err)
}
@ -77,7 +80,7 @@ func TestManualHash(t *testing.T) {
}
data[5] = byte((uint32(data[5]) + 5) % 256) // Transfrom hash to be different
block, err = NewBlockWithHash(data, hash)
block, err = NewBlockWithCid(data, c)
if err != nil {
t.Fatal(err)
}
@ -88,7 +91,7 @@ func TestManualHash(t *testing.T) {
u.Debug = true
block, err = NewBlockWithHash(data, hash)
block, err = NewBlockWithCid(data, c)
if err != ErrWrongHash {
t.Fatal(err)
}

View File

@ -1,13 +1,13 @@
package blockstore
import (
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
"context"
"github.com/ipfs/go-ipfs/blocks"
context "context"
"gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
)
@ -31,7 +31,7 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
return c, nil
}
func (b *arccache) DeleteBlock(k key.Key) error {
func (b *arccache) DeleteBlock(k *cid.Cid) error {
if has, ok := b.hasCached(k); ok && !has {
return ErrNotFound
}
@ -40,7 +40,7 @@ func (b *arccache) DeleteBlock(k key.Key) error {
err := b.blockstore.DeleteBlock(k)
switch err {
case nil, ds.ErrNotFound, ErrNotFound:
b.arc.Add(k, false)
b.addCache(k, false)
return err
default:
return err
@ -49,15 +49,16 @@ func (b *arccache) DeleteBlock(k key.Key) error {
// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
func (b *arccache) hasCached(k key.Key) (has bool, ok bool) {
func (b *arccache) hasCached(k *cid.Cid) (has bool, ok bool) {
b.total.Inc()
if k == "" {
if k == nil {
log.Error("nil cid in arccache")
// Return cache invalid so the call to blockstore happens
// in case of invalid key and correct error is created.
return false, false
}
h, ok := b.arc.Get(k)
h, ok := b.arc.Get(k.KeyString())
if ok {
b.hits.Inc()
return h.(bool), true
@ -65,40 +66,45 @@ func (b *arccache) hasCached(k key.Key) (has bool, ok bool) {
return false, false
}
func (b *arccache) Has(k key.Key) (bool, error) {
func (b *arccache) Has(k *cid.Cid) (bool, error) {
if has, ok := b.hasCached(k); ok {
return has, nil
}
res, err := b.blockstore.Has(k)
if err == nil {
b.arc.Add(k, res)
b.addCache(k, res)
}
return res, err
}
func (b *arccache) Get(k key.Key) (blocks.Block, error) {
func (b *arccache) Get(k *cid.Cid) (blocks.Block, error) {
if k == nil {
log.Error("nil cid in arc cache")
return nil, ErrNotFound
}
if has, ok := b.hasCached(k); ok && !has {
return nil, ErrNotFound
}
bl, err := b.blockstore.Get(k)
if bl == nil && err == ErrNotFound {
b.arc.Add(k, false)
b.addCache(k, false)
} else if bl != nil {
b.arc.Add(k, true)
b.addCache(k, true)
}
return bl, err
}
func (b *arccache) Put(bl blocks.Block) error {
if has, ok := b.hasCached(bl.Key()); ok && has {
if has, ok := b.hasCached(bl.Cid()); ok && has {
return nil
}
err := b.blockstore.Put(bl)
if err == nil {
b.arc.Add(bl.Key(), true)
b.addCache(bl.Cid(), true)
}
return err
}
@ -108,7 +114,7 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
for _, block := range bs {
// call put on block if result is inconclusive or we are sure that
// the block isn't in storage
if has, ok := b.hasCached(block.Key()); !ok || (ok && !has) {
if has, ok := b.hasCached(block.Cid()); !ok || (ok && !has) {
good = append(good, block)
}
}
@ -117,12 +123,16 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
return err
}
for _, block := range good {
b.arc.Add(block.Key(), true)
b.addCache(block.Cid(), true)
}
return nil
}
func (b *arccache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
func (b *arccache) addCache(c *cid.Cid, has bool) {
b.arc.Add(c.KeyString(), has)
}
func (b *arccache) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
return b.blockstore.AllKeysChan(ctx)
}

View File

@ -1,12 +1,12 @@
package blockstore
import (
"context"
"testing"
"github.com/ipfs/go-ipfs/blocks"
"gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
context "context"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
syncds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
)
@ -60,7 +60,7 @@ func TestRemoveCacheEntryOnDelete(t *testing.T) {
writeHitTheDatastore = true
})
arc.DeleteBlock(exampleBlock.Key())
arc.DeleteBlock(exampleBlock.Cid())
arc.Put(exampleBlock)
if !writeHitTheDatastore {
t.Fail()
@ -78,9 +78,9 @@ func TestElideDuplicateWrite(t *testing.T) {
func TestHasRequestTriggersCache(t *testing.T) {
arc, _, cd := createStores(t)
arc.Has(exampleBlock.Key())
arc.Has(exampleBlock.Cid())
trap("has hit datastore", cd, t)
if has, err := arc.Has(exampleBlock.Key()); has || err != nil {
if has, err := arc.Has(exampleBlock.Cid()); has || err != nil {
t.Fatal("has was true but there is no such block")
}
@ -92,7 +92,7 @@ func TestHasRequestTriggersCache(t *testing.T) {
trap("has hit datastore", cd, t)
if has, err := arc.Has(exampleBlock.Key()); !has || err != nil {
if has, err := arc.Has(exampleBlock.Cid()); !has || err != nil {
t.Fatal("has returned invalid result")
}
}
@ -100,13 +100,13 @@ func TestHasRequestTriggersCache(t *testing.T) {
func TestGetFillsCache(t *testing.T) {
arc, _, cd := createStores(t)
if bl, err := arc.Get(exampleBlock.Key()); bl != nil || err == nil {
if bl, err := arc.Get(exampleBlock.Cid()); bl != nil || err == nil {
t.Fatal("block was found or there was no error")
}
trap("has hit datastore", cd, t)
if has, err := arc.Has(exampleBlock.Key()); has || err != nil {
if has, err := arc.Has(exampleBlock.Cid()); has || err != nil {
t.Fatal("has was true but there is no such block")
}
@ -118,7 +118,7 @@ func TestGetFillsCache(t *testing.T) {
trap("has hit datastore", cd, t)
if has, err := arc.Has(exampleBlock.Key()); !has || err != nil {
if has, err := arc.Has(exampleBlock.Cid()); !has || err != nil {
t.Fatal("has returned invalid result")
}
}
@ -126,15 +126,15 @@ func TestGetFillsCache(t *testing.T) {
func TestGetAndDeleteFalseShortCircuit(t *testing.T) {
arc, _, cd := createStores(t)
arc.Has(exampleBlock.Key())
arc.Has(exampleBlock.Cid())
trap("get hit datastore", cd, t)
if bl, err := arc.Get(exampleBlock.Key()); bl != nil || err != ErrNotFound {
if bl, err := arc.Get(exampleBlock.Cid()); bl != nil || err != ErrNotFound {
t.Fatal("get returned invalid result")
}
if arc.DeleteBlock(exampleBlock.Key()) != ErrNotFound {
if arc.DeleteBlock(exampleBlock.Cid()) != ErrNotFound {
t.Fatal("expected ErrNotFound error")
}
}
@ -148,7 +148,7 @@ func TestArcCreationFailure(t *testing.T) {
func TestInvalidKey(t *testing.T) {
arc, _, _ := createStores(t)
bl, err := arc.Get(key.Key(""))
bl, err := arc.Get(nil)
if bl != nil {
t.Fatal("blocks should be nil")
@ -163,10 +163,28 @@ func TestHasAfterSucessfulGetIsCached(t *testing.T) {
bs.Put(exampleBlock)
arc.Get(exampleBlock.Key())
arc.Get(exampleBlock.Cid())
trap("has hit datastore", cd, t)
arc.Has(exampleBlock.Key())
arc.Has(exampleBlock.Cid())
}
func TestDifferentKeyObjectsWork(t *testing.T) {
arc, bs, cd := createStores(t)
bs.Put(exampleBlock)
arc.Get(exampleBlock.Cid())
trap("has hit datastore", cd, t)
cidstr := exampleBlock.Cid().String()
ncid, err := cid.Decode(cidstr)
if err != nil {
t.Fatal(err)
}
arc.Has(ncid)
}
func TestPutManyCaches(t *testing.T) {
@ -174,9 +192,9 @@ func TestPutManyCaches(t *testing.T) {
arc.PutMany([]blocks.Block{exampleBlock})
trap("has hit datastore", cd, t)
arc.Has(exampleBlock.Key())
arc.Has(exampleBlock.Cid())
untrap(cd)
arc.DeleteBlock(exampleBlock.Key())
arc.DeleteBlock(exampleBlock.Cid())
arc.Put(exampleBlock)
trap("PunMany has hit datastore", cd, t)

View File

@ -3,15 +3,16 @@
package blockstore
import (
"context"
"errors"
"sync"
"sync/atomic"
context "context"
blocks "github.com/ipfs/go-ipfs/blocks"
dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
mh "gx/ipfs/QmYDds3421prZgqKbLpEK7T9Aa2eVdQ7o3YarX1LVLdP2J/go-multihash"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
dsns "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/namespace"
dsq "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/query"
@ -29,13 +30,13 @@ var ErrNotFound = errors.New("blockstore: block not found")
// Blockstore wraps a Datastore
type Blockstore interface {
DeleteBlock(key.Key) error
Has(key.Key) (bool, error)
Get(key.Key) (blocks.Block, error)
DeleteBlock(*cid.Cid) error
Has(*cid.Cid) (bool, error)
Get(*cid.Cid) (blocks.Block, error)
Put(blocks.Block) error
PutMany([]blocks.Block) error
AllKeysChan(ctx context.Context) (<-chan key.Key, error)
AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
}
type GCBlockstore interface {
@ -80,12 +81,13 @@ func (bs *blockstore) HashOnRead(enabled bool) {
bs.rehash = enabled
}
func (bs *blockstore) Get(k key.Key) (blocks.Block, error) {
if k == "" {
func (bs *blockstore) Get(k *cid.Cid) (blocks.Block, error) {
if k == nil {
log.Error("nil cid in blockstore")
return nil, ErrNotFound
}
maybeData, err := bs.datastore.Get(k.DsKey())
maybeData, err := bs.datastore.Get(dshelp.NewKeyFromBinary(k.KeyString()))
if err == ds.ErrNotFound {
return nil, ErrNotFound
}
@ -99,18 +101,18 @@ func (bs *blockstore) Get(k key.Key) (blocks.Block, error) {
if bs.rehash {
rb := blocks.NewBlock(bdata)
if rb.Key() != k {
if !rb.Cid().Equals(k) {
return nil, ErrHashMismatch
} else {
return rb, nil
}
} else {
return blocks.NewBlockWithHash(bdata, mh.Multihash(k))
return blocks.NewBlockWithCid(bdata, k)
}
}
func (bs *blockstore) Put(block blocks.Block) error {
k := block.Key().DsKey()
k := dshelp.NewKeyFromBinary(block.Cid().KeyString())
// Has is cheaper than Put, so see if we already have it
exists, err := bs.datastore.Has(k)
@ -126,7 +128,7 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error {
return err
}
for _, b := range blocks {
k := b.Key().DsKey()
k := dshelp.NewKeyFromBinary(b.Cid().KeyString())
exists, err := bs.datastore.Has(k)
if err == nil && exists {
continue
@ -140,19 +142,19 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error {
return t.Commit()
}
func (bs *blockstore) Has(k key.Key) (bool, error) {
return bs.datastore.Has(k.DsKey())
func (bs *blockstore) Has(k *cid.Cid) (bool, error) {
return bs.datastore.Has(dshelp.NewKeyFromBinary(k.KeyString()))
}
func (s *blockstore) DeleteBlock(k key.Key) error {
return s.datastore.Delete(k.DsKey())
func (s *blockstore) DeleteBlock(k *cid.Cid) error {
return s.datastore.Delete(dshelp.NewKeyFromBinary(k.KeyString()))
}
// AllKeysChan runs a query for keys from the blockstore.
// this is very simplistic, in the future, take dsq.Query as a param?
//
// AllKeysChan respects context
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
// KeysOnly, because that would be _a lot_ of data.
q := dsq.Query{KeysOnly: true}
@ -164,39 +166,38 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
}
// this function is here to compartmentalize
get := func() (key.Key, bool) {
get := func() (*cid.Cid, bool) {
select {
case <-ctx.Done():
return "", false
return nil, false
case e, more := <-res.Next():
if !more {
return "", false
return nil, false
}
if e.Error != nil {
log.Debug("blockstore.AllKeysChan got err:", e.Error)
return "", false
return nil, false
}
// need to convert to key.Key using key.KeyFromDsKey.
k, err := key.KeyFromDsKey(ds.NewKey(e.Key))
kb, err := dshelp.BinaryFromDsKey(ds.NewKey(e.Key)) // TODO: calling NewKey isnt free
if err != nil {
log.Warningf("error parsing key from DsKey: ", err)
return "", true
return nil, true
}
log.Debug("blockstore: query got key", k)
// key must be a multihash. else ignore it.
_, err = mh.Cast([]byte(k))
c, err := cid.Cast(kb)
if err != nil {
log.Warningf("key from datastore was not a multihash: ", err)
return "", true
log.Warning("error parsing cid from decoded DsKey: ", err)
return nil, true
}
log.Debug("blockstore: query got key", c)
return k, true
return c, true
}
}
output := make(chan key.Key, dsq.KeysOnlyBufSize)
output := make(chan *cid.Cid, dsq.KeysOnlyBufSize)
go func() {
defer func() {
res.Process().Close() // ensure exit (signals early exit, too)
@ -208,7 +209,7 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
if !ok {
return
}
if k == "" {
if k == nil {
continue
}

View File

@ -2,22 +2,24 @@ package blockstore
import (
"bytes"
"context"
"fmt"
"testing"
context "context"
blocks "github.com/ipfs/go-ipfs/blocks"
dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
dsq "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/query"
ds_sync "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
blocks "github.com/ipfs/go-ipfs/blocks"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
)
func TestGetWhenKeyNotPresent(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
bl, err := bs.Get(key.Key("not present"))
c := cid.NewCidV0(u.Hash([]byte("stuff")))
bl, err := bs.Get(c)
if bl != nil {
t.Error("nil block expected")
@ -27,9 +29,9 @@ func TestGetWhenKeyNotPresent(t *testing.T) {
}
}
func TestGetWhenKeyIsEmptyString(t *testing.T) {
func TestGetWhenKeyIsNil(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
_, err := bs.Get(key.Key(""))
_, err := bs.Get(nil)
if err != ErrNotFound {
t.Fail()
}
@ -44,7 +46,7 @@ func TestPutThenGetBlock(t *testing.T) {
t.Fatal(err)
}
blockFromBlockstore, err := bs.Get(block.Key())
blockFromBlockstore, err := bs.Get(block.Cid())
if err != nil {
t.Fatal(err)
}
@ -62,7 +64,7 @@ func TestHashOnRead(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
bl := blocks.NewBlock([]byte("some data"))
blBad, err := blocks.NewBlockWithHash([]byte("some other data"), bl.Key().ToMultihash())
blBad, err := blocks.NewBlockWithCid([]byte("some other data"), bl.Cid())
if err != nil {
t.Fatal("debug is off, still got an error")
}
@ -71,35 +73,35 @@ func TestHashOnRead(t *testing.T) {
bs.Put(bl2)
bs.HashOnRead(true)
if _, err := bs.Get(bl.Key()); err != ErrHashMismatch {
if _, err := bs.Get(bl.Cid()); err != ErrHashMismatch {
t.Fatalf("expected '%v' got '%v'\n", ErrHashMismatch, err)
}
if b, err := bs.Get(bl2.Key()); err != nil || b.String() != bl2.String() {
if b, err := bs.Get(bl2.Cid()); err != nil || b.String() != bl2.String() {
t.Fatal("got wrong blocks")
}
}
func newBlockStoreWithKeys(t *testing.T, d ds.Datastore, N int) (Blockstore, []key.Key) {
func newBlockStoreWithKeys(t *testing.T, d ds.Datastore, N int) (Blockstore, []*cid.Cid) {
if d == nil {
d = ds.NewMapDatastore()
}
bs := NewBlockstore(ds_sync.MutexWrap(d))
keys := make([]key.Key, N)
keys := make([]*cid.Cid, N)
for i := 0; i < N; i++ {
block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
err := bs.Put(block)
if err != nil {
t.Fatal(err)
}
keys[i] = block.Key()
keys[i] = block.Cid()
}
return bs, keys
}
func collect(ch <-chan key.Key) []key.Key {
var keys []key.Key
func collect(ch <-chan *cid.Cid) []*cid.Cid {
var keys []*cid.Cid
for k := range ch {
keys = append(keys, k)
}
@ -188,18 +190,18 @@ func TestValueTypeMismatch(t *testing.T) {
block := blocks.NewBlock([]byte("some data"))
datastore := ds.NewMapDatastore()
k := BlockPrefix.Child(block.Key().DsKey())
k := BlockPrefix.Child(dshelp.NewKeyFromBinary(block.Cid().KeyString()))
datastore.Put(k, "data that isn't a block!")
blockstore := NewBlockstore(ds_sync.MutexWrap(datastore))
_, err := blockstore.Get(block.Key())
_, err := blockstore.Get(block.Cid())
if err != ValueTypeMismatch {
t.Fatal(err)
}
}
func expectMatches(t *testing.T, expect, actual []key.Key) {
func expectMatches(t *testing.T, expect, actual []*cid.Cid) {
if len(expect) != len(actual) {
t.Errorf("expect and actual differ: %d != %d", len(expect), len(actual))
@ -207,7 +209,7 @@ func expectMatches(t *testing.T, expect, actual []key.Key) {
for _, ek := range expect {
found := false
for _, ak := range actual {
if ek == ak {
if ek.Equals(ak) {
found = true
}
}

View File

@ -1,14 +1,14 @@
package blockstore
import (
"context"
"sync/atomic"
"time"
"github.com/ipfs/go-ipfs/blocks"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
context "context"
"gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
bloom "gx/ipfs/QmeiMCBkYHxkDkDfnDadzz4YxY5ruL5Pj499essE4vRsGM/bbloom"
)
@ -84,7 +84,7 @@ func (b *bloomcache) Rebuild(ctx context.Context) {
select {
case key, ok := <-ch:
if ok {
b.bloom.AddTS([]byte(key)) // Use binary key, the more compact the better
b.bloom.AddTS(key.Bytes()) // Use binary key, the more compact the better
} else {
finish = true
}
@ -97,7 +97,7 @@ func (b *bloomcache) Rebuild(ctx context.Context) {
atomic.StoreInt32(&b.active, 1)
}
func (b *bloomcache) DeleteBlock(k key.Key) error {
func (b *bloomcache) DeleteBlock(k *cid.Cid) error {
if has, ok := b.hasCached(k); ok && !has {
return ErrNotFound
}
@ -107,15 +107,16 @@ func (b *bloomcache) DeleteBlock(k key.Key) error {
// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) {
func (b *bloomcache) hasCached(k *cid.Cid) (has bool, ok bool) {
b.total.Inc()
if k == "" {
if k == nil {
log.Error("nil cid in bloom cache")
// Return cache invalid so call to blockstore
// in case of invalid key is forwarded deeper
return false, false
}
if b.BloomActive() {
blr := b.bloom.HasTS([]byte(k))
blr := b.bloom.HasTS(k.Bytes())
if blr == false { // not contained in bloom is only conclusive answer bloom gives
b.hits.Inc()
return false, true
@ -124,7 +125,7 @@ func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) {
return false, false
}
func (b *bloomcache) Has(k key.Key) (bool, error) {
func (b *bloomcache) Has(k *cid.Cid) (bool, error) {
if has, ok := b.hasCached(k); ok {
return has, nil
}
@ -132,7 +133,7 @@ func (b *bloomcache) Has(k key.Key) (bool, error) {
return b.blockstore.Has(k)
}
func (b *bloomcache) Get(k key.Key) (blocks.Block, error) {
func (b *bloomcache) Get(k *cid.Cid) (blocks.Block, error) {
if has, ok := b.hasCached(k); ok && !has {
return nil, ErrNotFound
}
@ -141,13 +142,13 @@ func (b *bloomcache) Get(k key.Key) (blocks.Block, error) {
}
func (b *bloomcache) Put(bl blocks.Block) error {
if has, ok := b.hasCached(bl.Key()); ok && has {
if has, ok := b.hasCached(bl.Cid()); ok && has {
return nil
}
err := b.blockstore.Put(bl)
if err == nil {
b.bloom.AddTS([]byte(bl.Key()))
b.bloom.AddTS(bl.Cid().Bytes())
}
return err
}
@ -162,12 +163,12 @@ func (b *bloomcache) PutMany(bs []blocks.Block) error {
return err
}
for _, bl := range bs {
b.bloom.AddTS([]byte(bl.Key()))
b.bloom.AddTS(bl.Cid().Bytes())
}
return nil
}
func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
return b.blockstore.AllKeysChan(ctx)
}

View File

@ -44,7 +44,7 @@ func TestPutManyAddsToBloom(t *testing.T) {
block2 := blocks.NewBlock([]byte("bar"))
cachedbs.PutMany([]blocks.Block{block1})
has, err := cachedbs.Has(block1.Key())
has, err := cachedbs.Has(block1.Cid())
if err != nil {
t.Fatal(err)
}
@ -52,7 +52,7 @@ func TestPutManyAddsToBloom(t *testing.T) {
t.Fatal("added block is reported missing")
}
has, err = cachedbs.Has(block2.Key())
has, err = cachedbs.Has(block2.Cid())
if err != nil {
t.Fatal(err)
}
@ -93,7 +93,7 @@ func TestHasIsBloomCached(t *testing.T) {
})
for i := 0; i < 1000; i++ {
cachedbs.Has(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i+2000))).Key())
cachedbs.Has(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i+2000))).Cid())
}
if float64(cacheFails)/float64(1000) > float64(0.05) {
@ -112,11 +112,11 @@ func TestHasIsBloomCached(t *testing.T) {
t.Fatalf("expected datastore hit: %d", cacheFails)
}
if has, err := cachedbs.Has(block.Key()); !has || err != nil {
if has, err := cachedbs.Has(block.Cid()); !has || err != nil {
t.Fatal("has gave wrong response")
}
bl, err := cachedbs.Get(block.Key())
bl, err := cachedbs.Get(block.Cid())
if bl.String() != block.String() {
t.Fatal("block data doesn't match")
}

View File

@ -6,7 +6,6 @@ import (
bs "github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/pin"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
)
@ -38,7 +37,7 @@ func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, out chan<- interface{}, c
stillOkay := FilterPinned(pins, out, cids)
for _, c := range stillOkay {
err := blocks.DeleteBlock(key.Key(c.Hash()))
err := blocks.DeleteBlock(c)
if err != nil && opts.Force && (err == bs.ErrNotFound || err == ds.ErrNotFound) {
// ignore non-existent blocks
} else if err != nil {

View File

@ -4,62 +4,57 @@ package set
import (
"github.com/ipfs/go-ipfs/blocks/bloom"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
var log = logging.Logger("blockset")
// BlockSet represents a mutable set of keyed blocks
type BlockSet interface {
AddBlock(key.Key)
RemoveBlock(key.Key)
HasKey(key.Key) bool
AddBlock(*cid.Cid)
RemoveBlock(*cid.Cid)
HasKey(*cid.Cid) bool
GetBloomFilter() bloom.Filter
GetKeys() []key.Key
GetKeys() []*cid.Cid
}
func SimpleSetFromKeys(keys []key.Key) BlockSet {
sbs := &simpleBlockSet{blocks: make(map[key.Key]struct{})}
func SimpleSetFromKeys(keys []*cid.Cid) BlockSet {
sbs := &simpleBlockSet{blocks: cid.NewSet()}
for _, k := range keys {
sbs.blocks[k] = struct{}{}
sbs.AddBlock(k)
}
return sbs
}
func NewSimpleBlockSet() BlockSet {
return &simpleBlockSet{blocks: make(map[key.Key]struct{})}
return &simpleBlockSet{blocks: cid.NewSet()}
}
type simpleBlockSet struct {
blocks map[key.Key]struct{}
blocks *cid.Set
}
func (b *simpleBlockSet) AddBlock(k key.Key) {
b.blocks[k] = struct{}{}
func (b *simpleBlockSet) AddBlock(k *cid.Cid) {
b.blocks.Add(k)
}
func (b *simpleBlockSet) RemoveBlock(k key.Key) {
delete(b.blocks, k)
func (b *simpleBlockSet) RemoveBlock(k *cid.Cid) {
b.blocks.Remove(k)
}
func (b *simpleBlockSet) HasKey(k key.Key) bool {
_, has := b.blocks[k]
return has
func (b *simpleBlockSet) HasKey(k *cid.Cid) bool {
return b.blocks.Has(k)
}
func (b *simpleBlockSet) GetBloomFilter() bloom.Filter {
f := bloom.BasicFilter()
for k := range b.blocks {
f.Add([]byte(k))
for _, k := range b.blocks.Keys() {
f.Add(k.Bytes())
}
return f
}
func (b *simpleBlockSet) GetKeys() []key.Key {
var out []key.Key
for k := range b.blocks {
out = append(out, k)
}
return out
func (b *simpleBlockSet) GetKeys() []*cid.Cid {
return b.blocks.Keys()
}

View File

@ -4,7 +4,8 @@ import (
"testing"
bu "github.com/ipfs/go-ipfs/blocks/blocksutil"
k "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
const (
@ -13,15 +14,15 @@ const (
tReAdd
)
func exampleKeys() []k.Key {
res := make([]k.Key, 1<<8)
func exampleKeys() []*cid.Cid {
res := make([]*cid.Cid, 1<<8)
gen := bu.NewBlockGenerator()
for i := uint64(0); i < 1<<8; i++ {
res[i] = gen.Next().Key()
res[i] = gen.Next().Cid()
}
return res
}
func checkSet(set BlockSet, keySlice []k.Key, t *testing.T) {
func checkSet(set BlockSet, keySlice []*cid.Cid, t *testing.T) {
for i, key := range keySlice {
if i&tReAdd == 0 {
if set.HasKey(key) == false {
@ -69,7 +70,7 @@ func TestSetWorks(t *testing.T) {
bloom := set.GetBloomFilter()
for _, key := range addedKeys {
if bloom.Find([]byte(key)) == false {
if bloom.Find(key.Bytes()) == false {
t.Error("bloom doesn't contain expected key")
}
}

View File

@ -10,7 +10,6 @@ import (
blocks "github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
context "context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
@ -30,12 +29,6 @@ type BlockService struct {
Exchange exchange.Interface
}
// an Object is simply a typed block
type Object interface {
Cid() *cid.Cid
blocks.Block
}
// NewBlockService creates a BlockService with given datastore instance.
func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {
if rem == nil {
@ -50,14 +43,14 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {
// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *BlockService) AddObject(o Object) (*cid.Cid, error) {
func (s *BlockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
// TODO: while this is a great optimization, we should think about the
// possibility of streaming writes directly to disk. If we can pass this object
// all the way down to the datastore without having to 'buffer' its data,
// we could implement a `WriteTo` method on it that could do a streaming write
// of the content, saving us (probably) considerable memory.
c := o.Cid()
has, err := s.Blockstore.Has(key.Key(c.Hash()))
has, err := s.Blockstore.Has(c)
if err != nil {
return nil, err
}
@ -78,13 +71,10 @@ func (s *BlockService) AddObject(o Object) (*cid.Cid, error) {
return c, nil
}
func (s *BlockService) AddObjects(bs []Object) ([]*cid.Cid, error) {
func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
var toput []blocks.Block
var toputcids []*cid.Cid
for _, b := range bs {
c := b.Cid()
has, err := s.Blockstore.Has(key.Key(c.Hash()))
has, err := s.Blockstore.Has(b.Cid())
if err != nil {
return nil, err
}
@ -94,7 +84,6 @@ func (s *BlockService) AddObjects(bs []Object) ([]*cid.Cid, error) {
}
toput = append(toput, b)
toputcids = append(toputcids, c)
}
err := s.Blockstore.PutMany(toput)
@ -108,8 +97,7 @@ func (s *BlockService) AddObjects(bs []Object) ([]*cid.Cid, error) {
return nil, fmt.Errorf("blockservice is closed (%s)", err)
}
c := o.(Object).Cid() // cast is safe, we created these
ks = append(ks, c)
ks = append(ks, o.Cid())
}
return ks, nil
}
@ -119,7 +107,7 @@ func (s *BlockService) AddObjects(bs []Object) ([]*cid.Cid, error) {
func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
log.Debugf("BlockService GetBlock: '%s'", c)
block, err := s.Blockstore.Get(key.Key(c.Hash()))
block, err := s.Blockstore.Get(c)
if err == nil {
return block, nil
}
@ -128,7 +116,7 @@ func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block,
// TODO be careful checking ErrNotFound. If the underlying
// implementation changes, this will break.
log.Debug("Blockservice: Searching bitswap")
blk, err := s.Exchange.GetBlock(ctx, key.Key(c.Hash()))
blk, err := s.Exchange.GetBlock(ctx, c)
if err != nil {
if err == blockstore.ErrNotFound {
return nil, ErrNotFound
@ -153,12 +141,11 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
out := make(chan blocks.Block, 0)
go func() {
defer close(out)
var misses []key.Key
var misses []*cid.Cid
for _, c := range ks {
k := key.Key(c.Hash())
hit, err := s.Blockstore.Get(k)
hit, err := s.Blockstore.Get(c)
if err != nil {
misses = append(misses, k)
misses = append(misses, c)
continue
}
log.Debug("Blockservice: Got data in datastore")
@ -191,8 +178,8 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
}
// DeleteBlock deletes a block in the blockservice from the datastore
func (s *BlockService) DeleteObject(o Object) error {
return s.Blockstore.DeleteBlock(o.Key())
func (s *BlockService) DeleteBlock(o blocks.Block) error {
return s.Blockstore.DeleteBlock(o.Cid())
}
func (s *BlockService) Close() error {

View File

@ -2,6 +2,7 @@ package bstest
import (
"bytes"
"context"
"fmt"
"testing"
"time"
@ -10,9 +11,7 @@ import (
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
. "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
"context"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
@ -44,11 +43,11 @@ func TestBlocks(t *testing.T) {
t.Error("Block Multihash and data multihash not equal")
}
if o.Key() != key.Key(h) {
if !o.Cid().Equals(cid.NewCidV0(h)) {
t.Error("Block key and data multihash key not equal")
}
k, err := bs.AddObject(o)
k, err := bs.AddBlock(o)
if err != nil {
t.Error("failed to add block to BlockService", err)
return
@ -66,7 +65,7 @@ func TestBlocks(t *testing.T) {
return
}
if o.Key() != b2.Key() {
if !o.Cid().Equals(b2.Cid()) {
t.Error("Block keys not equal.")
}
@ -93,7 +92,7 @@ func TestGetBlocksSequential(t *testing.T) {
var cids []*cid.Cid
for _, o := range objs {
cids = append(cids, o.Cid())
servs[0].AddObject(o)
servs[0].AddBlock(o)
}
t.Log("one instance at a time, get blocks concurrently")
@ -102,12 +101,12 @@ func TestGetBlocksSequential(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*50)
defer cancel()
out := servs[i].GetBlocks(ctx, cids)
gotten := make(map[key.Key]blocks.Block)
gotten := make(map[string]blocks.Block)
for blk := range out {
if _, ok := gotten[blk.Key()]; ok {
if _, ok := gotten[blk.Cid().KeyString()]; ok {
t.Fatal("Got duplicate block!")
}
gotten[blk.Key()] = blk
gotten[blk.Cid().KeyString()] = blk
}
if len(gotten) != len(objs) {
t.Fatalf("Didnt get enough blocks back: %d/%d", len(gotten), len(objs))

View File

@ -8,7 +8,6 @@ import (
cmds "github.com/ipfs/go-ipfs/commands"
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
"gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
@ -54,7 +53,7 @@ var unwantCmd = &cmds.Command{
return
}
var ks []key.Key
var ks []*cid.Cid
for _, arg := range req.Arguments() {
c, err := cid.Decode(arg)
if err != nil {
@ -62,7 +61,7 @@ var unwantCmd = &cmds.Command{
return
}
ks = append(ks, key.Key(c.Hash()))
ks = append(ks, c)
}
bs.CancelWants(ks)
@ -164,7 +163,7 @@ var bitswapStatCmd = &cmds.Command{
fmt.Fprintf(buf, "\tdup data received: %s\n", humanize.Bytes(out.DupDataReceived))
fmt.Fprintf(buf, "\twantlist [%d keys]\n", len(out.Wantlist))
for _, k := range out.Wantlist {
fmt.Fprintf(buf, "\t\t%s\n", k.B58String())
fmt.Fprintf(buf, "\t\t%s\n", k.String())
}
fmt.Fprintf(buf, "\tpartners [%d]\n", len(out.Peers))
for _, p := range out.Peers {

View File

@ -66,7 +66,7 @@ on raw ipfs blocks. It outputs the following to stdout:
}
res.SetOutput(&BlockStat{
Key: b.Key().B58String(),
Key: b.Cid().String(),
Size: len(b.RawData()),
})
},
@ -140,9 +140,9 @@ It reads from stdin, and <key> is a base58 encoded multihash.
}
b := blocks.NewBlock(data)
log.Debugf("BlockPut key: '%q'", b.Key())
log.Debugf("BlockPut key: '%q'", b.Cid())
k, err := n.Blocks.AddObject(b)
k, err := n.Blocks.AddBlock(b)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
@ -182,7 +182,7 @@ func getBlockForKey(req cmds.Request, skey string) (blocks.Block, error) {
return nil, err
}
log.Debugf("ipfs block: got block with key: %q", b.Key())
log.Debugf("ipfs block: got block with key: %s", b.Cid())
return b, nil
}

View File

@ -17,7 +17,6 @@ import (
routing "gx/ipfs/QmXKuGUzLcgoQvp8M6ZEJzupWUNmx8NoqXEbYLMDjL4rjj/go-libp2p-routing"
notif "gx/ipfs/QmXKuGUzLcgoQvp8M6ZEJzupWUNmx8NoqXEbYLMDjL4rjj/go-libp2p-routing/notifications"
pstore "gx/ipfs/QmXXCcQ7CLg5a81Ui9TTR35QcR4y7ZyihxwfjqaHfUVcVo/go-libp2p-peerstore"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
@ -276,7 +275,7 @@ var provideRefDhtCmd = &cmds.Command{
return
}
has, err := n.Blockstore.Has(key.Key(c.Hash()))
has, err := n.Blockstore.Has(c)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return

View File

@ -609,7 +609,12 @@ stat' on the file or any of its ancestors.
return
}
defer wfd.Close()
defer func() {
err := wfd.Close()
if err != nil {
res.SetError(err, cmds.ErrNormal)
}
}()
if trunc {
if err := wfd.Truncate(0); err != nil {

View File

@ -12,7 +12,8 @@ import (
path "github.com/ipfs/go-ipfs/path"
unixfs "github.com/ipfs/go-ipfs/unixfs"
unixfspb "github.com/ipfs/go-ipfs/unixfs/pb"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
type LsLink struct {
@ -90,7 +91,7 @@ The JSON output contains type information.
for j, link := range dagnode.Links {
var linkNode *merkledag.Node
t := unixfspb.Data_DataType(-1)
linkKey := key.Key(link.Hash)
linkKey := cid.NewCidV0(link.Hash)
if ok, err := node.Blockstore.Has(linkKey); ok && err == nil {
b, err := node.Blockstore.Get(linkKey)
if err != nil {

View File

@ -106,7 +106,7 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
if discover {
go func() {
blk := blocks.NewBlock([]byte("floodsub:" + topic))
cid, err := n.Blocks.AddObject(blk)
cid, err := n.Blocks.AddBlock(blk)
if err != nil {
log.Error("pubsub discovery: ", err)
return

View File

@ -2,6 +2,7 @@ package commands
import (
"bytes"
"context"
"errors"
"io"
"strings"
@ -10,16 +11,14 @@ import (
"github.com/ipfs/go-ipfs/core"
dag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
context "context"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
)
// KeyList is a general type for outputting lists of keys
type KeyList struct {
Keys []key.Key
Keys []*cid.Cid
}
// KeyListTextMarshaler outputs a KeyList as plaintext, one key per line
@ -27,7 +26,7 @@ func KeyListTextMarshaler(res cmds.Response) (io.Reader, error) {
output := res.Output().(*KeyList)
buf := new(bytes.Buffer)
for _, key := range output.Keys {
buf.WriteString(key.B58String() + "\n")
buf.WriteString(key.String() + "\n")
}
return buf, nil
}
@ -160,7 +159,7 @@ Displays the hashes of all local objects.
defer close(out)
for k := range allKeys {
out <- &RefWrapper{Ref: k.B58String()}
out <- &RefWrapper{Ref: k.String()}
}
}()
},

View File

@ -95,7 +95,7 @@ order to reclaim hard disk space.
buf := new(bytes.Buffer)
if quiet {
buf = bytes.NewBufferString(string(obj.Key) + "\n")
buf = bytes.NewBufferString(obj.Key.String() + "\n")
} else {
buf = bytes.NewBufferString(fmt.Sprintf("removed %s\n", obj.Key))
}

View File

@ -1,6 +1,7 @@
package corerepo
import (
"context"
"errors"
"time"
@ -8,9 +9,7 @@ import (
mfs "github.com/ipfs/go-ipfs/mfs"
gc "github.com/ipfs/go-ipfs/pin/gc"
repo "github.com/ipfs/go-ipfs/repo"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
context "context"
humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
@ -21,7 +20,7 @@ var log = logging.Logger("corerepo")
var ErrMaxStorageExceeded = errors.New("Maximum storage limit exceeded. Maybe unpin some files?")
type KeyRemoved struct {
Key key.Key
Key *cid.Cid
}
type GC struct {

View File

@ -14,7 +14,6 @@ import (
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/repo/config"
"github.com/ipfs/go-ipfs/thirdparty/testutil"
"gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
"context"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
@ -94,7 +93,7 @@ func TestAddGCLive(t *testing.T) {
t.Fatal("add shouldnt complete yet")
}
var gcout <-chan key.Key
var gcout <-chan *cid.Cid
gcstarted := make(chan struct{})
go func() {
defer close(gcstarted)
@ -139,7 +138,7 @@ func TestAddGCLive(t *testing.T) {
}
for k := range gcout {
if _, ok := addedHashes[k.B58String()]; ok {
if _, ok := addedHashes[k.String()]; ok {
t.Fatal("gc'ed a hash we just added")
}
}

View File

@ -3,13 +3,12 @@
package bitswap
import (
"context"
"errors"
"math"
"sync"
"time"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
blocks "github.com/ipfs/go-ipfs/blocks"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"
@ -19,12 +18,12 @@ import (
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
flags "github.com/ipfs/go-ipfs/flags"
"github.com/ipfs/go-ipfs/thirdparty/delay"
loggables "gx/ipfs/QmTMy4hVSY28DdwJ9kBz6y7q6MuioFzPcpM3Ma3aPjo1i3/go-libp2p-loggables"
context "context"
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
loggables "gx/ipfs/QmTMy4hVSY28DdwJ9kBz6y7q6MuioFzPcpM3Ma3aPjo1i3/go-libp2p-loggables"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
@ -90,8 +89,8 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
network: network,
findKeys: make(chan *blockRequest, sizeBatchRequestChan),
process: px,
newBlocks: make(chan key.Key, HasBlockBufferSize),
provideKeys: make(chan key.Key, provideKeysBufferSize),
newBlocks: make(chan *cid.Cid, HasBlockBufferSize),
provideKeys: make(chan *cid.Cid, provideKeysBufferSize),
wm: NewWantManager(ctx, network),
}
go bs.wm.Run()
@ -137,9 +136,9 @@ type Bitswap struct {
process process.Process
newBlocks chan key.Key
newBlocks chan *cid.Cid
provideKeys chan key.Key
provideKeys chan *cid.Cid
counterLk sync.Mutex
blocksRecvd int
@ -148,14 +147,15 @@ type Bitswap struct {
}
type blockRequest struct {
Key key.Key
Cid *cid.Cid
Ctx context.Context
}
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, error) {
if k == "" {
func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
if k == nil {
log.Error("nil cid in GetBlock")
return nil, blockstore.ErrNotFound
}
@ -165,18 +165,17 @@ func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, er
// functions called by this one. Otherwise those functions won't return
// when this context's cancel func is executed. This is difficult to
// enforce. May this comment keep you safe.
ctx, cancelFunc := context.WithCancel(parent)
ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest"))
log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
defer log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
defer log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
defer func() {
cancelFunc()
}()
promise, err := bs.GetBlocks(ctx, []key.Key{k})
promise, err := bs.GetBlocks(ctx, []*cid.Cid{k})
if err != nil {
return nil, err
}
@ -197,10 +196,10 @@ func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, er
}
}
func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
var out []key.Key
func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid {
var out []*cid.Cid
for _, e := range bs.engine.WantlistForPeer(p) {
out = append(out, e.Key)
out = append(out, e.Cid)
}
return out
}
@ -216,7 +215,7 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks.Block, error) {
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) {
if len(keys) == 0 {
out := make(chan blocks.Block)
close(out)
@ -231,7 +230,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks
promise := bs.notifications.Subscribe(ctx, keys...)
for _, k := range keys {
log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
}
bs.wm.WantBlocks(ctx, keys)
@ -240,13 +239,13 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
req := &blockRequest{
Key: keys[0],
Cid: keys[0],
Ctx: ctx,
}
remaining := make(map[key.Key]struct{})
remaining := cid.NewSet()
for _, k := range keys {
remaining[k] = struct{}{}
remaining.Add(k)
}
out := make(chan blocks.Block)
@ -255,11 +254,8 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks
defer cancel()
defer close(out)
defer func() {
var toCancel []key.Key
for k, _ := range remaining {
toCancel = append(toCancel, k)
}
bs.CancelWants(toCancel)
// can't just defer this call on its own, arguments are resolved *when* the defer is created
bs.CancelWants(remaining.Keys())
}()
for {
select {
@ -268,7 +264,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks
return
}
delete(remaining, blk.Key())
remaining.Remove(blk.Cid())
select {
case out <- blk:
case <-ctx.Done():
@ -289,8 +285,8 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks
}
// CancelWant removes a given key from the wantlist
func (bs *Bitswap) CancelWants(keys []key.Key) {
bs.wm.CancelWants(keys)
func (bs *Bitswap) CancelWants(cids []*cid.Cid) {
bs.wm.CancelWants(cids)
}
// HasBlock announces the existance of a block to this bitswap service. The
@ -318,7 +314,7 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
bs.engine.AddBlock(blk)
select {
case bs.newBlocks <- blk.Key():
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
@ -340,13 +336,13 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
}
// quickly send out cancels, reduces chances of duplicate block receives
var keys []key.Key
var keys []*cid.Cid
for _, block := range iblocks {
if _, found := bs.wm.wl.Contains(block.Key()); !found {
if _, found := bs.wm.wl.Contains(block.Cid()); !found {
log.Infof("received un-asked-for %s from %s", block, p)
continue
}
keys = append(keys, block.Key())
keys = append(keys, block.Cid())
}
bs.wm.CancelWants(keys)
@ -360,8 +356,8 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
return // ignore error, is either logged previously, or ErrAlreadyHaveBlock
}
k := b.Key()
log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
k := b.Cid()
log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
log.Debugf("got block %s from %s", b, p)
if err := bs.HasBlock(b); err != nil {
@ -378,7 +374,7 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error {
bs.counterLk.Lock()
defer bs.counterLk.Unlock()
bs.blocksRecvd++
has, err := bs.blockstore.Has(b.Key())
has, err := bs.blockstore.Has(b.Cid())
if err != nil {
log.Infof("blockstore.Has error: %s", err)
return err
@ -415,10 +411,10 @@ func (bs *Bitswap) Close() error {
return bs.process.Close()
}
func (bs *Bitswap) GetWantlist() []key.Key {
var out []key.Key
func (bs *Bitswap) GetWantlist() []*cid.Cid {
var out []*cid.Cid
for _, e := range bs.wm.wl.Entries() {
out = append(out, e.Key)
out = append(out, e.Cid)
}
return out
}

View File

@ -2,21 +2,21 @@ package bitswap
import (
"bytes"
"context"
"sync"
"testing"
"time"
context "context"
detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
travis "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis"
blocks "github.com/ipfs/go-ipfs/blocks"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
mockrouting "github.com/ipfs/go-ipfs/routing/mock"
delay "github.com/ipfs/go-ipfs/thirdparty/delay"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
travis "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis"
detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
p2ptestutil "gx/ipfs/QmcRa2qn6iCmap9bjp8jAwkvYAq13AUfxdY3rrYiaJbLum/go-libp2p/p2p/test/util"
)
@ -38,7 +38,7 @@ func TestClose(t *testing.T) {
bitswap := sesgen.Next()
bitswap.Exchange.Close()
bitswap.Exchange.GetBlock(context.Background(), block.Key())
bitswap.Exchange.GetBlock(context.Background(), block.Cid())
}
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
@ -57,7 +57,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
defer cancel()
_, err := solo.Exchange.GetBlock(ctx, block.Key())
_, err := solo.Exchange.GetBlock(ctx, block.Cid())
if err != context.DeadlineExceeded {
t.Fatal("Expected DeadlineExceeded error")
@ -84,7 +84,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
received, err := wantsBlock.Exchange.GetBlock(ctx, block.Cid())
if err != nil {
t.Log(err)
t.Fatal("Expected to succeed")
@ -176,10 +176,10 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
}
}
var blkeys []key.Key
var blkeys []*cid.Cid
first := instances[0]
for _, b := range blocks {
blkeys = append(blkeys, b.Key())
blkeys = append(blkeys, b.Cid())
first.Exchange.HasBlock(b)
}
@ -216,7 +216,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
for _, inst := range instances {
for _, b := range blocks {
if _, err := inst.Blockstore().Get(b.Key()); err != nil {
if _, err := inst.Blockstore().Get(b.Cid()); err != nil {
t.Fatal(err)
}
}
@ -224,8 +224,8 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
}
func getOrFail(bitswap Instance, b blocks.Block, t *testing.T, wg *sync.WaitGroup) {
if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
if _, err := bitswap.Blockstore().Get(b.Cid()); err != nil {
_, err := bitswap.Exchange.GetBlock(context.Background(), b.Cid())
if err != nil {
t.Fatal(err)
}
@ -260,7 +260,7 @@ func TestSendToWantingPeer(t *testing.T) {
// peerA requests and waits for block alpha
ctx, cancel := context.WithTimeout(context.Background(), waitTime)
defer cancel()
alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []key.Key{alpha.Key()})
alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []*cid.Cid{alpha.Cid()})
if err != nil {
t.Fatal(err)
}
@ -277,7 +277,7 @@ func TestSendToWantingPeer(t *testing.T) {
t.Fatal("context timed out and broke promise channel!")
}
if blkrecvd.Key() != alpha.Key() {
if !blkrecvd.Cid().Equals(alpha.Cid()) {
t.Fatal("Wrong block!")
}
@ -292,7 +292,7 @@ func TestEmptyKey(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err := bs.GetBlock(ctx, key.Key(""))
_, err := bs.GetBlock(ctx, nil)
if err != blockstore.ErrNotFound {
t.Error("empty str key should return ErrNotFound")
}
@ -315,7 +315,7 @@ func TestBasicBitswap(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
if err != nil {
t.Fatal(err)
}
@ -341,7 +341,7 @@ func TestDoubleGet(t *testing.T) {
blocks := bg.Blocks(1)
ctx1, cancel1 := context.WithCancel(context.Background())
blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []key.Key{blocks[0].Key()})
blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []*cid.Cid{blocks[0].Cid()})
if err != nil {
t.Fatal(err)
}
@ -349,7 +349,7 @@ func TestDoubleGet(t *testing.T) {
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []key.Key{blocks[0].Key()})
blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []*cid.Cid{blocks[0].Cid()})
if err != nil {
t.Fatal(err)
}
@ -396,9 +396,9 @@ func TestWantlistCleanup(t *testing.T) {
bswap := instances.Exchange
blocks := bg.Blocks(20)
var keys []key.Key
var keys []*cid.Cid
for _, b := range blocks {
keys = append(keys, b.Key())
keys = append(keys, b.Cid())
}
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)

View File

@ -1,12 +1,14 @@
package decision
import (
"fmt"
"math"
"testing"
"github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
"github.com/ipfs/go-ipfs/thirdparty/testutil"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
"gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
@ -21,6 +23,8 @@ func BenchmarkTaskQueuePush(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Push(&wantlist.Entry{Key: key.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)])
c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))
q.Push(&wantlist.Entry{Cid: c, Priority: math.MaxInt32}, peers[i%len(peers)])
}
}

View File

@ -169,8 +169,9 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
// with a task in hand, we're ready to prepare the envelope...
block, err := e.bs.Get(nextTask.Entry.Key)
block, err := e.bs.Get(nextTask.Entry.Cid)
if err != nil {
log.Errorf("tried to execute a task and errored fetching block: %s", err)
// If we don't have the block, don't hold that against the peer
// make sure to update that the task has been 'completed'
nextTask.Done()
@ -233,13 +234,13 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
for _, entry := range m.Wantlist() {
if entry.Cancel {
log.Debugf("%s cancel %s", p, entry.Key)
l.CancelWant(entry.Key)
e.peerRequestQueue.Remove(entry.Key, p)
log.Debugf("%s cancel %s", p, entry.Cid)
l.CancelWant(entry.Cid)
e.peerRequestQueue.Remove(entry.Cid, p)
} else {
log.Debugf("wants %s - %d", entry.Key, entry.Priority)
l.Wants(entry.Key, entry.Priority)
if exists, err := e.bs.Has(entry.Key); err == nil && exists {
log.Debugf("wants %s - %d", entry.Cid, entry.Priority)
l.Wants(entry.Cid, entry.Priority)
if exists, err := e.bs.Has(entry.Cid); err == nil && exists {
e.peerRequestQueue.Push(entry.Entry, p)
newWorkExists = true
}
@ -258,7 +259,7 @@ func (e *Engine) addBlock(block blocks.Block) {
for _, l := range e.ledgerMap {
l.lk.Lock()
if entry, ok := l.WantListContains(block.Key()); ok {
if entry, ok := l.WantListContains(block.Cid()); ok {
e.peerRequestQueue.Push(entry, l.Partner)
work = true
}
@ -287,8 +288,8 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
l := e.findOrCreate(p)
for _, block := range m.Blocks() {
l.SentBytes(len(block.RawData()))
l.wantList.Remove(block.Key())
e.peerRequestQueue.Remove(block.Key(), p)
l.wantList.Remove(block.Cid())
e.peerRequestQueue.Remove(block.Cid(), p)
}
return nil

View File

@ -167,7 +167,7 @@ func partnerWants(e *Engine, keys []string, partner peer.ID) {
add := message.New(false)
for i, letter := range keys {
block := blocks.NewBlock([]byte(letter))
add.AddEntry(block.Key(), math.MaxInt32-i)
add.AddEntry(block.Cid(), math.MaxInt32-i)
}
e.MessageReceived(partner, add)
}
@ -176,7 +176,7 @@ func partnerCancels(e *Engine, keys []string, partner peer.ID) {
cancels := message.New(false)
for _, k := range keys {
block := blocks.NewBlock([]byte(k))
cancels.Cancel(block.Key())
cancels.Cancel(block.Cid())
}
e.MessageReceived(partner, cancels)
}
@ -187,7 +187,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
envelope := <-next
received := envelope.Block
expected := blocks.NewBlock([]byte(k))
if received.Key() != expected.Key() {
if !received.Cid().Equals(expected.Cid()) {
return errors.New(fmt.Sprintln("received", string(received.RawData()), "expected", string(expected.RawData())))
}
}

View File

@ -5,19 +5,16 @@ import (
"time"
wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
// keySet is just a convenient alias for maps of keys, where we only care
// access/lookups.
type keySet map[key.Key]struct{}
func newLedger(p peer.ID) *ledger {
return &ledger{
wantList: wl.New(),
Partner: p,
sentToPeer: make(map[key.Key]time.Time),
sentToPeer: make(map[string]time.Time),
}
}
@ -44,7 +41,7 @@ type ledger struct {
// sentToPeer is a set of keys to ensure we dont send duplicate blocks
// to a given peer
sentToPeer map[key.Key]time.Time
sentToPeer map[string]time.Time
lk sync.Mutex
}
@ -78,16 +75,16 @@ func (l *ledger) ReceivedBytes(n int) {
l.Accounting.BytesRecv += uint64(n)
}
func (l *ledger) Wants(k key.Key, priority int) {
func (l *ledger) Wants(k *cid.Cid, priority int) {
log.Debugf("peer %s wants %s", l.Partner, k)
l.wantList.Add(k, priority)
}
func (l *ledger) CancelWant(k key.Key) {
func (l *ledger) CancelWant(k *cid.Cid) {
l.wantList.Remove(k)
}
func (l *ledger) WantListContains(k key.Key) (*wl.Entry, bool) {
func (l *ledger) WantListContains(k *cid.Cid) (*wl.Entry, bool) {
return l.wantList.Contains(k)
}

View File

@ -6,7 +6,8 @@ import (
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
pq "github.com/ipfs/go-ipfs/thirdparty/pq"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
@ -14,7 +15,7 @@ type peerRequestQueue interface {
// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
Pop() *peerRequestTask
Push(entry *wantlist.Entry, to peer.ID)
Remove(k key.Key, p peer.ID)
Remove(k *cid.Cid, p peer.ID)
// NB: cannot expose simply expose taskQueue.Len because trashed elements
// may exist. These trashed elements should not contribute to the count.
@ -57,12 +58,11 @@ func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {
partner.activelk.Lock()
defer partner.activelk.Unlock()
_, ok = partner.activeBlocks[entry.Key]
if ok {
if partner.activeBlocks.Has(entry.Cid) {
return
}
if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok {
if task, ok := tl.taskMap[taskKey(to, entry.Cid)]; ok {
task.Entry.Priority = entry.Priority
partner.taskQueue.Update(task.index)
return
@ -74,7 +74,7 @@ func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {
created: time.Now(),
Done: func() {
tl.lock.Lock()
partner.TaskDone(entry.Key)
partner.TaskDone(entry.Cid)
tl.pQueue.Update(partner.Index())
tl.lock.Unlock()
},
@ -104,7 +104,7 @@ func (tl *prq) Pop() *peerRequestTask {
continue // discarding tasks that have been removed
}
partner.StartTask(out.Entry.Key)
partner.StartTask(out.Entry.Cid)
partner.requests--
break // and return |out|
}
@ -114,7 +114,7 @@ func (tl *prq) Pop() *peerRequestTask {
}
// Remove removes a task from the queue
func (tl *prq) Remove(k key.Key, p peer.ID) {
func (tl *prq) Remove(k *cid.Cid, p peer.ID) {
tl.lock.Lock()
t, ok := tl.taskMap[taskKey(p, k)]
if ok {
@ -181,7 +181,7 @@ type peerRequestTask struct {
// Key uniquely identifies a task.
func (t *peerRequestTask) Key() string {
return taskKey(t.Target, t.Entry.Key)
return taskKey(t.Target, t.Entry.Cid)
}
// Index implements pq.Elem
@ -195,8 +195,8 @@ func (t *peerRequestTask) SetIndex(i int) {
}
// taskKey returns a key that uniquely identifies a task.
func taskKey(p peer.ID, k key.Key) string {
return string(p) + string(k)
func taskKey(p peer.ID, k *cid.Cid) string {
return string(p) + k.KeyString()
}
// FIFO is a basic task comparator that returns tasks in the order created.
@ -226,7 +226,7 @@ type activePartner struct {
activelk sync.Mutex
active int
activeBlocks map[key.Key]struct{}
activeBlocks *cid.Set
// requests is the number of blocks this peer is currently requesting
// request need not be locked around as it will only be modified under
@ -245,7 +245,7 @@ type activePartner struct {
func newActivePartner() *activePartner {
return &activePartner{
taskQueue: pq.New(wrapCmp(V1)),
activeBlocks: make(map[key.Key]struct{}),
activeBlocks: cid.NewSet(),
}
}
@ -281,17 +281,17 @@ func partnerCompare(a, b pq.Elem) bool {
}
// StartTask signals that a task was started for this partner
func (p *activePartner) StartTask(k key.Key) {
func (p *activePartner) StartTask(k *cid.Cid) {
p.activelk.Lock()
p.activeBlocks[k] = struct{}{}
p.activeBlocks.Add(k)
p.active++
p.activelk.Unlock()
}
// TaskDone signals that a task was completed for this partner
func (p *activePartner) TaskDone(k key.Key) {
func (p *activePartner) TaskDone(k *cid.Cid) {
p.activelk.Lock()
delete(p.activeBlocks, k)
p.activeBlocks.Remove(k)
p.active--
if p.active < 0 {
panic("more tasks finished than started!")

View File

@ -1,6 +1,7 @@
package decision
import (
"fmt"
"math"
"math/rand"
"sort"
@ -9,7 +10,8 @@ import (
"github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
"github.com/ipfs/go-ipfs/thirdparty/testutil"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
)
func TestPushPop(t *testing.T) {
@ -41,10 +43,13 @@ func TestPushPop(t *testing.T) {
for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters
letter := alphabet[index]
t.Log(partner.String())
prq.Push(&wantlist.Entry{Key: key.Key(letter), Priority: math.MaxInt32 - index}, partner)
c := cid.NewCidV0(u.Hash([]byte(letter)))
prq.Push(&wantlist.Entry{Cid: c, Priority: math.MaxInt32 - index}, partner)
}
for _, consonant := range consonants {
prq.Remove(key.Key(consonant), partner)
c := cid.NewCidV0(u.Hash([]byte(consonant)))
prq.Remove(c, partner)
}
prq.fullThaw()
@ -56,12 +61,13 @@ func TestPushPop(t *testing.T) {
break
}
out = append(out, string(received.Entry.Key))
out = append(out, received.Entry.Cid.String())
}
// Entries popped should already be in correct order
for i, expected := range vowels {
if out[i] != expected {
exp := cid.NewCidV0(u.Hash([]byte(expected))).String()
if out[i] != exp {
t.Fatal("received", out[i], "expected", expected)
}
}
@ -78,10 +84,11 @@ func TestPeerRepeats(t *testing.T) {
// Have each push some blocks
for i := 0; i < 5; i++ {
prq.Push(&wantlist.Entry{Key: key.Key(i)}, a)
prq.Push(&wantlist.Entry{Key: key.Key(i)}, b)
prq.Push(&wantlist.Entry{Key: key.Key(i)}, c)
prq.Push(&wantlist.Entry{Key: key.Key(i)}, d)
elcid := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))
prq.Push(&wantlist.Entry{Cid: elcid}, a)
prq.Push(&wantlist.Entry{Cid: elcid}, b)
prq.Push(&wantlist.Entry{Cid: elcid}, c)
prq.Push(&wantlist.Entry{Cid: elcid}, d)
}
// now, pop off four entries, there should be one from each

View File

@ -1,16 +1,17 @@
package message
import (
"fmt"
"io"
blocks "github.com/ipfs/go-ipfs/blocks"
pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/pb"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
inet "gx/ipfs/QmdXimY9QHaasZmw6hWojWnCJvfgxETjZQfg9g6ZrA9wMX/go-libp2p-net"
ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
inet "gx/ipfs/QmdXimY9QHaasZmw6hWojWnCJvfgxETjZQfg9g6ZrA9wMX/go-libp2p-net"
)
// TODO move message.go into the bitswap package
@ -25,9 +26,9 @@ type BitSwapMessage interface {
Blocks() []blocks.Block
// AddEntry adds an entry to the Wantlist.
AddEntry(key key.Key, priority int)
AddEntry(key *cid.Cid, priority int)
Cancel(key key.Key)
Cancel(key *cid.Cid)
Empty() bool
@ -47,8 +48,8 @@ type Exportable interface {
type impl struct {
full bool
wantlist map[key.Key]Entry
blocks map[key.Key]blocks.Block
wantlist map[string]Entry
blocks map[string]blocks.Block
}
func New(full bool) BitSwapMessage {
@ -57,8 +58,8 @@ func New(full bool) BitSwapMessage {
func newMsg(full bool) *impl {
return &impl{
blocks: make(map[key.Key]blocks.Block),
wantlist: make(map[key.Key]Entry),
blocks: make(map[string]blocks.Block),
wantlist: make(map[string]Entry),
full: full,
}
}
@ -68,16 +69,20 @@ type Entry struct {
Cancel bool
}
func newMessageFromProto(pbm pb.Message) BitSwapMessage {
func newMessageFromProto(pbm pb.Message) (BitSwapMessage, error) {
m := newMsg(pbm.GetWantlist().GetFull())
for _, e := range pbm.GetWantlist().GetEntries() {
m.addEntry(key.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel())
c, err := cid.Cast([]byte(e.GetBlock()))
if err != nil {
return nil, fmt.Errorf("incorrectly formatted cid in wantlist: %s", err)
}
m.addEntry(c, int(e.GetPriority()), e.GetCancel())
}
for _, d := range pbm.GetBlocks() {
b := blocks.NewBlock(d)
m.AddBlock(b)
}
return m
return m, nil
}
func (m *impl) Full() bool {
@ -104,16 +109,17 @@ func (m *impl) Blocks() []blocks.Block {
return bs
}
func (m *impl) Cancel(k key.Key) {
delete(m.wantlist, k)
func (m *impl) Cancel(k *cid.Cid) {
delete(m.wantlist, k.KeyString())
m.addEntry(k, 0, true)
}
func (m *impl) AddEntry(k key.Key, priority int) {
func (m *impl) AddEntry(k *cid.Cid, priority int) {
m.addEntry(k, priority, false)
}
func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
func (m *impl) addEntry(c *cid.Cid, priority int, cancel bool) {
k := c.KeyString()
e, exists := m.wantlist[k]
if exists {
e.Priority = priority
@ -121,7 +127,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
} else {
m.wantlist[k] = Entry{
Entry: &wantlist.Entry{
Key: k,
Cid: c,
Priority: priority,
},
Cancel: cancel,
@ -130,7 +136,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
}
func (m *impl) AddBlock(b blocks.Block) {
m.blocks[b.Key()] = b
m.blocks[b.Cid().KeyString()] = b
}
func FromNet(r io.Reader) (BitSwapMessage, error) {
@ -144,8 +150,7 @@ func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) {
return nil, err
}
m := newMessageFromProto(*pb)
return m, nil
return newMessageFromProto(*pb)
}
func (m *impl) ToProto() *pb.Message {
@ -153,7 +158,7 @@ func (m *impl) ToProto() *pb.Message {
pbm.Wantlist = new(pb.Message_Wantlist)
for _, e := range m.wantlist {
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
Block: proto.String(string(e.Key)),
Block: proto.String(e.Cid.KeyString()),
Priority: proto.Int32(int32(e.Priority)),
Cancel: proto.Bool(e.Cancel),
})
@ -176,7 +181,7 @@ func (m *impl) ToNet(w io.Writer) error {
func (m *impl) Loggable() map[string]interface{} {
var blocks []string
for _, v := range m.blocks {
blocks = append(blocks, v.Key().B58String())
blocks = append(blocks, v.Cid().String())
}
return map[string]interface{}{
"blocks": blocks,

View File

@ -8,13 +8,18 @@ import (
blocks "github.com/ipfs/go-ipfs/blocks"
pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/pb"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
)
func mkFakeCid(s string) *cid.Cid {
return cid.NewCidV0(u.Hash([]byte(s)))
}
func TestAppendWanted(t *testing.T) {
const str = "foo"
str := mkFakeCid("foo")
m := New(true)
m.AddEntry(key.Key(str), 1)
m.AddEntry(str, 1)
if !wantlistContains(m.ToProto().GetWantlist(), str) {
t.Fail()
@ -23,16 +28,20 @@ func TestAppendWanted(t *testing.T) {
}
func TestNewMessageFromProto(t *testing.T) {
const str = "a_key"
str := mkFakeCid("a_key")
protoMessage := new(pb.Message)
protoMessage.Wantlist = new(pb.Message_Wantlist)
protoMessage.Wantlist.Entries = []*pb.Message_Wantlist_Entry{
{Block: proto.String(str)},
{Block: proto.String(str.KeyString())},
}
if !wantlistContains(protoMessage.Wantlist, str) {
t.Fail()
}
m := newMessageFromProto(*protoMessage)
m, err := newMessageFromProto(*protoMessage)
if err != nil {
t.Fatal(err)
}
if !wantlistContains(m.ToProto().GetWantlist(), str) {
t.Fail()
}
@ -60,10 +69,10 @@ func TestAppendBlock(t *testing.T) {
}
func TestWantlist(t *testing.T) {
keystrs := []string{"foo", "bar", "baz", "bat"}
keystrs := []*cid.Cid{mkFakeCid("foo"), mkFakeCid("bar"), mkFakeCid("baz"), mkFakeCid("bat")}
m := New(true)
for _, s := range keystrs {
m.AddEntry(key.Key(s), 1)
m.AddEntry(s, 1)
}
exported := m.Wantlist()
@ -71,22 +80,22 @@ func TestWantlist(t *testing.T) {
present := false
for _, s := range keystrs {
if s == string(k.Key) {
if s.Equals(k.Cid) {
present = true
}
}
if !present {
t.Logf("%v isn't in original list", k.Key)
t.Logf("%v isn't in original list", k.Cid)
t.Fail()
}
}
}
func TestCopyProtoByValue(t *testing.T) {
const str = "foo"
str := mkFakeCid("foo")
m := New(true)
protoBeforeAppend := m.ToProto()
m.AddEntry(key.Key(str), 1)
m.AddEntry(str, 1)
if wantlistContains(protoBeforeAppend.GetWantlist(), str) {
t.Fail()
}
@ -94,11 +103,11 @@ func TestCopyProtoByValue(t *testing.T) {
func TestToNetFromNetPreservesWantList(t *testing.T) {
original := New(true)
original.AddEntry(key.Key("M"), 1)
original.AddEntry(key.Key("B"), 1)
original.AddEntry(key.Key("D"), 1)
original.AddEntry(key.Key("T"), 1)
original.AddEntry(key.Key("F"), 1)
original.AddEntry(mkFakeCid("M"), 1)
original.AddEntry(mkFakeCid("B"), 1)
original.AddEntry(mkFakeCid("D"), 1)
original.AddEntry(mkFakeCid("T"), 1)
original.AddEntry(mkFakeCid("F"), 1)
buf := new(bytes.Buffer)
if err := original.ToNet(buf); err != nil {
@ -110,13 +119,13 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
t.Fatal(err)
}
keys := make(map[key.Key]bool)
keys := make(map[string]bool)
for _, k := range copied.Wantlist() {
keys[k.Key] = true
keys[k.Cid.KeyString()] = true
}
for _, k := range original.Wantlist() {
if _, ok := keys[k.Key]; !ok {
if _, ok := keys[k.Cid.KeyString()]; !ok {
t.Fatalf("Key Missing: \"%v\"", k)
}
}
@ -140,21 +149,21 @@ func TestToAndFromNetMessage(t *testing.T) {
t.Fatal(err)
}
keys := make(map[key.Key]bool)
keys := make(map[string]bool)
for _, b := range m2.Blocks() {
keys[b.Key()] = true
keys[b.Cid().KeyString()] = true
}
for _, b := range original.Blocks() {
if _, ok := keys[b.Key()]; !ok {
if _, ok := keys[b.Cid().KeyString()]; !ok {
t.Fail()
}
}
}
func wantlistContains(wantlist *pb.Message_Wantlist, x string) bool {
func wantlistContains(wantlist *pb.Message_Wantlist, c *cid.Cid) bool {
for _, e := range wantlist.GetEntries() {
if e.GetBlock() == x {
if e.GetBlock() == c.KeyString() {
return true
}
}
@ -174,8 +183,8 @@ func TestDuplicates(t *testing.T) {
b := blocks.NewBlock([]byte("foo"))
msg := New(true)
msg.AddEntry(b.Key(), 1)
msg.AddEntry(b.Key(), 1)
msg.AddEntry(b.Cid(), 1)
msg.AddEntry(b.Cid(), 1)
if len(msg.Wantlist()) != 1 {
t.Fatal("Duplicate in BitSwapMessage")
}

View File

@ -1,10 +1,11 @@
package network
import (
context "context"
"context"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
@ -52,8 +53,8 @@ type Receiver interface {
type Routing interface {
// FindProvidersAsync returns a channel of providers for the given key
FindProvidersAsync(context.Context, key.Key, int) <-chan peer.ID
FindProvidersAsync(context.Context, *cid.Cid, int) <-chan peer.ID
// Provide provides the key to the network
Provide(context.Context, key.Key) error
Provide(context.Context, *cid.Cid) error
}

View File

@ -10,7 +10,6 @@ import (
ma "gx/ipfs/QmUAQaWbKxGCUTuoQVvvicbQNZ9APF5pDGWyAZSe93AtKH/go-multiaddr"
routing "gx/ipfs/QmXKuGUzLcgoQvp8M6ZEJzupWUNmx8NoqXEbYLMDjL4rjj/go-libp2p-routing"
pstore "gx/ipfs/QmXXCcQ7CLg5a81Ui9TTR35QcR4y7ZyihxwfjqaHfUVcVo/go-libp2p-peerstore"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
host "gx/ipfs/QmdML3R42PRSwnt46jSuEts9bHSqLctVYEjJqMR3UYV8ki/go-libp2p-host"
@ -130,7 +129,7 @@ func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
}
// FindProvidersAsync returns a channel of providers for the given key
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k key.Key, max int) <-chan peer.ID {
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID {
// Since routing queries are expensive, give bitswap the peers to which we
// have open connections. Note that this may cause issues if bitswap starts
@ -147,12 +146,9 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k key.Key, max int) <
out <- id
}
// TEMPORARY SHIM UNTIL CID GETS PROPAGATED
c := cid.NewCidV0(k.ToMultihash())
go func() {
defer close(out)
providers := bsnet.routing.FindProvidersAsync(ctx, c, max)
providers := bsnet.routing.FindProvidersAsync(ctx, k, max)
for info := range providers {
if info.ID == bsnet.host.ID() {
continue // ignore self as provider
@ -169,9 +165,8 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k key.Key, max int) <
}
// Provide provides the key to the network
func (bsnet *impl) Provide(ctx context.Context, k key.Key) error {
c := cid.NewCidV0(k.ToMultihash())
return bsnet.routing.Provide(ctx, c)
func (bsnet *impl) Provide(ctx context.Context, k *cid.Cid) error {
return bsnet.routing.Provide(ctx, k)
}
// handleNewStream receives a new stream from the network.

View File

@ -1,17 +1,19 @@
package notifications
import (
context "context"
pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub"
"context"
blocks "github.com/ipfs/go-ipfs/blocks"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
const bufferSize = 16
type PubSub interface {
Publish(block blocks.Block)
Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block
Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block
Shutdown()
}
@ -24,8 +26,7 @@ type impl struct {
}
func (ps *impl) Publish(block blocks.Block) {
topic := string(block.Key())
ps.wrapped.Pub(block, topic)
ps.wrapped.Pub(block, block.Cid().KeyString())
}
func (ps *impl) Shutdown() {
@ -35,7 +36,7 @@ func (ps *impl) Shutdown() {
// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks.
func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block {
func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block {
blocksCh := make(chan blocks.Block, len(keys))
valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
@ -71,10 +72,10 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Bl
return blocksCh
}
func toStrings(keys []key.Key) []string {
func toStrings(keys []*cid.Cid) []string {
strs := make([]string, 0)
for _, key := range keys {
strs = append(strs, string(key))
strs = append(strs, key.KeyString())
}
return strs
}

View File

@ -2,13 +2,13 @@ package notifications
import (
"bytes"
"context"
"testing"
"time"
context "context"
blocks "github.com/ipfs/go-ipfs/blocks"
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
func TestDuplicates(t *testing.T) {
@ -17,7 +17,7 @@ func TestDuplicates(t *testing.T) {
n := New()
defer n.Shutdown()
ch := n.Subscribe(context.Background(), b1.Key(), b2.Key())
ch := n.Subscribe(context.Background(), b1.Cid(), b2.Cid())
n.Publish(b1)
blockRecvd, ok := <-ch
@ -41,7 +41,7 @@ func TestPublishSubscribe(t *testing.T) {
n := New()
defer n.Shutdown()
ch := n.Subscribe(context.Background(), blockSent.Key())
ch := n.Subscribe(context.Background(), blockSent.Cid())
n.Publish(blockSent)
blockRecvd, ok := <-ch
@ -59,7 +59,7 @@ func TestSubscribeMany(t *testing.T) {
n := New()
defer n.Shutdown()
ch := n.Subscribe(context.Background(), e1.Key(), e2.Key())
ch := n.Subscribe(context.Background(), e1.Cid(), e2.Cid())
n.Publish(e1)
r1, ok := <-ch
@ -83,8 +83,8 @@ func TestDuplicateSubscribe(t *testing.T) {
n := New()
defer n.Shutdown()
ch1 := n.Subscribe(context.Background(), e1.Key())
ch2 := n.Subscribe(context.Background(), e1.Key())
ch1 := n.Subscribe(context.Background(), e1.Cid())
ch2 := n.Subscribe(context.Background(), e1.Cid())
n.Publish(e1)
r1, ok := <-ch1
@ -118,7 +118,7 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) {
n := New()
defer n.Shutdown()
block := blocks.NewBlock([]byte("A Missed Connection"))
blockChannel := n.Subscribe(fastExpiringCtx, block.Key())
blockChannel := n.Subscribe(fastExpiringCtx, block.Cid())
assertBlockChannelNil(t, blockChannel)
}
@ -132,10 +132,10 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
t.Log("generate a large number of blocks. exceed default buffer")
bs := g.Blocks(1000)
ks := func() []key.Key {
var keys []key.Key
ks := func() []*cid.Cid {
var keys []*cid.Cid
for _, b := range bs {
keys = append(keys, b.Key())
keys = append(keys, b.Cid())
}
return keys
}()
@ -162,7 +162,7 @@ func assertBlocksEqual(t *testing.T, a, b blocks.Block) {
if !bytes.Equal(a.RawData(), b.RawData()) {
t.Fatal("blocks aren't equal")
}
if a.Key() != b.Key() {
if a.Cid() != b.Cid() {
t.Fatal("block keys aren't equal")
}
}

View File

@ -1,13 +1,14 @@
package bitswap
import (
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
"sort"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
type Stat struct {
ProvideBufLen int
Wantlist []key.Key
Wantlist []*cid.Cid
Peers []string
BlocksReceived int
DupBlksReceived int

View File

@ -10,7 +10,6 @@ import (
delay "github.com/ipfs/go-ipfs/thirdparty/delay"
testutil "github.com/ipfs/go-ipfs/thirdparty/testutil"
routing "gx/ipfs/QmXKuGUzLcgoQvp8M6ZEJzupWUNmx8NoqXEbYLMDjL4rjj/go-libp2p-routing"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
@ -92,18 +91,17 @@ func (nc *networkClient) SendMessage(
}
// FindProvidersAsync returns a channel of providers for the given key
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k key.Key, max int) <-chan peer.ID {
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID {
// NB: this function duplicates the PeerInfo -> ID transformation in the
// bitswap network adapter. Not to worry. This network client will be
// deprecated once the ipfsnet.Mock is added. The code below is only
// temporary.
c := cid.NewCidV0(k.ToMultihash())
out := make(chan peer.ID)
go func() {
defer close(out)
providers := nc.routing.FindProvidersAsync(ctx, c, max)
providers := nc.routing.FindProvidersAsync(ctx, k, max)
for info := range providers {
select {
case <-ctx.Done():
@ -139,9 +137,8 @@ func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.
}
// Provide provides the key to the network
func (nc *networkClient) Provide(ctx context.Context, k key.Key) error {
c := cid.NewCidV0(k.ToMultihash())
return nc.routing.Provide(ctx, c)
func (nc *networkClient) Provide(ctx context.Context, k *cid.Cid) error {
return nc.routing.Provide(ctx, k)
}
func (nc *networkClient) SetDelegate(r bsnet.Receiver) {

View File

@ -6,7 +6,7 @@ import (
"sort"
"sync"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
type ThreadSafe struct {
@ -16,11 +16,11 @@ type ThreadSafe struct {
// not threadsafe
type Wantlist struct {
set map[key.Key]*Entry
set map[string]*Entry
}
type Entry struct {
Key key.Key
Cid *cid.Cid
Priority int
RefCnt int
@ -40,11 +40,11 @@ func NewThreadSafe() *ThreadSafe {
func New() *Wantlist {
return &Wantlist{
set: make(map[key.Key]*Entry),
set: make(map[string]*Entry),
}
}
func (w *ThreadSafe) Add(k key.Key, priority int) bool {
func (w *ThreadSafe) Add(k *cid.Cid, priority int) bool {
w.lk.Lock()
defer w.lk.Unlock()
return w.Wantlist.Add(k, priority)
@ -56,13 +56,13 @@ func (w *ThreadSafe) AddEntry(e *Entry) bool {
return w.Wantlist.AddEntry(e)
}
func (w *ThreadSafe) Remove(k key.Key) bool {
func (w *ThreadSafe) Remove(k *cid.Cid) bool {
w.lk.Lock()
defer w.lk.Unlock()
return w.Wantlist.Remove(k)
}
func (w *ThreadSafe) Contains(k key.Key) (*Entry, bool) {
func (w *ThreadSafe) Contains(k *cid.Cid) (*Entry, bool) {
w.lk.RLock()
defer w.lk.RUnlock()
return w.Wantlist.Contains(k)
@ -90,14 +90,15 @@ func (w *Wantlist) Len() int {
return len(w.set)
}
func (w *Wantlist) Add(k key.Key, priority int) bool {
func (w *Wantlist) Add(c *cid.Cid, priority int) bool {
k := c.KeyString()
if e, ok := w.set[k]; ok {
e.RefCnt++
return false
}
w.set[k] = &Entry{
Key: k,
Cid: c,
Priority: priority,
RefCnt: 1,
}
@ -106,15 +107,17 @@ func (w *Wantlist) Add(k key.Key, priority int) bool {
}
func (w *Wantlist) AddEntry(e *Entry) bool {
if ex, ok := w.set[e.Key]; ok {
k := e.Cid.KeyString()
if ex, ok := w.set[k]; ok {
ex.RefCnt++
return false
}
w.set[e.Key] = e
w.set[k] = e
return true
}
func (w *Wantlist) Remove(k key.Key) bool {
func (w *Wantlist) Remove(c *cid.Cid) bool {
k := c.KeyString()
e, ok := w.set[k]
if !ok {
return false
@ -128,8 +131,8 @@ func (w *Wantlist) Remove(k key.Key) bool {
return false
}
func (w *Wantlist) Contains(k key.Key) (*Entry, bool) {
e, ok := w.set[k]
func (w *Wantlist) Contains(k *cid.Cid) (*Entry, bool) {
e, ok := w.set[k.KeyString()]
return e, ok
}

View File

@ -1,15 +1,15 @@
package bitswap
import (
"context"
"sync"
"time"
context "context"
engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
@ -51,7 +51,7 @@ type msgPair struct {
type cancellation struct {
who peer.ID
blk key.Key
blk *cid.Cid
}
type msgQueue struct {
@ -69,23 +69,23 @@ type msgQueue struct {
done chan struct{}
}
func (pm *WantManager) WantBlocks(ctx context.Context, ks []key.Key) {
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid) {
log.Infof("want blocks: %s", ks)
pm.addEntries(ctx, ks, false)
}
func (pm *WantManager) CancelWants(ks []key.Key) {
func (pm *WantManager) CancelWants(ks []*cid.Cid) {
log.Infof("cancel wants: %s", ks)
pm.addEntries(context.TODO(), ks, true)
}
func (pm *WantManager) addEntries(ctx context.Context, ks []key.Key, cancel bool) {
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel bool) {
var entries []*bsmsg.Entry
for i, k := range ks {
entries = append(entries, &bsmsg.Entry{
Cancel: cancel,
Entry: &wantlist.Entry{
Key: k,
Cid: k,
Priority: kMaxPriority - i,
RefCnt: 1,
},
@ -130,7 +130,7 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
// new peer, we will want to give them our full wantlist
fullwantlist := bsmsg.New(true)
for _, e := range pm.wl.Entries() {
fullwantlist.AddEntry(e.Key, e.Priority)
fullwantlist.AddEntry(e.Cid, e.Priority)
}
mq.out = fullwantlist
mq.work <- struct{}{}
@ -246,7 +246,7 @@ func (pm *WantManager) Run() {
var filtered []*bsmsg.Entry
for _, e := range entries {
if e.Cancel {
if pm.wl.Remove(e.Key) {
if pm.wl.Remove(e.Cid) {
filtered = append(filtered, e)
}
} else {
@ -323,9 +323,9 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
// one passed in
for _, e := range entries {
if e.Cancel {
mq.out.Cancel(e.Key)
mq.out.Cancel(e.Cid)
} else {
mq.out.AddEntry(e.Key, e.Priority)
mq.out.AddEntry(e.Cid, e.Priority)
}
}
}

View File

@ -1,15 +1,15 @@
package bitswap
import (
"context"
"math/rand"
"sync"
"time"
context "context"
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
@ -77,7 +77,7 @@ func (bs *Bitswap) provideWorker(px process.Process) {
limit := make(chan struct{}, provideWorkerMax)
limitedGoProvide := func(k key.Key, wid int) {
limitedGoProvide := func(k *cid.Cid, wid int) {
defer func() {
// replace token when done
<-limit
@ -85,7 +85,7 @@ func (bs *Bitswap) provideWorker(px process.Process) {
ev := logging.LoggableMap{"ID": wid}
ctx := procctx.OnClosingContext(px) // derive ctx from px
defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, k).Done()
ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
defer cancel()
@ -121,9 +121,9 @@ func (bs *Bitswap) provideWorker(px process.Process) {
func (bs *Bitswap) provideCollector(ctx context.Context) {
defer close(bs.provideKeys)
var toProvide []key.Key
var nextKey key.Key
var keysOut chan key.Key
var toProvide []*cid.Cid
var nextKey *cid.Cid
var keysOut chan *cid.Cid
for {
select {
@ -181,7 +181,7 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
// for new providers for blocks.
i := rand.Intn(len(entries))
bs.findKeys <- &blockRequest{
Key: entries[i].Key,
Cid: entries[i].Cid,
Ctx: ctx,
}
case <-parent.Done():
@ -192,23 +192,23 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
func (bs *Bitswap) providerQueryManager(ctx context.Context) {
var activeLk sync.Mutex
kset := key.NewKeySet()
kset := cid.NewSet()
for {
select {
case e := <-bs.findKeys:
activeLk.Lock()
if kset.Has(e.Key) {
if kset.Has(e.Cid) {
activeLk.Unlock()
continue
}
kset.Add(e.Key)
kset.Add(e.Cid)
activeLk.Unlock()
go func(e *blockRequest) {
child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout)
defer cancel()
providers := bs.network.FindProvidersAsync(child, e.Key, maxProvidersPerRequest)
providers := bs.network.FindProvidersAsync(child, e.Cid, maxProvidersPerRequest)
wg := &sync.WaitGroup{}
for p := range providers {
wg.Add(1)
@ -222,7 +222,7 @@ func (bs *Bitswap) providerQueryManager(ctx context.Context) {
}
wg.Wait()
activeLk.Lock()
kset.Remove(e.Key)
kset.Remove(e.Cid)
activeLk.Unlock()
}(e)

View File

@ -2,21 +2,21 @@
package exchange
import (
"context"
"io"
blocks "github.com/ipfs/go-ipfs/blocks"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
context "context"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
// Any type that implements exchange.Interface may be used as an IPFS block
// exchange protocol.
type Interface interface { // type Exchanger interface
// GetBlock returns the block associated with a given key.
GetBlock(context.Context, key.Key) (blocks.Block, error)
GetBlock(context.Context, *cid.Cid) (blocks.Block, error)
GetBlocks(context.Context, []key.Key) (<-chan blocks.Block, error)
GetBlocks(context.Context, []*cid.Cid) (<-chan blocks.Block, error)
// TODO Should callers be concerned with whether the block was made
// available on the network?

View File

@ -3,12 +3,13 @@
package offline
import (
"context"
blocks "github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
context "context"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
func Exchange(bs blockstore.Blockstore) exchange.Interface {
@ -24,7 +25,7 @@ type offlineExchange struct {
// GetBlock returns nil to signal that a block could not be retrieved for the
// given key.
// NB: This function may return before the timeout expires.
func (e *offlineExchange) GetBlock(_ context.Context, k key.Key) (blocks.Block, error) {
func (e *offlineExchange) GetBlock(_ context.Context, k *cid.Cid) (blocks.Block, error) {
return e.bs.Get(k)
}
@ -40,11 +41,11 @@ func (_ *offlineExchange) Close() error {
return nil
}
func (e *offlineExchange) GetBlocks(ctx context.Context, ks []key.Key) (<-chan blocks.Block, error) {
func (e *offlineExchange) GetBlocks(ctx context.Context, ks []*cid.Cid) (<-chan blocks.Block, error) {
out := make(chan blocks.Block, 0)
go func() {
defer close(out)
var misses []key.Key
var misses []*cid.Cid
for _, k := range ks {
hit, err := e.bs.Get(k)
if err != nil {

View File

@ -1,20 +1,23 @@
package offline
import (
"context"
"testing"
context "context"
blocks "github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/blocks/blocksutil"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
ds_sync "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
)
func TestBlockReturnsErr(t *testing.T) {
off := Exchange(bstore())
_, err := off.GetBlock(context.Background(), key.Key("foo"))
c := cid.NewCidV0(u.Hash([]byte("foo")))
_, err := off.GetBlock(context.Background(), c)
if err != nil {
return // as desired
}
@ -31,7 +34,7 @@ func TestHasBlockReturnsNil(t *testing.T) {
t.Fail()
}
if _, err := store.Get(block.Key()); err != nil {
if _, err := store.Get(block.Cid()); err != nil {
t.Fatal(err)
}
}
@ -49,11 +52,11 @@ func TestGetBlocks(t *testing.T) {
}
}
request := func() []key.Key {
var ks []key.Key
request := func() []*cid.Cid {
var ks []*cid.Cid
for _, b := range expected {
ks = append(ks, b.Key())
ks = append(ks, b.Cid())
}
return ks
}()

View File

@ -9,7 +9,6 @@ import (
backoff "gx/ipfs/QmPJUtEJsm5YLUWhF6imvyCH8KZXRJa9Wup7FDMwTy5Ufz/backoff"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
routing "gx/ipfs/QmXKuGUzLcgoQvp8M6ZEJzupWUNmx8NoqXEbYLMDjL4rjj/go-libp2p-routing"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
var log = logging.Logger("reprovider")
@ -53,8 +52,7 @@ func (rp *Reprovider) Reprovide(ctx context.Context) error {
if err != nil {
return fmt.Errorf("Failed to get key chan from blockstore: %s", err)
}
for k := range keychan {
c := cid.NewCidV0(k.ToMultihash())
for c := range keychan {
op := func() error {
err := rp.rsys.Provide(ctx, c)
if err != nil {

View File

@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"github.com/ipfs/go-ipfs/blocks"
"gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
"gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
"io"
"testing"
@ -39,10 +38,10 @@ func TestRabinChunking(t *testing.T) {
}
}
func chunkData(t *testing.T, data []byte) map[key.Key]blocks.Block {
func chunkData(t *testing.T, data []byte) map[string]blocks.Block {
r := NewRabin(bytes.NewReader(data), 1024*256)
blkmap := make(map[key.Key]blocks.Block)
blkmap := make(map[string]blocks.Block)
for {
blk, err := r.NextBytes()
@ -54,7 +53,7 @@ func chunkData(t *testing.T, data []byte) map[key.Key]blocks.Block {
}
b := blocks.NewBlock(blk)
blkmap[b.Key()] = b
blkmap[b.Cid().KeyString()] = b
}
return blkmap

View File

@ -2,15 +2,15 @@
package merkledag
import (
"context"
"fmt"
"strings"
"sync"
blocks "github.com/ipfs/go-ipfs/blocks"
bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
"context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
@ -60,7 +60,7 @@ func (n *dagService) Add(nd *Node) (*cid.Cid, error) {
return nil, fmt.Errorf("dagService is nil")
}
return n.Blocks.AddObject(nd)
return n.Blocks.AddBlock(nd)
}
func (n *dagService) Batch() *Batch {
@ -122,7 +122,7 @@ func (n *dagService) GetOfflineLinkService() LinkService {
}
func (n *dagService) Remove(nd *Node) error {
return n.Blocks.DeleteObject(nd)
return n.Blocks.DeleteBlock(nd)
}
// FetchGraph fetches all nodes that are children of the given node
@ -147,27 +147,11 @@ type NodeOption struct {
Err error
}
// TODO: this is a mid-term hack to get around the fact that blocks don't
// have full CIDs and potentially (though we don't know of any such scenario)
// may have the same block with multiple different encodings.
// We have discussed the possiblity of using CIDs as datastore keys
// in the future. This would be a much larger changeset than i want to make
// right now.
func cidsToKeyMapping(cids []*cid.Cid) map[key.Key]*cid.Cid {
mapping := make(map[key.Key]*cid.Cid)
for _, c := range cids {
mapping[key.Key(c.Hash())] = c
}
return mapping
}
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
out := make(chan *NodeOption, len(keys))
blocks := ds.Blocks.GetBlocks(ctx, keys)
var count int
mapping := cidsToKeyMapping(keys)
go func() {
defer close(out)
for {
@ -180,7 +164,7 @@ func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *Node
return
}
c := mapping[b.Key()]
c := b.Cid()
var nd *Node
switch c.Type() {
@ -361,7 +345,7 @@ func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
type Batch struct {
ds *dagService
objects []bserv.Object
blocks []blocks.Block
size int
MaxSize int
}
@ -372,7 +356,7 @@ func (t *Batch) Add(nd *Node) (*cid.Cid, error) {
return nil, err
}
t.objects = append(t.objects, nd)
t.blocks = append(t.blocks, nd)
t.size += len(d)
if t.size > t.MaxSize {
return nd.Cid(), t.Commit()
@ -381,8 +365,8 @@ func (t *Batch) Add(nd *Node) (*cid.Cid, error) {
}
func (t *Batch) Commit() error {
_, err := t.ds.Blocks.AddObjects(t.objects)
t.objects = nil
_, err := t.ds.Blocks.AddBlocks(t.blocks)
t.blocks = nil
t.size = 0
return err
}

View File

@ -1,6 +1,7 @@
package mfs
import (
"context"
"fmt"
"sync"
@ -8,8 +9,6 @@ import (
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
mod "github.com/ipfs/go-ipfs/unixfs/mod"
context "context"
)
type File struct {

View File

@ -1,12 +1,12 @@
package gc
import (
"context"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
dag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
context "context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
@ -22,7 +22,7 @@ var log = logging.Logger("gc")
//
// The routine then iterates over every block in the blockstore and
// deletes any block that is not found in the marked set.
func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) {
func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan *cid.Cid, error) {
unlocker := bs.GCLock()
ls = ls.GetOfflineLinkService()
@ -37,7 +37,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.
return nil, err
}
output := make(chan key.Key)
output := make(chan *cid.Cid)
go func() {
defer close(output)
defer unlocker.Unlock()
@ -68,20 +68,12 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.
return output, nil
}
func Descendants(ctx context.Context, ls dag.LinkService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error {
func Descendants(ctx context.Context, ls dag.LinkService, set *cid.Set, roots []*cid.Cid, bestEffort bool) error {
for _, c := range roots {
set.Add(key.Key(c.Hash()))
set.Add(c)
// EnumerateChildren recursively walks the dag and adds the keys to the given set
err := dag.EnumerateChildren(ctx, ls, c, func(c *cid.Cid) bool {
k := key.Key(c.Hash())
seen := set.Has(k)
if seen {
return false
}
set.Add(k)
return true
}, bestEffort)
err := dag.EnumerateChildren(ctx, ls, c, set.Visit, bestEffort)
if err != nil {
return err
}
@ -90,10 +82,10 @@ func Descendants(ctx context.Context, ls dag.LinkService, set key.KeySet, roots
return nil
}
func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffortRoots []*cid.Cid) (key.KeySet, error) {
func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffortRoots []*cid.Cid) (*cid.Set, error) {
// KeySet currently implemented in memory, in the future, may be bloom filter or
// disk backed to conserve memory.
gcs := key.NewKeySet()
gcs := cid.NewSet()
err := Descendants(ctx, ls, gcs, pn.RecursiveKeys(), false)
if err != nil {
return nil, err
@ -105,7 +97,7 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffo
}
for _, k := range pn.DirectKeys() {
gcs.Add(key.Key(k.Hash()))
gcs.Add(k)
}
err = Descendants(ctx, ls, gcs, pn.InternalPins(), false)

View File

@ -76,7 +76,7 @@ func TestBitswapWithoutRouting(t *testing.T) {
} else if !bytes.Equal(b.RawData(), block0.RawData()) {
t.Error("byte comparison fail")
} else {
log.Debug("got block: %s", b.Key())
log.Debug("got block: %s", b.Cid())
}
}
@ -93,7 +93,7 @@ func TestBitswapWithoutRouting(t *testing.T) {
} else if !bytes.Equal(b.RawData(), block1.RawData()) {
t.Error("byte comparison fail")
} else {
log.Debug("got block: %s", b.Key())
log.Debug("got block: %s", b.Cid())
}
}
}

View File

@ -9,3 +9,7 @@ import (
// NewKeyFromBinary encodes the raw bytes of s using unpadded base32
// (RawStdEncoding) and wraps the result in a datastore key, producing a
// key that is safe for datastores that cannot handle arbitrary binary.
func NewKeyFromBinary(s string) ds.Key {
	encoded := base32.RawStdEncoding.EncodeToString([]byte(s))
	return ds.NewKey(encoded)
}
// BinaryFromDsKey is the inverse of NewKeyFromBinary: it base32-decodes
// the key's string form back into the original raw bytes.
// It returns an error if the key text is not valid unpadded base32.
func BinaryFromDsKey(k ds.Key) ([]byte, error) {
	// Skip the first character of the key string before decoding —
	// presumably the leading "/" that ds.Key normalization adds.
	encoded := k.String()[1:]
	return base32.RawStdEncoding.DecodeString(encoded)
}