diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go
index 54a992cdb..edfaa11fa 100644
--- a/blockservice/blockservice.go
+++ b/blockservice/blockservice.go
@@ -37,13 +37,26 @@ func NewBlockService(d ds.Datastore, rem exchange.Interface) (*BlockService, err
 // AddBlock adds a particular block to the service, Putting it into the datastore.
 func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
 	k := b.Key()
-	log.Debug("blockservice: storing [%s] in datastore", k)
 	// TODO(brian): define a block datastore with a Put method which accepts a
 	// block parameter
-	err := s.Datastore.Put(k.DsKey(), b.Data)
+
+	// check if we have it before adding. this is an extra read, but large writes
+	// are more expensive.
+	// TODO(jbenet) cheaper has. https://github.com/jbenet/go-datastore/issues/6
+	has, err := s.Datastore.Has(k.DsKey())
 	if err != nil {
 		return k, err
 	}
+	if has {
+		log.Debugf("blockservice: storing [%s] in datastore (already stored)", k)
+	} else {
+		log.Debugf("blockservice: storing [%s] in datastore", k)
+		err = s.Datastore.Put(k.DsKey(), b.Data)
+		if err != nil {
+			return k, err
+		}
+	}
+
 	if s.Remote != nil {
 		ctx := context.TODO()
 		err = s.Remote.HasBlock(ctx, *b)
diff --git a/importer/chunk/splitting_test.go b/importer/chunk/splitting_test.go
new file mode 100644
index 000000000..0ecb143cb
--- /dev/null
+++ b/importer/chunk/splitting_test.go
@@ -0,0 +1,53 @@
+package chunk
+
+import (
+	"bytes"
+	"crypto/rand"
+	"testing"
+)
+
+func randBuf(t *testing.T, size int) []byte {
+	buf := make([]byte, size)
+	if _, err := rand.Read(buf); err != nil {
+		t.Fatal("failed to read enough randomness")
+	}
+	return buf
+}
+
+func copyBuf(buf []byte) []byte {
+	cpy := make([]byte, len(buf))
+	copy(cpy, buf)
+	return cpy
+}
+
+func TestSizeSplitterIsDeterministic(t *testing.T) {
+
+	test := func() {
+		bufR := randBuf(t, 10000000) // crank this up to satisfy yourself.
+		bufA := copyBuf(bufR)
+		bufB := copyBuf(bufR)
+
+		chunksA := DefaultSplitter.Split(bytes.NewReader(bufA))
+		chunksB := DefaultSplitter.Split(bytes.NewReader(bufB))
+
+		for n := 0; ; n++ {
+			a, moreA := <-chunksA
+			b, moreB := <-chunksB
+
+			if !moreA {
+				if moreB {
+					t.Fatal("A ended, B didnt.")
+				}
+				return
+			}
+
+			if !bytes.Equal(a, b) {
+				t.Fatalf("chunk %d not equal", n)
+			}
+		}
+	}
+
+	for run := 0; run < 1; run++ { // crank this up to satisfy yourself.
+		test()
+	}
+}
diff --git a/pin/pin_test.go b/pin/pin_test.go
index 7bf0756df..1ea302823 100644
--- a/pin/pin_test.go
+++ b/pin/pin_test.go
@@ -12,7 +12,7 @@ import (
 func randNode() (*mdag.Node, util.Key) {
 	nd := new(mdag.Node)
 	nd.Data = make([]byte, 32)
-	util.NewFastRand().Read(nd.Data)
+	util.NewTimeSeededRand().Read(nd.Data)
 	k, _ := nd.Key()
 	return nd, k
 }
diff --git a/unixfs/io/dagmodifier_test.go b/unixfs/io/dagmodifier_test.go
index d45559b3a..edb4d6f76 100644
--- a/unixfs/io/dagmodifier_test.go
+++ b/unixfs/io/dagmodifier_test.go
@@ -28,7 +28,7 @@ func getMockDagServ(t *testing.T) mdag.DAGService {
 
 func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) {
 	dw := NewDagWriter(dserv, &chunk.SizeSplitter{500})
-	n, err := io.CopyN(dw, u.NewFastRand(), size)
+	n, err := io.CopyN(dw, u.NewTimeSeededRand(), size)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -58,7 +58,7 @@ func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Nod
 
 func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier) []byte {
 	newdata := make([]byte, size)
-	r := u.NewFastRand()
+	r := u.NewTimeSeededRand()
 	r.Read(newdata)
 
 	if size+beg > uint64(len(orig)) {
@@ -160,7 +160,7 @@ func TestMultiWrite(t *testing.T) {
 	}
 
 	data := make([]byte, 4000)
-	u.NewFastRand().Read(data)
+	u.NewTimeSeededRand().Read(data)
 
 	for i := 0; i < len(data); i++ {
 		n, err := dagmod.WriteAt(data[i:i+1], uint64(i))
@@ -201,7 +201,7 @@ func TestMultiWriteCoal(t *testing.T) {
 	}
 
 	data := make([]byte, 4000)
-	u.NewFastRand().Read(data)
+	u.NewTimeSeededRand().Read(data)
 
 	for i := 0; i < len(data); i++ {
 		n, err := dagmod.WriteAt(data[:i+1], 0)
diff --git a/util/util.go b/util/util.go
index 2fd58c000..9ba3d62f7 100644
--- a/util/util.go
+++ b/util/util.go
@@ -60,65 +60,55 @@ func NewByteChanReader(in chan []byte) io.Reader {
 	return &byteChanReader{in: in}
 }
 
-func (bcr *byteChanReader) Read(b []byte) (int, error) {
-	if len(bcr.buf) == 0 {
-		data, ok := <-bcr.in
-		if !ok {
-			return 0, io.EOF
+// Read fills output from the remainder of the last chunk and then from chunks
+// received on the channel, blocking until output is full or the channel is
+// closed. Once the channel is closed it returns the bytes copied so far and io.EOF.
+func (bcr *byteChanReader) Read(output []byte) (int, error) {
+	remain := output
+	remainLen := len(output)
+	outputLen := 0
+	more := false
+	next := bcr.buf
+
+	for {
+		n := copy(remain, next)
+		remainLen -= n
+		outputLen += n
+		if remainLen == 0 {
+			bcr.buf = next[n:]
+			return outputLen, nil
 		}
-		bcr.buf = data
-	}
 
-	if len(bcr.buf) >= len(b) {
-		copy(b, bcr.buf)
-		bcr.buf = bcr.buf[len(b):]
-		return len(b), nil
-	}
-
-	copy(b, bcr.buf)
-	b = b[len(bcr.buf):]
-	totread := len(bcr.buf)
-
-	for data := range bcr.in {
-		if len(data) > len(b) {
-			totread += len(b)
-			copy(b, data[:len(b)])
-			bcr.buf = data[len(b):]
-			return totread, nil
-		}
-		copy(b, data)
-		totread += len(data)
-		b = b[len(data):]
-		if len(b) == 0 {
-			return totread, nil
+		remain = remain[n:]
+		next, more = <-bcr.in
+		if !more {
+			// Everything buffered has been delivered; clear it so a later Read
+			// returns (0, io.EOF) instead of replaying the last chunk's tail.
+			bcr.buf = nil
+			return outputLen, io.EOF
 		}
 	}
-	return totread, io.EOF
 }
 
 type randGen struct {
-	src rand.Source
+	rand.Rand
 }
 
-func NewFastRand() io.Reader {
-	return &randGen{rand.NewSource(time.Now().UnixNano())}
+// NewTimeSeededRand returns an io.Reader that yields pseudo-random bytes from a
+// rand.Rand whose source is seeded with the current time in nanoseconds.
+func NewTimeSeededRand() io.Reader {
+	src := rand.NewSource(time.Now().UnixNano())
+	return &randGen{
+		Rand: *rand.New(src),
+	}
 }
 
+// Read fills p with pseudo-random bytes and always returns len(p), nil.
 func (r *randGen) Read(p []byte) (n int, err error) {
-	todo := len(p)
-	offset := 0
-	for {
-		val := int64(r.src.Int63())
-		for i := 0; i < 8; i++ {
-			p[offset] = byte(val & 0xff)
-			todo--
-			if todo == 0 {
-				return len(p), nil
-			}
-			offset++
-			val >>= 8
-		}
+	for i := 0; i < len(p); i++ {
+		p[i] = byte(r.Rand.Intn(256))
 	}
+	return len(p), nil
 }
 
 // GetenvBool is the way to check an env var as a boolean
diff --git a/util/util_test.go b/util/util_test.go
index c2bb8f484..9648c5889 100644
--- a/util/util_test.go
+++ b/util/util_test.go
@@ -2,7 +2,6 @@ package util
 
 import (
 	"bytes"
-	"io/ioutil"
 	"math/rand"
 	"testing"
 
@@ -30,31 +29,38 @@ func TestKey(t *testing.T) {
 }
 
 func TestByteChanReader(t *testing.T) {
-	data := make([]byte, 1024*1024)
-	r := NewFastRand()
-	r.Read(data)
+
+	var data bytes.Buffer
+	var data2 bytes.Buffer
 	dch := make(chan []byte, 8)
+	randr := NewTimeSeededRand()
 
 	go func() {
-		beg := 0
-		for i := 0; i < len(data); {
-			i += rand.Intn(100) + 1
-			if i > len(data) {
-				i = len(data)
-			}
-			dch <- data[beg:i]
-			beg = i
+		defer close(dch)
+		for i := 0; i < rand.Intn(100)+100; i++ {
+			chunk := make([]byte, rand.Intn(100000)+10)
+			randr.Read(chunk)
+			data.Write(chunk)
+			// fmt.Printf("chunk: %6.d %v\n", len(chunk), chunk[:10])
+			dch <- chunk
 		}
-		close(dch)
 	}()
 
 	read := NewByteChanReader(dch)
-	out, err := ioutil.ReadAll(read)
-	if err != nil {
-		t.Fatal(err)
+
+	// read in random, weird sizes to exercise saving buffer.
+	for {
+		buf := make([]byte, rand.Intn(10)*10)
+		n, err := read.Read(buf)
+		data2.Write(buf[:n])
+		// fmt.Printf("read: %6.d\n", n)
+		if err != nil {
+			break
+		}
 	}
 
-	if !bytes.Equal(out, data) {
+	// fmt.Printf("lens: %d == %d\n", len(out), len(data.Bytes()))
+	if !bytes.Equal(data2.Bytes(), data.Bytes()) {
 		t.Fatal("Reader failed to stream correct bytes")
 	}
 }