Extracted TAR archive building/reading code out of 'ipfs get'

Matt Bell 2015-01-24 00:26:42 -08:00
parent 5efd99c1fe
commit fe48ae2dd6
3 changed files with 343 additions and 294 deletions


@@ -1,25 +1,18 @@
package commands
import (
"archive/tar"
"bytes"
"compress/gzip"
"errors"
"fmt"
"io"
"os"
p "path"
fp "path/filepath"
"strings"
"sync"
cmds "github.com/jbenet/go-ipfs/commands"
core "github.com/jbenet/go-ipfs/core"
dag "github.com/jbenet/go-ipfs/merkledag"
uio "github.com/jbenet/go-ipfs/unixfs/io"
upb "github.com/jbenet/go-ipfs/unixfs/pb"
tar "github.com/jbenet/go-ipfs/thirdparty/tar"
utar "github.com/jbenet/go-ipfs/unixfs/tar"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
)
@@ -64,22 +57,19 @@ may also specify the level of compression by specifying '-l=<1-9>'.
return
}
compress, _, _ := req.Option("compress").Bool()
compressionLevel, found, _ := req.Option("compression-level").Int()
if !found {
if compress {
compressionLevel = gzip.DefaultCompression
} else {
compressionLevel = gzip.NoCompression
}
} else {
if compressionLevel < 1 || compressionLevel > 9 {
res.SetError(ErrInvalidCompressionLevel, cmds.ErrClient)
return
}
cmprs, _, _ := req.Option("compress").Bool()
cmplvl, cmplvlFound, _ := req.Option("compression-level").Int()
switch {
case !cmprs:
cmplvl = gzip.NoCompression
case cmprs && !cmplvlFound:
cmplvl = gzip.DefaultCompression
case cmprs && cmplvlFound && (cmplvl < 1 || cmplvl > 9):
res.SetError(ErrInvalidCompressionLevel, cmds.ErrClient)
return
}
reader, err := get(node, req.Arguments()[0], compressionLevel)
reader, err := get(node, req.Arguments()[0], cmplvl)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
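The rewritten switch makes the flag handling explicit: omitting -C pins gzip.NoCompression, -C without -l falls back to gzip.DefaultCompression, and an explicit level outside 1-9 is rejected. A minimal standalone sketch of the same decision, with pickLevel and errInvalidLevel as illustrative names:

package example

import (
	"compress/gzip"
	"errors"
)

var errInvalidLevel = errors.New("compression level must be between 1 and 9")

// pickLevel mirrors the command's switch: it maps the -C flag and an
// optional user-supplied -l value onto a gzip compression level.
func pickLevel(compress, levelSet bool, level int) (int, error) {
	switch {
	case !compress:
		return gzip.NoCompression, nil // 0: plain tar, no gzip layer
	case !levelSet:
		return gzip.DefaultCompression, nil // -1: let gzip decide
	case level < 1 || level > 9:
		return 0, errInvalidLevel
	default:
		return level, nil
	}
}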
@@ -87,7 +77,7 @@ may also specify the level of compression by specifying '-l=<1-9>'.
res.SetOutput(reader)
},
PostRun: func(req cmds.Request, res cmds.Response) {
reader := res.Output().(io.Reader)
outReader := res.Output().(io.Reader)
res.SetOutput(nil)
outPath, _, _ := req.Option("output").String()
@@ -95,15 +85,17 @@ may also specify the level of compression by specifying '-l=<1-9>'.
outPath = req.Arguments()[0]
}
compress, _, _ := req.Option("compress").Bool()
compressionLevel, found, _ := req.Option("compression-level").Int()
compress = (compress && (compressionLevel > 0 || !found)) || compressionLevel > 0
cmprs, _, _ := req.Option("compress").Bool()
cmplvl, _, _ := req.Option("compression-level").Int()
if !cmprs {
cmprs = cmplvl > 0
}
if archive, _, _ := req.Option("archive").Bool(); archive {
if !strings.HasSuffix(outPath, ".tar") {
outPath += ".tar"
}
if compress {
if cmprs {
outPath += ".gz"
}
fmt.Printf("Saving archive to %s\n", outPath)
@@ -117,7 +109,7 @@ may also specify the level of compression by specifying '-l=<1-9>'.
bar := pb.New(0).SetUnits(pb.U_BYTES)
bar.Output = os.Stderr
pbReader := bar.NewProxyReader(reader)
pbReader := bar.NewProxyReader(outReader)
bar.Start()
defer bar.Finish()
@@ -136,291 +128,40 @@ may also specify the level of compression by specifying '-l=<1-9>'.
bar := pb.New(0).SetUnits(pb.U_BYTES)
bar.Output = os.Stderr
preexisting := true
pathIsDir := false
if stat, err := os.Stat(outPath); err != nil && os.IsNotExist(err) {
preexisting = false
} else if err != nil {
res.SetError(err, cmds.ErrNormal)
return
} else if stat.IsDir() {
pathIsDir = true
}
var tarReader *tar.Reader
if compress {
gzipReader, err := gzip.NewReader(reader)
// wrap the reader with the progress bar proxy reader
// if the output is compressed, also wrap it in a gzip.Reader
var reader io.Reader
if cmprs {
gzipReader, err := gzip.NewReader(outReader)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
defer gzipReader.Close()
pbReader := bar.NewProxyReader(gzipReader)
tarReader = tar.NewReader(pbReader)
reader = bar.NewProxyReader(gzipReader)
} else {
pbReader := bar.NewProxyReader(reader)
tarReader = tar.NewReader(pbReader)
reader = bar.NewProxyReader(outReader)
}
bar.Start()
defer bar.Finish()
for i := 0; ; i++ {
header, err := tarReader.Next()
if err != nil && err != io.EOF {
res.SetError(err, cmds.ErrNormal)
return
}
if header == nil || err == io.EOF {
break
}
if header.Typeflag == tar.TypeDir {
pathElements := strings.Split(header.Name, "/")
if !preexisting {
pathElements = pathElements[1:]
}
path := fp.Join(pathElements...)
path = fp.Join(outPath, path)
if i == 0 {
outPath = path
}
err = os.MkdirAll(path, 0755)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
continue
}
var path string
if i == 0 {
if preexisting {
if !pathIsDir {
res.SetError(os.ErrExist, cmds.ErrNormal)
return
}
path = fp.Join(outPath, header.Name)
} else {
path = outPath
}
} else {
pathElements := strings.Split(header.Name, "/")[1:]
path = fp.Join(pathElements...)
path = fp.Join(outPath, path)
}
file, err := os.Create(path)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
_, err = io.Copy(file, tarReader)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
err = file.Close()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
extractor := &tar.Extractor{Path: outPath}
err := extractor.Extract(reader)
if err != nil {
res.SetError(err, cmds.ErrNormal)
}
},
}
func getCheckOptions(req cmds.Request) error {
compressionLevel, found, _ := req.Option("compression-level").Int()
if found && (compressionLevel < 1 || compressionLevel > 9) {
cmplvl, found, _ := req.Option("compression-level").Int()
if found && (cmplvl < 1 || cmplvl > 9) {
return ErrInvalidCompressionLevel
}
return nil
}
func get(node *core.IpfsNode, path string, compression int) (io.Reader, error) {
buf := NewBufReadWriter()
go func() {
err := copyFilesAsTar(node, buf, path, compression)
if err != nil {
log.Error(err)
return
}
}()
return buf, nil
}
func copyFilesAsTar(node *core.IpfsNode, buf *bufReadWriter, path string, compression int) error {
var gzipWriter *gzip.Writer
var writer *tar.Writer
var err error
if compression != gzip.NoCompression {
gzipWriter, err = gzip.NewWriterLevel(buf, compression)
if err != nil {
return err
}
writer = tar.NewWriter(gzipWriter)
} else {
writer = tar.NewWriter(buf)
}
err = _copyFilesAsTar(node, writer, buf, path, nil)
if err != nil {
return err
}
buf.mutex.Lock()
err = writer.Close()
if err != nil {
return err
}
if gzipWriter != nil {
err = gzipWriter.Close()
if err != nil {
return err
}
}
buf.Close()
buf.mutex.Unlock()
buf.Signal()
return nil
}
func _copyFilesAsTar(node *core.IpfsNode, writer *tar.Writer, buf *bufReadWriter, path string, dagnode *dag.Node) error {
var err error
if dagnode == nil {
dagnode, err = node.Resolver.ResolvePath(path)
if err != nil {
return err
}
}
pb := new(upb.Data)
err = proto.Unmarshal(dagnode.Data, pb)
if err != nil {
return err
}
if pb.GetType() == upb.Data_Directory {
buf.mutex.Lock()
err = writer.WriteHeader(&tar.Header{
Name: path,
Typeflag: tar.TypeDir,
Mode: 0777,
// TODO: set mode, dates, etc. when added to unixFS
})
buf.mutex.Unlock()
if err != nil {
return err
}
for _, link := range dagnode.Links {
err := _copyFilesAsTar(node, writer, buf, p.Join(path, link.Name), link.Node)
if err != nil {
return err
}
}
return nil
}
buf.mutex.Lock()
err = writer.WriteHeader(&tar.Header{
Name: path,
Size: int64(pb.GetFilesize()),
Typeflag: tar.TypeReg,
Mode: 0644,
// TODO: set mode, dates, etc. when added to unixFS
})
buf.mutex.Unlock()
if err != nil {
return err
}
reader, err := uio.NewDagReader(dagnode, node.DAG)
if err != nil {
return err
}
_, err = syncCopy(writer, reader, buf)
if err != nil {
return err
}
return nil
}
type bufReadWriter struct {
buf bytes.Buffer
closed bool
signalChan chan struct{}
mutex *sync.Mutex
}
func NewBufReadWriter() *bufReadWriter {
return &bufReadWriter{
signalChan: make(chan struct{}),
mutex: &sync.Mutex{},
}
}
func (i *bufReadWriter) Read(p []byte) (int, error) {
<-i.signalChan
i.mutex.Lock()
defer i.mutex.Unlock()
if i.buf.Len() == 0 {
if i.closed {
return 0, io.EOF
}
return 0, nil
}
n, err := i.buf.Read(p)
if err == io.EOF && !i.closed || i.buf.Len() > 0 {
return n, nil
}
return n, err
}
func (i *bufReadWriter) Write(p []byte) (int, error) {
return i.buf.Write(p)
}
func (i *bufReadWriter) Signal() {
i.signalChan <- struct{}{}
}
func (i *bufReadWriter) Close() error {
i.closed = true
return nil
}
func syncCopy(writer io.Writer, reader io.Reader, buf *bufReadWriter) (int64, error) {
written := int64(0)
copyBuf := make([]byte, 32*1024)
for {
nr, err := reader.Read(copyBuf)
if nr > 0 {
buf.mutex.Lock()
nw, err := writer.Write(copyBuf[:nr])
buf.mutex.Unlock()
if err != nil {
return written, err
}
written += int64(nw)
buf.Signal()
}
if err == io.EOF {
break
}
if err != nil {
return written, err
}
}
return written, nil
return utar.NewReader(path, node.DAG, node.Resolver, compression)
}
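Read straight through, this hunk is a wholesale deletion: copyFilesAsTar, _copyFilesAsTar, bufReadWriter, and syncCopy all move out of the commands package, leaving get() as a one-line wrapper over the new unixfs/tar reader:

func get(node *core.IpfsNode, path string, compression int) (io.Reader, error) {
	return utar.NewReader(path, node.DAG, node.Resolver, compression)
}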

thirdparty/tar/extractor.go (vendored, new file, 108 lines)

@@ -0,0 +1,108 @@
package tar
import (
"archive/tar"
"io"
"os"
fp "path/filepath"
"strings"
)
type Extractor struct {
Path string
}
func (te *Extractor) Extract(reader io.Reader) error {
tarReader := tar.NewReader(reader)
// Check if the output path already exists, so we know whether we should
// create our output with that name, or if we should put the output inside
// a preexisting directory
exists := true
pathIsDir := false
if stat, err := os.Stat(te.Path); err != nil && os.IsNotExist(err) {
exists = false
} else if err != nil {
return err
} else if stat.IsDir() {
pathIsDir = true
}
// entries arrive depth-first, in order (i == 0 is the root directory)
for i := 0; ; i++ {
header, err := tarReader.Next()
if err != nil && err != io.EOF {
return err
}
if header == nil || err == io.EOF {
break
}
if header.Typeflag == tar.TypeDir {
err = te.extractDir(header, i, exists)
if err != nil {
return err
}
continue
}
err = te.extractFile(header, tarReader, i, exists, pathIsDir)
if err != nil {
return err
}
}
return nil
}
func (te *Extractor) extractDir(h *tar.Header, depth int, exists bool) error {
pathElements := strings.Split(h.Name, "/")
if !exists {
pathElements = pathElements[1:]
}
path := fp.Join(pathElements...)
path = fp.Join(te.Path, path)
if depth == 0 {
// if this is the root directory, use it as the output path for the remaining files
te.Path = path
}
err := os.MkdirAll(path, 0755)
if err != nil {
return err
}
return nil
}
func (te *Extractor) extractFile(h *tar.Header, r *tar.Reader, depth int, exists bool, pathIsDir bool) error {
var path string
if depth == 0 {
// if depth is 0, this is the only file (we aren't 'ipfs get'ing a directory)
switch {
case exists && !pathIsDir:
return os.ErrExist
case exists && pathIsDir:
path = fp.Join(te.Path, h.Name)
case !exists:
path = te.Path
}
} else {
// we are outputting a directory; this file is inside it
pathElements := strings.Split(h.Name, "/")[1:]
path = fp.Join(pathElements...)
path = fp.Join(te.Path, path)
}
file, err := os.Create(path)
if err != nil {
return err
}
defer file.Close()
_, err = io.Copy(file, r)
if err != nil {
return err
}
return nil
}
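The extraction logic is now reusable outside the get command. A minimal sketch of standalone use, assuming a gzipped archive on disk; extractArchive and both path parameters are illustrative:

package example

import (
	"compress/gzip"
	"os"

	tar "github.com/jbenet/go-ipfs/thirdparty/tar"
)

// extractArchive unpacks a gzipped tar archive into outPath using the
// newly extracted Extractor type.
func extractArchive(archivePath, outPath string) error {
	f, err := os.Open(archivePath)
	if err != nil {
		return err
	}
	defer f.Close()

	// Extract expects a plain tar stream, so strip the gzip layer first.
	gzr, err := gzip.NewReader(f)
	if err != nil {
		return err
	}
	defer gzr.Close()

	extractor := &tar.Extractor{Path: outPath}
	return extractor.Extract(gzr)
}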

unixfs/tar/reader.go (new file, 200 lines)

@@ -0,0 +1,200 @@
package tar
import (
"archive/tar"
"bytes"
"compress/gzip"
"io"
p "path"
mdag "github.com/jbenet/go-ipfs/merkledag"
path "github.com/jbenet/go-ipfs/path"
uio "github.com/jbenet/go-ipfs/unixfs/io"
upb "github.com/jbenet/go-ipfs/unixfs/pb"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
)
type Reader struct {
buf bytes.Buffer
closed bool
signalChan chan struct{}
dag mdag.DAGService
resolver *path.Resolver
writer *tar.Writer
gzipWriter *gzip.Writer
err error
}
func NewReader(path string, dag mdag.DAGService, resolver *path.Resolver, compression int) (*Reader, error) {
reader := &Reader{
signalChan: make(chan struct{}),
dag: dag,
resolver: resolver,
}
var err error
if compression != gzip.NoCompression {
reader.gzipWriter, err = gzip.NewWriterLevel(&reader.buf, compression)
if err != nil {
return nil, err
}
reader.writer = tar.NewWriter(reader.gzipWriter)
} else {
reader.writer = tar.NewWriter(&reader.buf)
}
dagnode, err := resolver.ResolvePath(path)
if err != nil {
return nil, err
}
// writeToBuf will write the data to the buffer, and will signal when there
// is new data to read
go reader.writeToBuf(dagnode, path, 0)
return reader, nil
}
func (i *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
pb := new(upb.Data)
err := proto.Unmarshal(dagnode.Data, pb)
if err != nil {
i.emitError(err)
return
}
if depth == 0 {
defer i.close()
}
if pb.GetType() == upb.Data_Directory {
err = i.writer.WriteHeader(&tar.Header{
Name: path,
Typeflag: tar.TypeDir,
Mode: 0777,
// TODO: set mode, dates, etc. when added to unixFS
})
if err != nil {
i.emitError(err)
return
}
for _, link := range dagnode.Links {
childNode, err := link.GetNode(i.dag)
if err != nil {
i.emitError(err)
return
}
i.writeToBuf(childNode, p.Join(path, link.Name), depth+1)
}
return
}
err = i.writer.WriteHeader(&tar.Header{
Name: path,
Size: int64(pb.GetFilesize()),
Typeflag: tar.TypeReg,
Mode: 0644,
// TODO: set mode, dates, etc. when added to unixFS
})
if err != nil {
i.emitError(err)
return
}
reader, err := uio.NewDagReader(dagnode, i.dag)
if err != nil {
i.emitError(err)
return
}
err = i.syncCopy(reader)
if err != nil {
i.emitError(err)
return
}
}
func (i *Reader) Read(p []byte) (int, error) {
// wait for the goroutine that is writing data to the buffer to tell us
// there is something to read
if !i.closed {
<-i.signalChan
}
if i.err != nil {
return 0, i.err
}
if !i.closed {
defer i.signal()
}
if i.buf.Len() == 0 {
if i.closed {
return 0, io.EOF
}
return 0, nil
}
n, err := i.buf.Read(p)
if (err == io.EOF && !i.closed) || i.buf.Len() > 0 {
return n, nil
}
return n, err
}
func (i *Reader) signal() {
i.signalChan <- struct{}{}
}
func (i *Reader) emitError(err error) {
i.err = err
i.signal()
}
func (i *Reader) close() {
i.closed = true
i.flush()
}
func (i *Reader) flush() {
defer i.signal()
err := i.writer.Close()
if err != nil {
i.emitError(err)
return
}
if i.gzipWriter != nil {
err = i.gzipWriter.Close()
if err != nil {
i.emitError(err)
return
}
}
}
func (i *Reader) syncCopy(reader io.Reader) error {
buf := make([]byte, 32*1024)
for {
nr, err := reader.Read(buf)
if nr > 0 {
_, err := i.writer.Write(buf[:nr])
if err != nil {
return err
}
i.signal()
// wait for Read to finish reading
<-i.signalChan
}
if err == io.EOF {
break
}
if err != nil {
return err
}
}
return nil
}
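Reader satisfies io.Reader over an archive that is generated on the fly: writeToBuf runs in its own goroutine and each Read blocks on signalChan until fresh bytes land in the buffer. A minimal consumer sketch, assuming a ready *core.IpfsNode; streamTar is an illustrative name:

package example

import (
	"compress/gzip"
	"io"

	core "github.com/jbenet/go-ipfs/core"
	utar "github.com/jbenet/go-ipfs/unixfs/tar"
)

// streamTar mirrors the rewritten get(): it streams the object at
// ipfsPath out of the node as a gzipped tar archive.
func streamTar(node *core.IpfsNode, ipfsPath string, w io.Writer) error {
	r, err := utar.NewReader(ipfsPath, node.DAG, node.Resolver, gzip.BestCompression)
	if err != nil {
		return err
	}
	// Each Read blocks until the writer goroutine signals more data, so
	// the copy consumes the archive as it is built.
	_, err = io.Copy(w, r)
	return err
}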