core/commands: Made 'get' copying thread-safe

This commit is contained in:
Matt Bell 2015-01-22 10:05:42 -08:00
parent 36249b9292
commit 276a8d062c

View File

@ -5,6 +5,7 @@ import (
"bytes"
"io"
p "path"
"sync"
cmds "github.com/jbenet/go-ipfs/commands"
core "github.com/jbenet/go-ipfs/core"
@ -42,137 +43,177 @@ To output a TAR archive instead of unpacked files, use '--archive' or '-a'.
return
}
reader, err := get(node, req.Arguments())
reader, err := get(node, req.Arguments()[0])
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
res.SetOutput(reader)
},
// TODO: create a PostRun that splits the archive up into files
}
func get(node *core.IpfsNode, paths []string) (io.Reader, error) {
reader := &getReader{signalChan: make(chan struct{})}
writer := tar.NewWriter(&reader.buf)
func get(node *core.IpfsNode, path string) (io.Reader, error) {
buf := NewBufReadWriter()
go func() {
for _, path := range paths {
_, err := copyFile(node, writer, path, nil, reader.signalChan)
if err != nil {
log.Error(err)
return
}
}
err := writer.Flush()
err := copyFilesAsTar(node, buf, path)
if err != nil {
log.Error(err)
return
}
reader.Close()
reader.Signal()
}()
return reader, nil
return buf, nil
}
func copyFile(node *core.IpfsNode, writer *tar.Writer, path string, dagnode *dag.Node, signal chan struct{}) (int64, error) {
func copyFilesAsTar(node *core.IpfsNode, buf *bufReadWriter, path string) error {
writer := tar.NewWriter(buf)
err := _copyFilesAsTar(node, writer, buf, path, nil)
if err != nil {
return err
}
err = writer.Flush()
if err != nil {
return err
}
buf.Close()
buf.Signal()
return nil
}
func _copyFilesAsTar(node *core.IpfsNode, writer *tar.Writer, buf *bufReadWriter, path string, dagnode *dag.Node) error {
var err error
if dagnode == nil {
dagnode, err = node.Resolver.ResolvePath(path)
if err != nil {
return 0, err
return err
}
}
pb := new(upb.Data)
err = proto.Unmarshal(dagnode.Data, pb)
if err != nil {
return 0, err
return err
}
written := int64(0)
if pb.GetType() == upb.Data_Directory {
buf.mutex.Lock()
err = writer.WriteHeader(&tar.Header{
Name: path,
Typeflag: tar.TypeDir,
Mode: 0777,
// TODO: set mode, dates, etc. when added to unixFS
})
buf.mutex.Unlock()
if err != nil {
return 0, err
return err
}
for _, link := range dagnode.Links {
n, err := copyFile(node, writer, p.Join(path, link.Name), link.Node, signal)
err := _copyFilesAsTar(node, writer, buf, p.Join(path, link.Name), link.Node)
if err != nil {
return 0, err
return err
}
written += n
}
return written, nil
} else {
err = writer.WriteHeader(&tar.Header{
Name: path,
Size: int64(pb.GetFilesize()),
Typeflag: tar.TypeReg,
Mode: 0644,
// TODO: set mode, dates, etc. when added to unixFS
})
if err != nil {
return 0, err
}
reader, err := uio.NewDagReader(dagnode, node.DAG)
if err != nil {
return 0, err
}
buf := make([]byte, 32*1024)
for {
nr, err := reader.Read(buf)
if nr > 0 {
nw, err := writer.Write(buf[:nr])
if err != nil {
return written, err
}
written += int64(nw)
signal <- struct{}{}
}
if err == io.EOF {
break
}
if err != nil {
return written, err
}
}
return written, nil
return nil
}
buf.mutex.Lock()
err = writer.WriteHeader(&tar.Header{
Name: path,
Size: int64(pb.GetFilesize()),
Typeflag: tar.TypeReg,
Mode: 0644,
// TODO: set mode, dates, etc. when added to unixFS
})
buf.mutex.Unlock()
if err != nil {
return err
}
reader, err := uio.NewDagReader(dagnode, node.DAG)
if err != nil {
return err
}
_, err = syncCopy(writer, reader, buf)
if err != nil {
return err
}
return nil
}
type getReader struct {
type bufReadWriter struct {
buf bytes.Buffer
closed bool
signalChan chan struct{}
mutex *sync.Mutex
}
func (i *getReader) Read(p []byte) (int, error) {
func NewBufReadWriter() *bufReadWriter {
return &bufReadWriter{
signalChan: make(chan struct{}),
mutex: &sync.Mutex{},
}
}
func (i *bufReadWriter) Read(p []byte) (int, error) {
<-i.signalChan
i.mutex.Lock()
defer i.mutex.Unlock()
if i.buf.Len() == 0 {
if i.closed {
return 0, io.EOF
}
return 0, nil
}
n, err := i.buf.Read(p)
if err == io.EOF && !i.closed {
if err == io.EOF && !i.closed || i.buf.Len() > 0 {
return n, nil
}
return n, err
}
func (i *getReader) Signal() {
func (i *bufReadWriter) Write(p []byte) (int, error) {
return i.buf.Write(p)
}
func (i *bufReadWriter) Signal() {
i.signalChan <- struct{}{}
}
func (i *getReader) Close() {
func (i *bufReadWriter) Close() error {
i.closed = true
return nil
}
func syncCopy(writer io.Writer, reader io.Reader, buf *bufReadWriter) (int64, error) {
written := int64(0)
copyBuf := make([]byte, 32*1024)
for {
nr, err := reader.Read(copyBuf)
if nr > 0 {
buf.mutex.Lock()
nw, err := writer.Write(copyBuf[:nr])
buf.mutex.Unlock()
if err != nil {
return written, err
}
written += int64(nw)
buf.Signal()
}
if err == io.EOF {
break
}
if err != nil {
return written, err
}
}
return written, nil
}