mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 10:27:46 +08:00
Merge pull request #4508 from ipfs/fix/add-small-files
Don't lock up 256KiB buffers when adding small files
This commit is contained in:
commit
8f17968cd8
@ -131,7 +131,7 @@ func (adder *Adder) SetMfsRoot(r *mfs.Root) {
|
||||
}
|
||||
|
||||
// Constructs a node from reader's data, and adds it. Doesn't pin.
|
||||
func (adder Adder) add(reader io.Reader) (node.Node, error) {
|
||||
func (adder *Adder) add(reader io.Reader) (node.Node, error) {
|
||||
chnk, err := chunk.FromString(reader, adder.Chunker)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@ -11,7 +11,7 @@ import (
|
||||
func FromString(r io.Reader, chunker string) (Splitter, error) {
|
||||
switch {
|
||||
case chunker == "" || chunker == "default":
|
||||
return NewSizeSplitter(r, DefaultBlockSize), nil
|
||||
return DefaultSplitter(r), nil
|
||||
|
||||
case strings.HasPrefix(chunker, "size-"):
|
||||
sizeStr := strings.Split(chunker, "-")[1]
|
||||
|
||||
@ -5,6 +5,7 @@ import (
|
||||
"io"
|
||||
|
||||
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
|
||||
mpool "gx/ipfs/QmWBug6eBS7AxRdCDVuSY5CnSit7cS2XnPFYJWqWDumhCG/go-msgio/mpool"
|
||||
)
|
||||
|
||||
var log = logging.Logger("chunk")
|
||||
@ -51,14 +52,14 @@ func Chan(s Splitter) (<-chan []byte, <-chan error) {
|
||||
|
||||
type sizeSplitterv2 struct {
|
||||
r io.Reader
|
||||
size int64
|
||||
size uint32
|
||||
err error
|
||||
}
|
||||
|
||||
func NewSizeSplitter(r io.Reader, size int64) Splitter {
|
||||
return &sizeSplitterv2{
|
||||
r: r,
|
||||
size: size,
|
||||
size: uint32(size),
|
||||
}
|
||||
}
|
||||
|
||||
@ -66,17 +67,22 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
|
||||
if ss.err != nil {
|
||||
return nil, ss.err
|
||||
}
|
||||
buf := make([]byte, ss.size)
|
||||
n, err := io.ReadFull(ss.r, buf)
|
||||
if err == io.ErrUnexpectedEOF {
|
||||
|
||||
full := mpool.ByteSlicePool.Get(ss.size).([]byte)[:ss.size]
|
||||
n, err := io.ReadFull(ss.r, full)
|
||||
switch err {
|
||||
case io.ErrUnexpectedEOF:
|
||||
ss.err = io.EOF
|
||||
err = nil
|
||||
}
|
||||
if err != nil {
|
||||
small := make([]byte, n)
|
||||
copy(small, full)
|
||||
mpool.ByteSlicePool.Put(ss.size, full)
|
||||
return small, nil
|
||||
case nil:
|
||||
return full, nil
|
||||
default:
|
||||
mpool.ByteSlicePool.Put(ss.size, full)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buf[:n], nil
|
||||
}
|
||||
|
||||
func (ss *sizeSplitterv2) Reader() io.Reader {
|
||||
|
||||
@ -22,6 +22,20 @@ func copyBuf(buf []byte) []byte {
|
||||
return cpy
|
||||
}
|
||||
|
||||
func TestSizeSplitterOverAllocate(t *testing.T) {
|
||||
max := 1000
|
||||
r := bytes.NewReader(randBuf(t, max))
|
||||
chunksize := int64(1024 * 256)
|
||||
splitter := NewSizeSplitter(r, chunksize)
|
||||
chunk, err := splitter.NextBytes()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if cap(chunk) > len(chunk) {
|
||||
t.Fatal("chunk capacity too large")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSizeSplitterIsDeterministic(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.SkipNow()
|
||||
|
||||
@ -34,7 +34,7 @@ func BuildDagFromFile(fpath string, ds dag.DAGService) (node.Node, error) {
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
return BuildDagFromReader(ds, chunk.NewSizeSplitter(f, chunk.DefaultBlockSize))
|
||||
return BuildDagFromReader(ds, chunk.DefaultSplitter(f))
|
||||
}
|
||||
|
||||
func BuildDagFromReader(ds dag.DAGService, spl chunk.Splitter) (node.Node, error) {
|
||||
|
||||
@ -509,6 +509,12 @@
|
||||
"hash": "QmYmhgAcvmDGXct1qBvc1kz9BxQSit1XBrTeiGZp2FvRyn",
|
||||
"name": "go-libp2p-blankhost",
|
||||
"version": "0.2.3"
|
||||
},
|
||||
{
|
||||
"author": "jbenet",
|
||||
"hash": "QmWBug6eBS7AxRdCDVuSY5CnSit7cS2XnPFYJWqWDumhCG",
|
||||
"name": "go-msgio",
|
||||
"version": "0.0.3"
|
||||
}
|
||||
],
|
||||
"gxVersion": "0.10.0",
|
||||
|
||||
Loading…
Reference in New Issue
Block a user