From 276a8d062cd76fc8875af2fd849312a584e7e187 Mon Sep 17 00:00:00 2001 From: Matt Bell Date: Thu, 22 Jan 2015 10:05:42 -0800 Subject: [PATCH] core/commands: Made 'get' copying thread-safe --- core/commands/get.go | 179 ++++++++++++++++++++++++++----------------- 1 file changed, 110 insertions(+), 69 deletions(-) diff --git a/core/commands/get.go b/core/commands/get.go index 445b479c0..9bbab082e 100644 --- a/core/commands/get.go +++ b/core/commands/get.go @@ -5,6 +5,7 @@ import ( "bytes" "io" p "path" + "sync" cmds "github.com/jbenet/go-ipfs/commands" core "github.com/jbenet/go-ipfs/core" @@ -42,137 +43,177 @@ To output a TAR archive instead of unpacked files, use '--archive' or '-a'. return } - reader, err := get(node, req.Arguments()) + reader, err := get(node, req.Arguments()[0]) if err != nil { res.SetError(err, cmds.ErrNormal) return } res.SetOutput(reader) }, - - // TODO: create a PostRun that splits the archive up into files } -func get(node *core.IpfsNode, paths []string) (io.Reader, error) { - reader := &getReader{signalChan: make(chan struct{})} - writer := tar.NewWriter(&reader.buf) +func get(node *core.IpfsNode, path string) (io.Reader, error) { + buf := NewBufReadWriter() go func() { - for _, path := range paths { - _, err := copyFile(node, writer, path, nil, reader.signalChan) - if err != nil { - log.Error(err) - return - } - } - - err := writer.Flush() + err := copyFilesAsTar(node, buf, path) if err != nil { log.Error(err) return } - - reader.Close() - reader.Signal() }() - return reader, nil + return buf, nil } -func copyFile(node *core.IpfsNode, writer *tar.Writer, path string, dagnode *dag.Node, signal chan struct{}) (int64, error) { +func copyFilesAsTar(node *core.IpfsNode, buf *bufReadWriter, path string) error { + writer := tar.NewWriter(buf) + + err := _copyFilesAsTar(node, writer, buf, path, nil) + if err != nil { + return err + } + + err = writer.Flush() + if err != nil { + return err + } + buf.Close() + buf.Signal() + return nil +} + +func _copyFilesAsTar(node *core.IpfsNode, writer *tar.Writer, buf *bufReadWriter, path string, dagnode *dag.Node) error { var err error if dagnode == nil { dagnode, err = node.Resolver.ResolvePath(path) if err != nil { - return 0, err + return err } } pb := new(upb.Data) err = proto.Unmarshal(dagnode.Data, pb) if err != nil { - return 0, err + return err } - written := int64(0) if pb.GetType() == upb.Data_Directory { + buf.mutex.Lock() err = writer.WriteHeader(&tar.Header{ Name: path, Typeflag: tar.TypeDir, Mode: 0777, // TODO: set mode, dates, etc. when added to unixFS }) + buf.mutex.Unlock() if err != nil { - return 0, err + return err } for _, link := range dagnode.Links { - n, err := copyFile(node, writer, p.Join(path, link.Name), link.Node, signal) + err := _copyFilesAsTar(node, writer, buf, p.Join(path, link.Name), link.Node) if err != nil { - return 0, err + return err } - written += n - } - return written, nil - - } else { - err = writer.WriteHeader(&tar.Header{ - Name: path, - Size: int64(pb.GetFilesize()), - Typeflag: tar.TypeReg, - Mode: 0644, - // TODO: set mode, dates, etc. when added to unixFS - }) - if err != nil { - return 0, err } - reader, err := uio.NewDagReader(dagnode, node.DAG) - if err != nil { - return 0, err - } - - buf := make([]byte, 32*1024) - for { - nr, err := reader.Read(buf) - if nr > 0 { - nw, err := writer.Write(buf[:nr]) - if err != nil { - return written, err - } - written += int64(nw) - signal <- struct{}{} - } - if err == io.EOF { - break - } - if err != nil { - return written, err - } - } - return written, nil + return nil } + + buf.mutex.Lock() + err = writer.WriteHeader(&tar.Header{ + Name: path, + Size: int64(pb.GetFilesize()), + Typeflag: tar.TypeReg, + Mode: 0644, + // TODO: set mode, dates, etc. when added to unixFS + }) + buf.mutex.Unlock() + if err != nil { + return err + } + + reader, err := uio.NewDagReader(dagnode, node.DAG) + if err != nil { + return err + } + + _, err = syncCopy(writer, reader, buf) + if err != nil { + return err + } + + return nil } -type getReader struct { +type bufReadWriter struct { buf bytes.Buffer closed bool signalChan chan struct{} + mutex *sync.Mutex } -func (i *getReader) Read(p []byte) (int, error) { +func NewBufReadWriter() *bufReadWriter { + return &bufReadWriter{ + signalChan: make(chan struct{}), + mutex: &sync.Mutex{}, + } +} + +func (i *bufReadWriter) Read(p []byte) (int, error) { <-i.signalChan + i.mutex.Lock() + defer i.mutex.Unlock() + + if i.buf.Len() == 0 { + if i.closed { + return 0, io.EOF + } + return 0, nil + } + n, err := i.buf.Read(p) - if err == io.EOF && !i.closed { + if err == io.EOF && !i.closed || i.buf.Len() > 0 { return n, nil } return n, err } -func (i *getReader) Signal() { +func (i *bufReadWriter) Write(p []byte) (int, error) { + return i.buf.Write(p) +} + +func (i *bufReadWriter) Signal() { i.signalChan <- struct{}{} } -func (i *getReader) Close() { +func (i *bufReadWriter) Close() error { i.closed = true + return nil +} + +func syncCopy(writer io.Writer, reader io.Reader, buf *bufReadWriter) (int64, error) { + written := int64(0) + copyBuf := make([]byte, 32*1024) + for { + nr, err := reader.Read(copyBuf) + if nr > 0 { + buf.mutex.Lock() + nw, err := writer.Write(copyBuf[:nr]) + buf.mutex.Unlock() + if err != nil { + return written, err + } + written += int64(nw) + buf.Signal() + } + if err == io.EOF { + break + } + if err != nil { + return written, err + } + } + return written, nil }