diff --git a/core/commands/get.go b/core/commands/get.go
index 3e6205f9f..9255bc263 100644
--- a/core/commands/get.go
+++ b/core/commands/get.go
@@ -1,25 +1,18 @@
 package commands
 
 import (
-	"archive/tar"
-	"bytes"
 	"compress/gzip"
 	"errors"
 	"fmt"
 	"io"
 	"os"
-	p "path"
-	fp "path/filepath"
 	"strings"
-	"sync"
 
 	cmds "github.com/jbenet/go-ipfs/commands"
 	core "github.com/jbenet/go-ipfs/core"
-	dag "github.com/jbenet/go-ipfs/merkledag"
-	uio "github.com/jbenet/go-ipfs/unixfs/io"
-	upb "github.com/jbenet/go-ipfs/unixfs/pb"
+	tar "github.com/jbenet/go-ipfs/thirdparty/tar"
+	utar "github.com/jbenet/go-ipfs/unixfs/tar"
 
-	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
 	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
 )
@@ -64,22 +57,19 @@ may also specify the level of compression by specifying '-l=<1-9>'.
 			return
 		}
 
-		compress, _, _ := req.Option("compress").Bool()
-		compressionLevel, found, _ := req.Option("compression-level").Int()
-		if !found {
-			if compress {
-				compressionLevel = gzip.DefaultCompression
-			} else {
-				compressionLevel = gzip.NoCompression
-			}
-		} else {
-			if compressionLevel < 1 || compressionLevel > 9 {
-				res.SetError(ErrInvalidCompressionLevel, cmds.ErrClient)
-				return
-			}
+		cmprs, _, _ := req.Option("compress").Bool()
+		cmplvl, cmplvlFound, _ := req.Option("compression-level").Int()
+		switch {
+		case !cmprs:
+			cmplvl = gzip.NoCompression
+		case cmprs && !cmplvlFound:
+			cmplvl = gzip.DefaultCompression
+		case cmprs && cmplvlFound && (cmplvl < 1 || cmplvl > 9):
+			res.SetError(ErrInvalidCompressionLevel, cmds.ErrClient)
+			return
 		}
 
-		reader, err := get(node, req.Arguments()[0], compressionLevel)
+		reader, err := get(node, req.Arguments()[0], cmplvl)
 		if err != nil {
 			res.SetError(err, cmds.ErrNormal)
 			return
@@ -87,7 +77,7 @@ may also specify the level of compression by specifying '-l=<1-9>'.
 		res.SetOutput(reader)
 	},
 	PostRun: func(req cmds.Request, res cmds.Response) {
-		reader := res.Output().(io.Reader)
+		outReader := res.Output().(io.Reader)
 		res.SetOutput(nil)
 
 		outPath, _, _ := req.Option("output").String()
@@ -95,15 +85,17 @@ may also specify the level of compression by specifying '-l=<1-9>'.
 			outPath = req.Arguments()[0]
 		}
 
-		compress, _, _ := req.Option("compress").Bool()
-		compressionLevel, found, _ := req.Option("compression-level").Int()
-		compress = (compress && (compressionLevel > 0 || !found)) || compressionLevel > 0
+		cmprs, _, _ := req.Option("compress").Bool()
+		cmplvl, _, _ := req.Option("compression-level").Int()
+		if !cmprs {
+			cmprs = cmplvl > 0
+		}
 
 		if archive, _, _ := req.Option("archive").Bool(); archive {
 			if !strings.HasSuffix(outPath, ".tar") {
 				outPath += ".tar"
 			}
-			if compress {
+			if cmprs {
 				outPath += ".gz"
 			}
 			fmt.Printf("Saving archive to %s\n", outPath)
@@ -117,7 +109,7 @@ may also specify the level of compression by specifying '-l=<1-9>'.
 
 			bar := pb.New(0).SetUnits(pb.U_BYTES)
 			bar.Output = os.Stderr
-			pbReader := bar.NewProxyReader(reader)
+			pbReader := bar.NewProxyReader(outReader)
 			bar.Start()
 			defer bar.Finish()
 
@@ -136,291 +128,40 @@
 		bar := pb.New(0).SetUnits(pb.U_BYTES)
 		bar.Output = os.Stderr
 
-		preexisting := true
-		pathIsDir := false
-		if stat, err := os.Stat(outPath); err != nil && os.IsNotExist(err) {
-			preexisting = false
-		} else if err != nil {
-			res.SetError(err, cmds.ErrNormal)
-			return
-		} else if stat.IsDir() {
-			pathIsDir = true
-		}
-
-		var tarReader *tar.Reader
-		if compress {
-			gzipReader, err := gzip.NewReader(reader)
+		// wrap the reader with the progress bar proxy reader;
+		// if the output is compressed, also wrap it in a gzip.Reader
+		var reader io.Reader
+		if cmprs {
+			gzipReader, err := gzip.NewReader(outReader)
 			if err != nil {
 				res.SetError(err, cmds.ErrNormal)
 				return
 			}
 			defer gzipReader.Close()
 
-			pbReader := bar.NewProxyReader(gzipReader)
-			tarReader = tar.NewReader(pbReader)
+			reader = bar.NewProxyReader(gzipReader)
 		} else {
-			pbReader := bar.NewProxyReader(reader)
-			tarReader = tar.NewReader(pbReader)
+			reader = bar.NewProxyReader(outReader)
 		}
 
 		bar.Start()
 		defer bar.Finish()
 
-		for i := 0; ; i++ {
-			header, err := tarReader.Next()
-			if err != nil && err != io.EOF {
-				res.SetError(err, cmds.ErrNormal)
-				return
-			}
-			if header == nil || err == io.EOF {
-				break
-			}
-
-			if header.Typeflag == tar.TypeDir {
-				pathElements := strings.Split(header.Name, "/")
-				if !preexisting {
-					pathElements = pathElements[1:]
-				}
-				path := fp.Join(pathElements...)
-				path = fp.Join(outPath, path)
-				if i == 0 {
-					outPath = path
-				}
-
-				err = os.MkdirAll(path, 0755)
-				if err != nil {
-					res.SetError(err, cmds.ErrNormal)
-					return
-				}
-				continue
-			}
-
-			var path string
-			if i == 0 {
-				if preexisting {
-					if !pathIsDir {
-						res.SetError(os.ErrExist, cmds.ErrNormal)
-						return
-					}
-					path = fp.Join(outPath, header.Name)
-				} else {
-					path = outPath
-				}
-			} else {
-				pathElements := strings.Split(header.Name, "/")[1:]
-				path = fp.Join(pathElements...)
-				path = fp.Join(outPath, path)
-			}
-
-			file, err := os.Create(path)
-			if err != nil {
-				res.SetError(err, cmds.ErrNormal)
-				return
-			}
-
-			_, err = io.Copy(file, tarReader)
-			if err != nil {
-				res.SetError(err, cmds.ErrNormal)
-				return
-			}
-
-			err = file.Close()
-			if err != nil {
-				res.SetError(err, cmds.ErrNormal)
-				return
-			}
+		extractor := &tar.Extractor{Path: outPath}
+		err := extractor.Extract(reader)
+		if err != nil {
+			res.SetError(err, cmds.ErrNormal)
 		}
 	},
 }
 
 func getCheckOptions(req cmds.Request) error {
-	compressionLevel, found, _ := req.Option("compression-level").Int()
-	if found && (compressionLevel < 1 || compressionLevel > 9) {
+	cmplvl, found, _ := req.Option("compression-level").Int()
+	if found && (cmplvl < 1 || cmplvl > 9) {
 		return ErrInvalidCompressionLevel
 	}
 	return nil
 }
 
 func get(node *core.IpfsNode, path string, compression int) (io.Reader, error) {
-	buf := NewBufReadWriter()
-
-	go func() {
-		err := copyFilesAsTar(node, buf, path, compression)
-		if err != nil {
-			log.Error(err)
-			return
-		}
-	}()
-
-	return buf, nil
-}
-
-func copyFilesAsTar(node *core.IpfsNode, buf *bufReadWriter, path string, compression int) error {
-	var gzipWriter *gzip.Writer
-	var writer *tar.Writer
-	var err error
-	if compression != gzip.NoCompression {
-		gzipWriter, err = gzip.NewWriterLevel(buf, compression)
-		if err != nil {
-			return err
-		}
-		writer = tar.NewWriter(gzipWriter)
-	} else {
-		writer = tar.NewWriter(buf)
-	}
-
-	err = _copyFilesAsTar(node, writer, buf, path, nil)
-	if err != nil {
-		return err
-	}
-
-	buf.mutex.Lock()
-	err = writer.Close()
-	if err != nil {
-		return err
-	}
-	if gzipWriter != nil {
-		err = gzipWriter.Close()
-		if err != nil {
-			return err
-		}
-	}
-	buf.Close()
-	buf.mutex.Unlock()
-	buf.Signal()
-	return nil
-}
-
-func _copyFilesAsTar(node *core.IpfsNode, writer *tar.Writer, buf *bufReadWriter, path string, dagnode *dag.Node) error {
-	var err error
-	if dagnode == nil {
-		dagnode, err = node.Resolver.ResolvePath(path)
-		if err != nil {
-			return err
-		}
-	}
-
-	pb := new(upb.Data)
-	err = proto.Unmarshal(dagnode.Data, pb)
-	if err != nil {
-		return err
-	}
-
-	if pb.GetType() == upb.Data_Directory {
-		buf.mutex.Lock()
-		err = writer.WriteHeader(&tar.Header{
-			Name:     path,
-			Typeflag: tar.TypeDir,
-			Mode:     0777,
-			// TODO: set mode, dates, etc. when added to unixFS
-		})
-		buf.mutex.Unlock()
-		if err != nil {
-			return err
-		}
-
-		for _, link := range dagnode.Links {
-			err := _copyFilesAsTar(node, writer, buf, p.Join(path, link.Name), link.Node)
-			if err != nil {
-				return err
-			}
-		}
-
-		return nil
-	}
-
-	buf.mutex.Lock()
-	err = writer.WriteHeader(&tar.Header{
-		Name:     path,
-		Size:     int64(pb.GetFilesize()),
-		Typeflag: tar.TypeReg,
-		Mode:     0644,
-		// TODO: set mode, dates, etc. when added to unixFS
-	})
-	buf.mutex.Unlock()
-	if err != nil {
-		return err
-	}
-
-	reader, err := uio.NewDagReader(dagnode, node.DAG)
-	if err != nil {
-		return err
-	}
-
-	_, err = syncCopy(writer, reader, buf)
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
-
-type bufReadWriter struct {
-	buf        bytes.Buffer
-	closed     bool
-	signalChan chan struct{}
-	mutex      *sync.Mutex
-}
-
-func NewBufReadWriter() *bufReadWriter {
-	return &bufReadWriter{
-		signalChan: make(chan struct{}),
-		mutex:      &sync.Mutex{},
-	}
-}
-
-func (i *bufReadWriter) Read(p []byte) (int, error) {
-	<-i.signalChan
-	i.mutex.Lock()
-	defer i.mutex.Unlock()
-
-	if i.buf.Len() == 0 {
-		if i.closed {
-			return 0, io.EOF
-		}
-		return 0, nil
-	}
-
-	n, err := i.buf.Read(p)
-	if err == io.EOF && !i.closed || i.buf.Len() > 0 {
-		return n, nil
-	}
-	return n, err
-}
-
-func (i *bufReadWriter) Write(p []byte) (int, error) {
-	return i.buf.Write(p)
-}
-
-func (i *bufReadWriter) Signal() {
-	i.signalChan <- struct{}{}
-}
-
-func (i *bufReadWriter) Close() error {
-	i.closed = true
-	return nil
-}
-
-func syncCopy(writer io.Writer, reader io.Reader, buf *bufReadWriter) (int64, error) {
-	written := int64(0)
-	copyBuf := make([]byte, 32*1024)
-	for {
-		nr, err := reader.Read(copyBuf)
-		if nr > 0 {
-			buf.mutex.Lock()
-			nw, err := writer.Write(copyBuf[:nr])
-			buf.mutex.Unlock()
-			if err != nil {
-				return written, err
-			}
-			written += int64(nw)
-			buf.Signal()
-		}
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			return written, err
-		}
-	}
-	return written, nil
+	return utar.NewReader(path, node.DAG, node.Resolver, compression)
 }
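
The net effect of the get.go changes is that the command now only wires the two new packages together: `get` produces a tar stream via unixfs/tar, and PostRun unpacks it via thirdparty/tar. A minimal sketch of that composition outside the commands machinery, using only APIs introduced in this diff (the `fetchToDisk` wrapper itself is hypothetical, and error handling is reduced to a single check per step):

    package example

    import (
    	"compress/gzip"

    	core "github.com/jbenet/go-ipfs/core"
    	tar "github.com/jbenet/go-ipfs/thirdparty/tar"
    	utar "github.com/jbenet/go-ipfs/unixfs/tar"
    )

    // fetchToDisk streams the DAG behind ipfsPath as an uncompressed tar
    // archive and unpacks it at outPath on the local filesystem.
    func fetchToDisk(node *core.IpfsNode, ipfsPath, outPath string) error {
    	reader, err := utar.NewReader(ipfsPath, node.DAG, node.Resolver, gzip.NoCompression)
    	if err != nil {
    		return err
    	}

    	extractor := &tar.Extractor{Path: outPath}
    	return extractor.Extract(reader)
    }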
diff --git a/thirdparty/tar/extractor.go b/thirdparty/tar/extractor.go
new file mode 100644
index 000000000..0da0a772c
--- /dev/null
+++ b/thirdparty/tar/extractor.go
@@ -0,0 +1,108 @@
+package tar
+
+import (
+	"archive/tar"
+	"io"
+	"os"
+	fp "path/filepath"
+	"strings"
+)
+
+type Extractor struct {
+	Path string
+}
+
+func (te *Extractor) Extract(reader io.Reader) error {
+	tarReader := tar.NewReader(reader)
+
+	// Check if the output path already exists, so we know whether we should
+	// create our output with that name, or if we should put the output inside
+	// a preexisting directory
+	exists := true
+	pathIsDir := false
+	if stat, err := os.Stat(te.Path); err != nil && os.IsNotExist(err) {
+		exists = false
+	} else if err != nil {
+		return err
+	} else if stat.IsDir() {
+		pathIsDir = true
+	}
+
+	// files come recursively in order (i == 0 is the root directory)
+	for i := 0; ; i++ {
+		header, err := tarReader.Next()
+		if err != nil && err != io.EOF {
+			return err
+		}
+		if header == nil || err == io.EOF {
+			break
+		}
+
+		if header.Typeflag == tar.TypeDir {
+			err = te.extractDir(header, i, exists)
+			if err != nil {
+				return err
+			}
+			continue
+		}
+
+		err = te.extractFile(header, tarReader, i, exists, pathIsDir)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (te *Extractor) extractDir(h *tar.Header, depth int, exists bool) error {
+	pathElements := strings.Split(h.Name, "/")
+	if !exists {
+		pathElements = pathElements[1:]
+	}
+	path := fp.Join(pathElements...)
+	path = fp.Join(te.Path, path)
+	if depth == 0 {
+		// if this is the root directory, use it as the output path for the remaining files
+		te.Path = path
+	}
+
+	err := os.MkdirAll(path, 0755)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (te *Extractor) extractFile(h *tar.Header, r *tar.Reader, depth int, exists bool, pathIsDir bool) error {
+	var path string
+	if depth == 0 {
+		// if depth is 0, this is the only file (we aren't 'ipfs get'ing a directory)
+		switch {
+		case exists && !pathIsDir:
+			return os.ErrExist
+		case exists && pathIsDir:
+			path = fp.Join(te.Path, h.Name)
+		case !exists:
+			path = te.Path
+		}
+	} else {
+		// we are outputting a directory; this file is inside of it
+		pathElements := strings.Split(h.Name, "/")[1:]
+		path = fp.Join(pathElements...)
+		path = fp.Join(te.Path, path)
+	}
+
+	file, err := os.Create(path)
+	if err != nil {
+		return err
+	}
+	defer file.Close()
+
+	_, err = io.Copy(file, r)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
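
For reference, a small usage sketch of the new Extractor on its own, assuming a plain (uncompressed) tarball on disk; the file and directory names here are hypothetical:

    package main

    import (
    	"log"
    	"os"

    	tar "github.com/jbenet/go-ipfs/thirdparty/tar"
    )

    func main() {
    	f, err := os.Open("archive.tar") // hypothetical input file
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer f.Close()

    	// If "out" already exists as a directory, the archive root is created
    	// inside it; if "out" does not exist, the archive root becomes "out".
    	te := &tar.Extractor{Path: "out"}
    	if err := te.Extract(f); err != nil {
    		log.Fatal(err)
    	}
    }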
diff --git a/unixfs/tar/reader.go b/unixfs/tar/reader.go
new file mode 100644
index 000000000..725e6867f
--- /dev/null
+++ b/unixfs/tar/reader.go
@@ -0,0 +1,200 @@
+package tar
+
+import (
+	"archive/tar"
+	"bytes"
+	"compress/gzip"
+	"io"
+	p "path"
+
+	mdag "github.com/jbenet/go-ipfs/merkledag"
+	path "github.com/jbenet/go-ipfs/path"
+	uio "github.com/jbenet/go-ipfs/unixfs/io"
+	upb "github.com/jbenet/go-ipfs/unixfs/pb"
+
+	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
+)
+
+type Reader struct {
+	buf        bytes.Buffer
+	closed     bool
+	signalChan chan struct{}
+	dag        mdag.DAGService
+	resolver   *path.Resolver
+	writer     *tar.Writer
+	gzipWriter *gzip.Writer
+	err        error
+}
+
+func NewReader(path string, dag mdag.DAGService, resolver *path.Resolver, compression int) (*Reader, error) {
+	reader := &Reader{
+		signalChan: make(chan struct{}),
+		dag:        dag,
+		resolver:   resolver,
+	}
+
+	var err error
+	if compression != gzip.NoCompression {
+		reader.gzipWriter, err = gzip.NewWriterLevel(&reader.buf, compression)
+		if err != nil {
+			return nil, err
+		}
+		reader.writer = tar.NewWriter(reader.gzipWriter)
+	} else {
+		reader.writer = tar.NewWriter(&reader.buf)
+	}
+
+	dagnode, err := resolver.ResolvePath(path)
+	if err != nil {
+		return nil, err
+	}
+
+	// writeToBuf will write the data to the buffer, and will signal when there
+	// is new data to read
+	go reader.writeToBuf(dagnode, path, 0)
+
+	return reader, nil
+}
+
+func (i *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
+	pb := new(upb.Data)
+	err := proto.Unmarshal(dagnode.Data, pb)
+	if err != nil {
+		i.emitError(err)
+		return
+	}
+
+	if depth == 0 {
+		defer i.close()
+	}
+
+	if pb.GetType() == upb.Data_Directory {
+		err = i.writer.WriteHeader(&tar.Header{
+			Name:     path,
+			Typeflag: tar.TypeDir,
+			Mode:     0777,
+			// TODO: set mode, dates, etc. when added to unixFS
+		})
+		if err != nil {
+			i.emitError(err)
+			return
+		}
+
+		for _, link := range dagnode.Links {
+			childNode, err := link.GetNode(i.dag)
+			if err != nil {
+				i.emitError(err)
+				return
+			}
+			i.writeToBuf(childNode, p.Join(path, link.Name), depth+1)
+		}
+		return
+	}
+
+	err = i.writer.WriteHeader(&tar.Header{
+		Name:     path,
+		Size:     int64(pb.GetFilesize()),
+		Typeflag: tar.TypeReg,
+		Mode:     0644,
+		// TODO: set mode, dates, etc. when added to unixFS
+	})
+	if err != nil {
+		i.emitError(err)
+		return
+	}
+
+	reader, err := uio.NewDagReader(dagnode, i.dag)
+	if err != nil {
+		i.emitError(err)
+		return
+	}
+
+	err = i.syncCopy(reader)
+	if err != nil {
+		i.emitError(err)
+		return
+	}
+}
+
+func (i *Reader) Read(p []byte) (int, error) {
+	// wait for the goroutine that is writing data to the buffer to tell us
+	// there is something to read
+	if !i.closed {
+		<-i.signalChan
+	}
+
+	if i.err != nil {
+		return 0, i.err
+	}
+
+	if !i.closed {
+		defer i.signal()
+	}
+
+	if i.buf.Len() == 0 {
+		if i.closed {
+			return 0, io.EOF
+		}
+		return 0, nil
+	}
+
+	n, err := i.buf.Read(p)
+	if err == io.EOF && !i.closed || i.buf.Len() > 0 {
+		return n, nil
+	}
+
+	return n, err
+}
+
+func (i *Reader) signal() {
+	i.signalChan <- struct{}{}
+}
+
+func (i *Reader) emitError(err error) {
+	i.err = err
+	i.signal()
+}
+
+func (i *Reader) close() {
+	i.closed = true
+	i.flush()
+}
+
+func (i *Reader) flush() {
+	defer i.signal()
+
+	err := i.writer.Close()
+	if err != nil {
+		i.emitError(err)
+		return
+	}
+
+	if i.gzipWriter != nil {
+		err = i.gzipWriter.Close()
+		if err != nil {
+			i.emitError(err)
+			return
+		}
+	}
+}
+
+func (i *Reader) syncCopy(reader io.Reader) error {
+	buf := make([]byte, 32*1024)
+	for {
+		nr, err := reader.Read(buf)
+		if nr > 0 {
+			_, err := i.writer.Write(buf[:nr])
+			if err != nil {
+				return err
+			}
+			i.signal()
+			// wait for Read to finish reading
+			<-i.signalChan
+		}
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
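
The trickiest part of reader.go is the signalChan handshake: the writeToBuf goroutine pushes a chunk into the buffer, signals Read, and then blocks until Read signals back, so the two sides alternate strictly over the shared bytes.Buffer. A stripped-down model of that pattern (not the Reader type itself; all names here are hypothetical):

    package main

    import (
    	"bytes"
    	"fmt"
    )

    type handshake struct {
    	buf        bytes.Buffer
    	signalChan chan struct{}
    }

    func main() {
    	h := &handshake{signalChan: make(chan struct{})}

    	// writer side: buffer a chunk, signal the reader, wait for the ack
    	go func() {
    		for _, chunk := range []string{"hello ", "world"} {
    			h.buf.WriteString(chunk)
    			h.signalChan <- struct{}{} // "there is data to read"
    			<-h.signalChan             // "done reading, keep going"
    		}
    		close(h.signalChan) // no more data
    	}()

    	// reader side: drain the buffer after each signal, then ack;
    	// the channel sends order the buffer accesses, so no mutex is needed
    	var out bytes.Buffer
    	for range h.signalChan {
    		out.ReadFrom(&h.buf)
    		h.signalChan <- struct{}{}
    	}
    	fmt.Println(out.String()) // prints "hello world"
    }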