mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-12 11:48:07 +08:00
Merge pull request #237 from jbenet/dont-write-twice
blockservice: dont write blocks twice
This commit is contained in:
commit
461e5a3c2c
@ -37,13 +37,26 @@ func NewBlockService(d ds.Datastore, rem exchange.Interface) (*BlockService, err
|
||||
// AddBlock adds a particular block to the service, Putting it into the datastore.
|
||||
func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
|
||||
k := b.Key()
|
||||
log.Debug("blockservice: storing [%s] in datastore", k)
|
||||
// TODO(brian): define a block datastore with a Put method which accepts a
|
||||
// block parameter
|
||||
err := s.Datastore.Put(k.DsKey(), b.Data)
|
||||
|
||||
// check if we have it before adding. this is an extra read, but large writes
|
||||
// are more expensive.
|
||||
// TODO(jbenet) cheaper has. https://github.com/jbenet/go-datastore/issues/6
|
||||
has, err := s.Datastore.Has(k.DsKey())
|
||||
if err != nil {
|
||||
return k, err
|
||||
}
|
||||
if has {
|
||||
log.Debugf("blockservice: storing [%s] in datastore (already stored)", k)
|
||||
} else {
|
||||
log.Debugf("blockservice: storing [%s] in datastore", k)
|
||||
err := s.Datastore.Put(k.DsKey(), b.Data)
|
||||
if err != nil {
|
||||
return k, err
|
||||
}
|
||||
}
|
||||
|
||||
if s.Remote != nil {
|
||||
ctx := context.TODO()
|
||||
err = s.Remote.HasBlock(ctx, *b)
|
||||
|
||||
53
importer/chunk/splitting_test.go
Normal file
53
importer/chunk/splitting_test.go
Normal file
@ -0,0 +1,53 @@
|
||||
package chunk
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func randBuf(t *testing.T, size int) []byte {
|
||||
buf := make([]byte, size)
|
||||
if _, err := rand.Read(buf); err != nil {
|
||||
t.Fatal("failed to read enough randomness")
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
func copyBuf(buf []byte) []byte {
|
||||
cpy := make([]byte, len(buf))
|
||||
copy(cpy, buf)
|
||||
return cpy
|
||||
}
|
||||
|
||||
func TestSizeSplitterIsDeterministic(t *testing.T) {
|
||||
|
||||
test := func() {
|
||||
bufR := randBuf(t, 10000000) // crank this up to satisfy yourself.
|
||||
bufA := copyBuf(bufR)
|
||||
bufB := copyBuf(bufR)
|
||||
|
||||
chunksA := DefaultSplitter.Split(bytes.NewReader(bufA))
|
||||
chunksB := DefaultSplitter.Split(bytes.NewReader(bufB))
|
||||
|
||||
for n := 0; ; n++ {
|
||||
a, moreA := <-chunksA
|
||||
b, moreB := <-chunksB
|
||||
|
||||
if !moreA {
|
||||
if moreB {
|
||||
t.Fatal("A ended, B didnt.")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if !bytes.Equal(a, b) {
|
||||
t.Fatalf("chunk %d not equal", n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for run := 0; run < 1; run++ { // crank this up to satisfy yourself.
|
||||
test()
|
||||
}
|
||||
}
|
||||
@ -12,7 +12,7 @@ import (
|
||||
func randNode() (*mdag.Node, util.Key) {
|
||||
nd := new(mdag.Node)
|
||||
nd.Data = make([]byte, 32)
|
||||
util.NewFastRand().Read(nd.Data)
|
||||
util.NewTimeSeededRand().Read(nd.Data)
|
||||
k, _ := nd.Key()
|
||||
return nd, k
|
||||
}
|
||||
|
||||
@ -28,7 +28,7 @@ func getMockDagServ(t *testing.T) mdag.DAGService {
|
||||
func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) {
|
||||
dw := NewDagWriter(dserv, &chunk.SizeSplitter{500})
|
||||
|
||||
n, err := io.CopyN(dw, u.NewFastRand(), size)
|
||||
n, err := io.CopyN(dw, u.NewTimeSeededRand(), size)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -58,7 +58,7 @@ func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Nod
|
||||
|
||||
func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier) []byte {
|
||||
newdata := make([]byte, size)
|
||||
r := u.NewFastRand()
|
||||
r := u.NewTimeSeededRand()
|
||||
r.Read(newdata)
|
||||
|
||||
if size+beg > uint64(len(orig)) {
|
||||
@ -160,7 +160,7 @@ func TestMultiWrite(t *testing.T) {
|
||||
}
|
||||
|
||||
data := make([]byte, 4000)
|
||||
u.NewFastRand().Read(data)
|
||||
u.NewTimeSeededRand().Read(data)
|
||||
|
||||
for i := 0; i < len(data); i++ {
|
||||
n, err := dagmod.WriteAt(data[i:i+1], uint64(i))
|
||||
@ -201,7 +201,7 @@ func TestMultiWriteCoal(t *testing.T) {
|
||||
}
|
||||
|
||||
data := make([]byte, 4000)
|
||||
u.NewFastRand().Read(data)
|
||||
u.NewTimeSeededRand().Read(data)
|
||||
|
||||
for i := 0; i < len(data); i++ {
|
||||
n, err := dagmod.WriteAt(data[:i+1], 0)
|
||||
|
||||
73
util/util.go
73
util/util.go
@ -60,65 +60,46 @@ func NewByteChanReader(in chan []byte) io.Reader {
|
||||
return &byteChanReader{in: in}
|
||||
}
|
||||
|
||||
func (bcr *byteChanReader) Read(b []byte) (int, error) {
|
||||
if len(bcr.buf) == 0 {
|
||||
data, ok := <-bcr.in
|
||||
if !ok {
|
||||
return 0, io.EOF
|
||||
func (bcr *byteChanReader) Read(output []byte) (int, error) {
|
||||
remain := output
|
||||
remainLen := len(output)
|
||||
outputLen := 0
|
||||
more := false
|
||||
next := bcr.buf
|
||||
|
||||
for {
|
||||
n := copy(remain, next)
|
||||
remainLen -= n
|
||||
outputLen += n
|
||||
if remainLen == 0 {
|
||||
bcr.buf = next[n:]
|
||||
return outputLen, nil
|
||||
}
|
||||
bcr.buf = data
|
||||
}
|
||||
|
||||
if len(bcr.buf) >= len(b) {
|
||||
copy(b, bcr.buf)
|
||||
bcr.buf = bcr.buf[len(b):]
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
copy(b, bcr.buf)
|
||||
b = b[len(bcr.buf):]
|
||||
totread := len(bcr.buf)
|
||||
|
||||
for data := range bcr.in {
|
||||
if len(data) > len(b) {
|
||||
totread += len(b)
|
||||
copy(b, data[:len(b)])
|
||||
bcr.buf = data[len(b):]
|
||||
return totread, nil
|
||||
}
|
||||
copy(b, data)
|
||||
totread += len(data)
|
||||
b = b[len(data):]
|
||||
if len(b) == 0 {
|
||||
return totread, nil
|
||||
remain = remain[n:]
|
||||
next, more = <-bcr.in
|
||||
if !more {
|
||||
return outputLen, io.EOF
|
||||
}
|
||||
}
|
||||
return totread, io.EOF
|
||||
}
|
||||
|
||||
type randGen struct {
|
||||
src rand.Source
|
||||
rand.Rand
|
||||
}
|
||||
|
||||
func NewFastRand() io.Reader {
|
||||
return &randGen{rand.NewSource(time.Now().UnixNano())}
|
||||
func NewTimeSeededRand() io.Reader {
|
||||
src := rand.NewSource(time.Now().UnixNano())
|
||||
return &randGen{
|
||||
Rand: *rand.New(src),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *randGen) Read(p []byte) (n int, err error) {
|
||||
todo := len(p)
|
||||
offset := 0
|
||||
for {
|
||||
val := int64(r.src.Int63())
|
||||
for i := 0; i < 8; i++ {
|
||||
p[offset] = byte(val & 0xff)
|
||||
todo--
|
||||
if todo == 0 {
|
||||
return len(p), nil
|
||||
}
|
||||
offset++
|
||||
val >>= 8
|
||||
}
|
||||
for i := 0; i < len(p); i++ {
|
||||
p[i] = byte(r.Rand.Intn(255))
|
||||
}
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// GetenvBool is the way to check an env var as a boolean
|
||||
|
||||
@ -2,7 +2,6 @@ package util
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
@ -30,31 +29,38 @@ func TestKey(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestByteChanReader(t *testing.T) {
|
||||
data := make([]byte, 1024*1024)
|
||||
r := NewFastRand()
|
||||
r.Read(data)
|
||||
|
||||
var data bytes.Buffer
|
||||
var data2 bytes.Buffer
|
||||
dch := make(chan []byte, 8)
|
||||
randr := NewTimeSeededRand()
|
||||
|
||||
go func() {
|
||||
beg := 0
|
||||
for i := 0; i < len(data); {
|
||||
i += rand.Intn(100) + 1
|
||||
if i > len(data) {
|
||||
i = len(data)
|
||||
}
|
||||
dch <- data[beg:i]
|
||||
beg = i
|
||||
defer close(dch)
|
||||
for i := 0; i < rand.Intn(100)+100; i++ {
|
||||
chunk := make([]byte, rand.Intn(100000)+10)
|
||||
randr.Read(chunk)
|
||||
data.Write(chunk)
|
||||
// fmt.Printf("chunk: %6.d %v\n", len(chunk), chunk[:10])
|
||||
dch <- chunk
|
||||
}
|
||||
close(dch)
|
||||
}()
|
||||
|
||||
read := NewByteChanReader(dch)
|
||||
out, err := ioutil.ReadAll(read)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
||||
// read in random, weird sizes to exercise saving buffer.
|
||||
for {
|
||||
buf := make([]byte, rand.Intn(10)*10)
|
||||
n, err := read.Read(buf)
|
||||
data2.Write(buf[:n])
|
||||
// fmt.Printf("read: %6.d\n", n)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !bytes.Equal(out, data) {
|
||||
// fmt.Printf("lens: %d == %d\n", len(out), len(data.Bytes()))
|
||||
if !bytes.Equal(data2.Bytes(), data.Bytes()) {
|
||||
t.Fatal("Reader failed to stream correct bytes")
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user