Review changes

This commit is contained in:
gammazero 2021-02-26 08:41:08 -08:00
parent e22413ab92
commit fcbe47bc4b
13 changed files with 455 additions and 297 deletions

View File

@ -288,9 +288,10 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
return fmt.Errorf("fs-repo requires migration")
}
// Fetch migrations using the current distribution
migrations.SetIpfsDistPath(migrations.CurrentIpfsDist)
err = migrations.RunMigration(cctx.Context(), fsrepo.RepoVersion, "")
fetcher := migrations.NewHttpFetcher()
// Fetch migrations from current distribution, or location from environ
fetcher.SetDistPath(migrations.GetDistPathEnv(migrations.CurrentIpfsDist))
err = migrations.RunMigration(cctx.Context(), fetcher, fsrepo.RepoVersion, "", false)
if err != nil {
fmt.Println("The migrations of fs-repo failed:")
fmt.Printf(" %s\n", err)

View File

@ -7,7 +7,6 @@ import (
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"path"
@ -15,68 +14,22 @@ import (
"strings"
)
const (
// Current dirstibution to fetch migrations from
CurrentIpfsDist = "/ipfs/Qme8pJhBidEUXRdpcWLGR2fkG5kdwVnaMh3kabjfP8zz7Y"
envIpfsDistPath = "IPFS_DIST_PATH"
// Distribution
gatewayURL = "https://ipfs.io"
ipfsDist = "/ipns/dist.ipfs.io"
// Maximum download size
fetchSizeLimit = 1024 * 1024 * 512
)
type limitReadCloser struct {
io.Reader
io.Closer
}
var ipfsDistPath string
func init() {
SetIpfsDistPath("")
}
// SetIpfsDistPath sets the ipfs path to the distribution site. If an empty
// string is given, then the path is set using the IPFS_DIST_PATH environ
// veriable, or the default dns link value if that is not defined.
func SetIpfsDistPath(distPath string) {
if distPath != "" {
ipfsDistPath = distPath
return
}
if dist := os.Getenv(envIpfsDistPath); dist != "" {
ipfsDistPath = dist
} else {
ipfsDistPath = ipfsDist
}
}
// FetchBinary downloads an archive from the distribution site and unpacks it.
//
// The base name of the archive file, inside the distribution directory on
// distribution site, may differ from the distribution name. If it does, then
// specify arcName.
//
// The base name of the binary inside the archive may differ from the base
// archive name. If it does, then specify binName. For example, the following
// is needed because the archive "go-ipfs_v0.7.0_linux-amd64.tar.gz" contains a
// binary named "ipfs"
//
// FetchBinary(ctx, "go-ipfs", "v0.7.0", "go-ipfs", "ipfs", tmpDir)
// FetchBinary(ctx, fetcher, "go-ipfs", "v0.7.0", "ipfs", tmpDir)
//
// If out is a directory, then the binary is written to that directory with the
// same name it has inside the archive. Otherwise, the binary file is written
// to the file named by out.
func FetchBinary(ctx context.Context, dist, ver, arcName, binName, out string) (string, error) {
// If archive base name not specified, then it is same as dist.
if arcName == "" {
arcName = dist
}
func FetchBinary(ctx context.Context, fetcher Fetcher, dist, ver, binName, out string) (string, error) {
// The archive file name is the base of dist to support possible subdir in
// dist, for example: "ipfs-repo-migrations/ipfs-11-to-12"
arcName := path.Base(dist)
// If binary base name is not specified, then it is same as archive base name.
if binName == "" {
binName = arcName
@ -101,6 +54,18 @@ func FetchBinary(ctx context.Context, dist, ver, arcName, binName, out string) (
}
// out exists and is a directory, so compose final name
out = path.Join(out, binName)
// Check if the binary already exists in the directory
fi, err = os.Stat(out)
if !os.IsNotExist(err) {
if err != nil {
return "", err
}
return "", &os.PathError{
Op: "FetchBinary",
Path: out,
Err: os.ErrExist,
}
}
}
// Create temp directory to store download
@ -115,11 +80,10 @@ func FetchBinary(ctx context.Context, dist, ver, arcName, binName, out string) (
atype = "zip"
}
arcName = makeArchiveName(arcName, ver, atype)
arcIpfsPath := makeIpfsPath(dist, ver, arcName)
arcDistPath, arcFullName := makeArchivePath(dist, arcName, ver, atype)
// Create a file to write the archive data to
arcPath := path.Join(tmpDir, arcName)
arcPath := path.Join(tmpDir, arcFullName)
arcFile, err := os.Create(arcPath)
if err != nil {
return "", err
@ -127,7 +91,7 @@ func FetchBinary(ctx context.Context, dist, ver, arcName, binName, out string) (
defer arcFile.Close()
// Open connection to download archive from ipfs path
rc, err := fetch(ctx, arcIpfsPath)
rc, err := fetcher.Fetch(ctx, arcDistPath)
if err != nil {
return "", err
}
@ -155,73 +119,6 @@ func FetchBinary(ctx context.Context, dist, ver, arcName, binName, out string) (
return out, nil
}
// fetch attempts to fetch the file at the given ipfs path, first using the
// local ipfs api if available, then using http. Returns io.ReadCloser on
// success, which caller must close.
func fetch(ctx context.Context, ipfsPath string) (io.ReadCloser, error) {
// Try fetching via ipfs daemon
rc, err := ipfsFetch(ctx, ipfsPath)
if err == nil {
// Transferred using local ipfs daemon
return rc, nil
}
// Try fetching via HTTP
return httpFetch(ctx, gatewayURL+ipfsPath)
}
// ipfsFetch attempts to fetch the file at the given ipfs path using the local
// ipfs api. Returns io.ReadCloser on success, which caller must close.
func ipfsFetch(ctx context.Context, ipfsPath string) (io.ReadCloser, error) {
sh, _, err := ApiShell("")
if err != nil {
return nil, err
}
resp, err := sh.Request("cat", ipfsPath).Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
return newLimitReadCloser(resp.Output, fetchSizeLimit), nil
}
// httpFetch attempts to fetch the file at the given URL. Returns
// io.ReadCloser on success, which caller must close.
func httpFetch(ctx context.Context, url string) (io.ReadCloser, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("http.NewRequest error: %s", err)
}
req.Header.Set("User-Agent", "go-ipfs")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("http.DefaultClient.Do error: %s", err)
}
if resp.StatusCode >= 400 {
defer resp.Body.Close()
mes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading error body: %s", err)
}
return nil, fmt.Errorf("GET %s error: %s: %s", url, resp.Status, string(mes))
}
return newLimitReadCloser(resp.Body, fetchSizeLimit), nil
}
func newLimitReadCloser(rc io.ReadCloser, limit int64) io.ReadCloser {
return limitReadCloser{
Reader: io.LimitReader(rc, limit),
Closer: rc,
}
}
// osWithVariant returns the OS name with optional variant.
// Currently returns either runtime.GOOS, or "linux-musl".
func osWithVariant() (string, error) {
@ -255,18 +152,21 @@ func osWithVariant() (string, error) {
return "linux", nil
}
// makeArchiveName composes the name of a migration binary archive.
// makeArchivePath composes the path, relative to the distribution site, from which to
// download a binary. The path returned does not contain the distribution site path,
// e.g. "/ipns/dist.ipfs.io/", since that is know to the fetcher.
//
// The archive name is in the format: name_version_osv-GOARCH.atype
// Example: ipfs-10-to-11_v1.8.0_darwin-amd64.tar.gz
func makeArchiveName(name, ver, atype string) string {
return fmt.Sprintf("%s_%s_%s-%s.%s", name, ver, runtime.GOOS, runtime.GOARCH, atype)
}
// makeIpfsPath composes the name ipfs path location to download a migration
// binary from the distribution site.
// Returns the archive path and the base name.
//
// The ipfs path format: distBaseCID/rootdir/version/name/archive
func makeIpfsPath(dist, ver, arcName string) string {
return fmt.Sprintf("%s/%s/%s/%s", ipfsDistPath, dist, ver, arcName)
// The ipfs path format is: distribution/version/archiveName
// - distribution is the name of a distribution, such as "go-ipfs"
// - version is the version to fetch, such as "v0.8.0-rc2"
// - archiveName is formatted as name_version_osv-GOARCH.atype, such as
// "go-ipfs_v0.8.0-rc2_linux-amd64.tar.gz"
//
// This would form the path:
// go-ipfs/v0.8.0/go-ipfs_v0.8.0_linux-amd64.tar.gz
func makeArchivePath(dist, name, ver, atype string) (string, string) {
arcName := fmt.Sprintf("%s_%s_%s-%s.%s", name, ver, runtime.GOOS, runtime.GOARCH, atype)
return fmt.Sprintf("%s/%s/%s", dist, ver, arcName), arcName
}

View File

@ -3,17 +3,63 @@ package migrations
import (
"bufio"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"path"
"strings"
"testing"
)
func TestSetIpfsDistPath(t *testing.T) {
func createTestServer() *httptest.Server {
reqHandler := func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
if strings.Contains(r.URL.Path, "not-here") {
http.NotFound(w, r)
} else if strings.HasSuffix(r.URL.Path, "versions") {
fmt.Fprint(w, "v1.0.0\nv1.1.0\nv1.1.2\nv2.0.0-rc1\n2.0.0\nv2.0.1\n")
} else if strings.HasSuffix(r.URL.Path, ".tar.gz") {
createFakeArchive(r.URL.Path, false, w)
} else if strings.HasSuffix(r.URL.Path, "zip") {
createFakeArchive(r.URL.Path, true, w)
} else {
http.NotFound(w, r)
}
}
return httptest.NewServer(http.HandlerFunc(reqHandler))
}
func createFakeArchive(name string, archZip bool, w io.Writer) {
fileName := strings.Split(path.Base(name), "_")[0]
root := path.Base(path.Dir(path.Dir(name)))
// Simulate fetching go-ipfs, which has "ipfs" as the name in the archive.
if fileName == "go-ipfs" {
fileName = "ipfs"
}
var err error
if archZip {
err = writeZip(root, fileName, "FAKE DATA", w)
} else {
err = writeTarGzip(root, fileName, "FAKE DATA", w)
}
if err != nil {
panic(err)
}
}
func TestSetDistPath(t *testing.T) {
f1 := NewHttpFetcher()
f2 := NewHttpFetcher()
mf := NewMultiFetcher(f1, f2)
os.Unsetenv(envIpfsDistPath)
SetIpfsDistPath("")
if ipfsDistPath != ipfsDist {
mf.SetDistPath(GetDistPathEnv(""))
if f1.distPath != IpnsIpfsDist {
t.Error("did not set default dist path")
}
@ -24,17 +70,30 @@ func TestSetIpfsDistPath(t *testing.T) {
}
defer func() {
os.Unsetenv(envIpfsDistPath)
SetIpfsDistPath("")
}()
SetIpfsDistPath("")
if ipfsDistPath != testDist {
mf.SetDistPath(GetDistPathEnv(""))
if f1.distPath != testDist {
t.Error("did not set dist path from environ")
}
if f2.distPath != testDist {
t.Error("did not set dist path from environ")
}
mf.SetDistPath(GetDistPathEnv("ignored"))
if f1.distPath != testDist {
t.Error("did not set dist path from environ")
}
if f2.distPath != testDist {
t.Error("did not set dist path from environ")
}
testDist = "/unit/test/dist2"
SetIpfsDistPath(testDist)
if ipfsDistPath != testDist {
mf.SetDistPath(testDist)
if f1.distPath != testDist {
t.Error("did not set dist path")
}
if f2.distPath != testDist {
t.Error("did not set dist path")
}
}
@ -43,8 +102,12 @@ func TestHttpFetch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
url := gatewayURL + path.Join(ipfsDistPath, distFSRM, distVersions)
rc, err := httpFetch(ctx, url)
fetcher := NewHttpFetcher()
ts := createTestServer()
defer ts.Close()
fetcher.SetGateway(ts.URL)
rc, err := fetcher.Fetch(ctx, "/versions")
if err != nil {
t.Fatal(err)
}
@ -60,73 +123,18 @@ func TestHttpFetch(t *testing.T) {
t.Fatal("could not read versions:", err)
}
if len(out) < 14 {
if len(out) < 6 {
t.Fatal("do not get all expected data")
}
if out[0] != "v1.0.0" {
t.Fatal("expected v1.0.0 as first line, got", out[0])
}
// Check bad URL
_, err = httpFetch(ctx, "")
if err == nil {
t.Fatal("expected error")
}
// Check unreachable URL
_, err = httpFetch(ctx, "http://127.0.0.123:65510")
if err == nil || !strings.HasSuffix(err.Error(), "connection refused") {
t.Fatal("expected 'connection refused' error")
}
// Check not found
url = gatewayURL + path.Join(ipfsDistPath, distFSRM, "no_such_file")
_, err = httpFetch(ctx, url)
_, err = fetcher.Fetch(ctx, "/no_such_file")
if err == nil || !strings.Contains(err.Error(), "404") {
t.Fatal("expected error 404")
}
}
func TestIpfsFetch(t *testing.T) {
_, err := ApiEndpoint("")
if err != nil {
t.Skip("skipped - local ipfs daemon not available")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
url := path.Join(ipfsDistPath, distFSRM, distVersions)
rc, err := ipfsFetch(ctx, url)
if err != nil {
t.Fatal(err)
}
defer rc.Close()
var out []string
scan := bufio.NewScanner(rc)
for scan.Scan() {
out = append(out, scan.Text())
}
err = scan.Err()
if err != nil {
t.Fatal("could not read versions:", err)
}
if len(out) < 14 {
t.Fatal("do not get all expected data")
}
if out[0] != "v1.0.0" {
t.Fatal("expected v1.0.0 as first line, got", out[0])
}
// Check bad URL
url = path.Join(ipfsDistPath, distFSRM, "no_such_file")
_, err = ipfsFetch(ctx, url)
if err == nil || !strings.Contains(err.Error(), "no link") {
t.Fatal("expected 'no link' error, got:", err)
}
}
func TestFetchBinary(t *testing.T) {
@ -139,13 +147,18 @@ func TestFetchBinary(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vers, err := DistVersions(ctx, distFSRM, false)
fetcher := NewHttpFetcher()
ts := createTestServer()
defer ts.Close()
fetcher.SetGateway(ts.URL)
vers, err := DistVersions(ctx, fetcher, distFSRM, false)
if err != nil {
t.Fatal(err)
}
t.Log("latest version of", distFSRM, "is", vers[len(vers)-1])
bin, err := FetchBinary(ctx, distFSRM, vers[0], distFSRM, "", tmpDir)
bin, err := FetchBinary(ctx, fetcher, distFSRM, vers[0], "", tmpDir)
if err != nil {
t.Fatal(err)
}
@ -157,7 +170,7 @@ func TestFetchBinary(t *testing.T) {
t.Log("downloaded and unpacked", fi.Size(), "byte file:", fi.Name())
bin, err = FetchBinary(ctx, "go-ipfs", "v0.3.5", "", "ipfs", tmpDir)
bin, err = FetchBinary(ctx, fetcher, "go-ipfs", "v0.3.5", "ipfs", tmpDir)
if err != nil {
t.Fatal(err)
}
@ -170,11 +183,18 @@ func TestFetchBinary(t *testing.T) {
t.Log("downloaded and unpacked", fi.Size(), "byte file:", fi.Name())
// Check error is destination already exists and is not directory
_, err = FetchBinary(ctx, "go-ipfs", "v0.3.5", "", "ipfs", bin)
_, err = FetchBinary(ctx, fetcher, "go-ipfs", "v0.3.5", "ipfs", bin)
if !os.IsExist(err) {
t.Fatal("expected 'exists' error")
t.Fatal("expected 'exists' error, got", err)
}
_, err = FetchBinary(ctx, fetcher, "go-ipfs", "v0.3.5", "ipfs", tmpDir)
if !os.IsExist(err) {
t.Error("expected 'exists' error, got:", err)
}
os.Remove(path.Join(tmpDir, "ipfs"))
// Check error creating temp download directory
err = os.Chmod(tmpDir, 0555)
if err != nil {
@ -184,9 +204,9 @@ func TestFetchBinary(t *testing.T) {
if err != nil {
panic(err)
}
_, err = FetchBinary(ctx, "go-ipfs", "v0.3.5", "", "ipfs", tmpDir)
_, err = FetchBinary(ctx, fetcher, "go-ipfs", "v0.3.5", "ipfs", tmpDir)
if !os.IsPermission(err) {
t.Error("expected 'permission'error")
t.Error("expected 'permission' error, got:", err)
}
err = os.Setenv("TMPDIR", "/tmp")
if err != nil {
@ -198,13 +218,13 @@ func TestFetchBinary(t *testing.T) {
}
// Check error if failure to fetch due to bad dist
_, err = FetchBinary(ctx, "no-such-dist", "v0.3.5", "", "ipfs", tmpDir)
_, err = FetchBinary(ctx, fetcher, "not-here", "v0.3.5", "ipfs", tmpDir)
if err == nil || !strings.Contains(err.Error(), "Not Found") {
t.Error("expected 'Not Found' error")
t.Error("expected 'Not Found' error, got:", err)
}
// Check error if failure to unpack archive
_, err = FetchBinary(ctx, "go-ipfs", "v0.3.5", "", "not-such-bin", tmpDir)
_, err = FetchBinary(ctx, fetcher, "go-ipfs", "v0.3.5", "not-such-bin", tmpDir)
if err == nil || err.Error() != "no binary found in archive" {
t.Error("expected 'no binary found in archive' error")
}

View File

@ -0,0 +1,96 @@
package migrations
import (
"context"
"io"
"os"
"strings"
)
const (
// Current dirstibution to fetch migrations from
CurrentIpfsDist = "/ipfs/Qme8pJhBidEUXRdpcWLGR2fkG5kdwVnaMh3kabjfP8zz7Y"
// Distribution IPNS path. Default for fetchers.
IpnsIpfsDist = "/ipns/dist.ipfs.io"
// Distribution environ variable
envIpfsDistPath = "IPFS_DIST_PATH"
)
type Fetcher interface {
// Fetch attempts to fetch the file at the given ipfs path.
// Returns io.ReadCloser on success, which caller must close.
Fetch(ctx context.Context, filePath string) (io.ReadCloser, error)
// SetDistPath sets the path to the distribution site for a Fetcher
SetDistPath(distPath string)
}
// MultiFetcher holds multiple Fetchers and provides a Fetch that tries each
// until one succeeds.
type MultiFetcher struct {
fetchers []Fetcher
}
type limitReadCloser struct {
io.Reader
io.Closer
}
// NewMultiFetcher creates a MultiFetcher with the given Fetchers. The
// Fetchers are tried in order ther passed to this function.
func NewMultiFetcher(f ...Fetcher) Fetcher {
mf := &MultiFetcher{
fetchers: make([]Fetcher, len(f)),
}
copy(mf.fetchers, f)
return mf
}
// Fetch attempts to fetch the file at each of its fetchers until one succeeds.
// Returns io.ReadCloser on success, which caller must close.
func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) (rc io.ReadCloser, err error) {
for _, fetcher := range f.fetchers {
rc, err = fetcher.Fetch(ctx, ipfsPath)
if err == nil {
// Transferred using this fetcher
return
}
}
return
}
// SetDistPath sets the path to the distribution site for all fetchers
func (f *MultiFetcher) SetDistPath(distPath string) {
if !strings.HasPrefix(distPath, "/") {
distPath = "/" + distPath
}
for _, fetcher := range f.fetchers {
fetcher.SetDistPath(distPath)
}
}
// NewLimitReadCloser returns a new io.ReadCloser with the reader wrappen in a
// io.LimitedReader limited to reading the amount specified.
func NewLimitReadCloser(rc io.ReadCloser, limit int64) io.ReadCloser {
return limitReadCloser{
Reader: io.LimitReader(rc, limit),
Closer: rc,
}
}
// GetDistPathEnv returns the IPFS path to the distribution site, using
// the value of environ variable specified by envIpfsDistPath. If the environ
// variable is not set, then returns the provided distPath, and if that is not set
// then returns the IPNS path.
//
// To get the IPFS path of the latest distribution, if not overriddin by the
// environ variable: GetDistPathEnv(CurrentIpfsDist)
func GetDistPathEnv(distPath string) string {
if dist := os.Getenv(envIpfsDistPath); dist != "" {
return dist
}
if distPath == "" {
return IpnsIpfsDist
}
return distPath
}

View File

@ -0,0 +1,91 @@
package migrations
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"path"
"strings"
)
const (
defaultGatewayURL = "https://ipfs.io"
defaultFetchLimit = 1024 * 1024 * 512
)
// HttpFetcher fetches files over HTTP
type HttpFetcher struct {
gateway string
distPath string
limit int64
}
var _ Fetcher = (*HttpFetcher)(nil)
// NewHttpFetcher creates a new HttpFetcher
func NewHttpFetcher() *HttpFetcher {
return &HttpFetcher{
gateway: defaultGatewayURL,
distPath: IpnsIpfsDist,
limit: defaultFetchLimit,
}
}
// SetGateway sets the gateway URL
func (f *HttpFetcher) SetGateway(gatewayURL string) error {
gwURL, err := url.Parse(gatewayURL)
if err != nil {
return err
}
f.gateway = gwURL.String()
return nil
}
// SetDistPath sets the path to the distribution site.
func (f *HttpFetcher) SetDistPath(distPath string) {
if !strings.HasPrefix(distPath, "/") {
distPath = "/" + distPath
}
f.distPath = distPath
}
// SetFetchLimit sets the download size limit. A value of 0 means no limit.
func (f *HttpFetcher) SetFetchLimit(limit int64) {
f.limit = limit
}
// Fetch attempts to fetch the file at the given path, from the distribution
// site configured for this HttpFetcher. Returns io.ReadCloser on success,
// which caller must close.
func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
gwURL := f.gateway + path.Join(f.distPath, filePath)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, gwURL, nil)
if err != nil {
return nil, fmt.Errorf("http.NewRequest error: %s", err)
}
req.Header.Set("User-Agent", "go-ipfs")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("http.DefaultClient.Do error: %s", err)
}
if resp.StatusCode >= 400 {
defer resp.Body.Close()
mes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading error body: %s", err)
}
return nil, fmt.Errorf("GET %s error: %s: %s", gwURL, resp.Status, string(mes))
}
if f.limit != 0 {
return NewLimitReadCloser(resp.Body, f.limit), nil
}
return resp.Body, nil
}

View File

@ -10,7 +10,6 @@ import (
"strings"
"time"
api "github.com/ipfs/go-ipfs-api"
"github.com/mitchellh/go-homedir"
)
@ -52,23 +51,6 @@ func ApiEndpoint(ipfsDir string) (string, error) {
return parts[2] + ":" + parts[4], nil
}
// ApiShell creates a new ipfs api shell and checks that it is up. If the shell
// is available, then the shell and ipfs version are returned.
func ApiShell(ipfsDir string) (*api.Shell, string, error) {
apiEp, err := ApiEndpoint("")
if err != nil {
return nil, "", err
}
sh := api.NewShell(apiEp)
sh.SetTimeout(shellUpTimeout)
ver, _, err := sh.Version()
if err != nil {
return nil, "", errors.New("ipfs api shell not up")
}
sh.SetTimeout(0)
return sh, ver, nil
}
// IpfsDir returns the path of the ipfs directory. If dir specified, then
// returns the expanded version dir. If dir is "", then return the directory
// set by IPFS_PATH, or if IPFS_PATH is not set, then return the default

View File

@ -212,11 +212,4 @@ func TestApiEndpoint(t *testing.T) {
if val2 != val {
t.Fatal("expected", val, "got", val2)
}
_, _, err = ApiShell(fakeIpfs)
if err != nil {
if err.Error() != "ipfs api shell not up" {
t.Fatal("expected 'ipfs api shell not up' error")
}
}
}

View File

@ -21,7 +21,7 @@ const (
// RunMigration finds, downloads, and runs the individual migrations needed to
// migrate the repo from its current version to the target version.
func RunMigration(ctx context.Context, targetVer int, ipfsDir string) error {
func RunMigration(ctx context.Context, fetcher Fetcher, targetVer int, ipfsDir string, allowDowngrade bool) error {
ipfsDir, err := CheckIpfsDir(ipfsDir)
if err != nil {
return err
@ -34,6 +34,9 @@ func RunMigration(ctx context.Context, targetVer int, ipfsDir string) error {
// repo already at target version number
return nil
}
if fromVer > targetVer && !allowDowngrade {
return fmt.Errorf("downgrade not allowed from %d to %d", fromVer, targetVer)
}
log.Print("Looking for suitable migration binaries.")
@ -59,7 +62,7 @@ func RunMigration(ctx context.Context, targetVer int, ipfsDir string) error {
}
defer os.RemoveAll(tmpDir)
fetched, err := fetchMigrations(ctx, missing, tmpDir)
fetched, err := fetchMigrations(ctx, fetcher, missing, tmpDir)
if err != nil {
log.Print("Failed to download migrations.")
return err
@ -156,7 +159,7 @@ func runMigration(ctx context.Context, binPath, ipfsDir string, revert bool) err
// fetchMigrations downloads the requested migrations, and returns a slice with
// the paths of each binary, in the same order specified by needed.
func fetchMigrations(ctx context.Context, needed []string, destDir string) ([]string, error) {
func fetchMigrations(ctx context.Context, fetcher Fetcher, needed []string, destDir string) ([]string, error) {
osv, err := osWithVariant()
if err != nil {
return nil, err
@ -173,13 +176,13 @@ func fetchMigrations(ctx context.Context, needed []string, destDir string) ([]st
log.Printf("Downloading migration: %s...", name)
go func(i int, name string) {
defer wg.Done()
distDir := path.Join(distMigsRoot, name)
ver, err := LatestDistVersion(ctx, distDir)
dist := path.Join(distMigsRoot, name)
ver, err := LatestDistVersion(ctx, fetcher, dist, false)
if err != nil {
log.Printf("could not get latest version of migration %s: %s", name, err)
return
}
loc, err := FetchBinary(ctx, distDir, ver, name, name, destDir)
loc, err := FetchBinary(ctx, fetcher, dist, ver, name, destDir)
if err != nil {
log.Printf("could not download %s: %s", name, err)
return

View File

@ -3,15 +3,12 @@ package migrations
import (
"context"
"io/ioutil"
"net/http"
"os"
"path"
"strings"
"testing"
)
const testIPFSDistPath = "/ipfs/Qme8pJhBidEUXRdpcWLGR2fkG5kdwVnaMh3kabjfP8zz7Y"
func TestFindMigrations(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "migratetest")
if err != nil {
@ -118,14 +115,11 @@ func TestFetchMigrations(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
SetIpfsDistPath(testIPFSDistPath)
_, err := LatestDistVersion(ctx, "ipfs-1-to-2")
if err != nil {
if strings.Contains(err.Error(), http.StatusText(http.StatusNotFound)) {
t.Skip("skip - migrations not yet available on distribution site")
}
t.Fatal(err)
}
fetcher := NewHttpFetcher()
fetcher.SetDistPath(CurrentIpfsDist)
ts := createTestServer()
defer ts.Close()
fetcher.SetGateway(ts.URL)
tmpDir, err := ioutil.TempDir("", "migratetest")
if err != nil {
@ -134,7 +128,7 @@ func TestFetchMigrations(t *testing.T) {
defer os.RemoveAll(tmpDir)
needed := []string{"ipfs-1-to-2", "ipfs-2-to-3"}
fetched, err := fetchMigrations(ctx, needed, tmpDir)
fetched, err := fetchMigrations(ctx, fetcher, needed, tmpDir)
if err != nil {
t.Fatal(err)
}
@ -147,6 +141,52 @@ func TestFetchMigrations(t *testing.T) {
}
}
func TestRunMigrations(t *testing.T) {
var err error
fakeHome, err = ioutil.TempDir("", "testhome")
if err != nil {
panic(err)
}
defer os.RemoveAll(fakeHome)
os.Setenv("HOME", fakeHome)
fakeIpfs := path.Join(fakeHome, ".ipfs")
err = os.Mkdir(fakeIpfs, os.ModePerm)
if err != nil {
panic(err)
}
testVer := 11
err = WriteRepoVersion(fakeIpfs, testVer)
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := NewHttpFetcher()
fetcher.SetDistPath(CurrentIpfsDist)
ts := createTestServer()
defer ts.Close()
fetcher.SetGateway(ts.URL)
targetVer := 9
err = RunMigration(ctx, fetcher, targetVer, fakeIpfs, false)
if err == nil || !strings.HasPrefix(err.Error(), "downgrade not allowed") {
t.Fatal("expected 'downgrade not alloed' error")
}
err = RunMigration(ctx, fetcher, targetVer, fakeIpfs, true)
if err != nil {
if !strings.HasPrefix(err.Error(), "migration ipfs-10-to-11 failed") {
t.Fatal(err)
}
}
}
func createFakeBin(from, to int, tmpDir string) {
migPath := path.Join(tmpDir, ExeName(migrationName(from, to)))
emptyFile, err := os.Create(migPath)

View File

@ -30,13 +30,13 @@ func unpackArchive(arcPath, atype, root, name, out string) error {
func unpackTgz(arcPath, root, name, out string) error {
fi, err := os.Open(arcPath)
if err != nil {
return fmt.Errorf("cannot open archive file: %s", err)
return fmt.Errorf("cannot open archive file: %w", err)
}
defer fi.Close()
gzr, err := gzip.NewReader(fi)
if err != nil {
return fmt.Errorf("error opening gzip reader: %s", err)
return fmt.Errorf("error opening gzip reader: %w", err)
}
defer gzr.Close()
@ -50,7 +50,7 @@ func unpackTgz(arcPath, root, name, out string) error {
if err == io.EOF {
break
}
return fmt.Errorf("cannot read archive: %s", err)
return fmt.Errorf("cannot read archive: %w", err)
}
if th.Name == lookFor {
@ -69,7 +69,7 @@ func unpackTgz(arcPath, root, name, out string) error {
func unpackZip(arcPath, root, name, out string) error {
zipr, err := zip.OpenReader(arcPath)
if err != nil {
return fmt.Errorf("error opening zip reader: %s", err)
return fmt.Errorf("error opening zip reader: %w", err)
}
defer zipr.Close()
@ -79,7 +79,7 @@ func unpackZip(arcPath, root, name, out string) error {
if fis.Name == lookFor {
rc, err := fis.Open()
if err != nil {
return fmt.Errorf("error extracting binary from archive: %s", err)
return fmt.Errorf("error extracting binary from archive: %w", err)
}
bin = rc
@ -97,7 +97,7 @@ func unpackZip(arcPath, root, name, out string) error {
func writeToPath(rc io.Reader, out string) error {
binfi, err := os.Create(out)
if err != nil {
return fmt.Errorf("error opening tmp bin path '%s': %s", out, err)
return fmt.Errorf("error creating output file '%s': %w", out, err)
}
defer binfi.Close()

View File

@ -5,6 +5,7 @@ import (
"archive/zip"
"bufio"
"compress/gzip"
"io"
"io/ioutil"
"os"
"path"
@ -49,7 +50,7 @@ func TestUnpackTgz(t *testing.T) {
testTarGzip := path.Join(tmpDir, "test.tar.gz")
testData := "some data"
err = writeTarGzip(testTarGzip, "testroot", "testfile", testData)
err = writeTarGzipFile(testTarGzip, "testroot", "testfile", testData)
if err != nil {
panic(err)
}
@ -97,7 +98,7 @@ func TestUnpackZip(t *testing.T) {
testZip := path.Join(tmpDir, "test.zip")
testData := "some data"
err = writeZip(testZip, "testroot", "testfile", testData)
err = writeZipFile(testZip, "testroot", "testfile", testData)
if err != nil {
panic(err)
}
@ -125,21 +126,38 @@ func TestUnpackZip(t *testing.T) {
}
}
func writeTarGzip(archName, root, fileName, data string) error {
func writeTarGzipFile(archName, root, fileName, data string) error {
archFile, err := os.Create(archName)
if err != nil {
return err
}
defer archFile.Close()
wr := bufio.NewWriter(archFile)
w := bufio.NewWriter(archFile)
err = writeTarGzip(root, fileName, data, w)
if err != nil {
return err
}
// Flush buffered data to file
if err = w.Flush(); err != nil {
return err
}
// Close tar file
if err = archFile.Close(); err != nil {
return err
}
return nil
}
func writeTarGzip(root, fileName, data string, w io.Writer) error {
// gzip writer writes to buffer
gzw := gzip.NewWriter(wr)
gzw := gzip.NewWriter(w)
defer gzw.Close()
// tar writer writes to gzip
tw := tar.NewWriter(gzw)
defer tw.Close()
var err error
if fileName != "" {
hdr := &tar.Header{
Name: path.Join(root, fileName),
@ -163,26 +181,34 @@ func writeTarGzip(archName, root, fileName, data string) error {
if err = gzw.Close(); err != nil {
return err
}
// Flush buffered data to file
if err = wr.Flush(); err != nil {
return nil
}
func writeZipFile(archName, root, fileName, data string) error {
archFile, err := os.Create(archName)
if err != nil {
return err
}
// Close tar file
defer archFile.Close()
w := bufio.NewWriter(archFile)
err = writeZip(root, fileName, data, w)
if err != nil {
return err
}
// Flush buffered data to file
if err = w.Flush(); err != nil {
return err
}
// Close zip file
if err = archFile.Close(); err != nil {
return err
}
return nil
}
func writeZip(archName, root, fileName, data string) error {
archFile, err := os.Create(archName)
if err != nil {
return err
}
defer archFile.Close()
wr := bufio.NewWriter(archFile)
zw := zip.NewWriter(wr)
func writeZip(root, fileName, data string, w io.Writer) error {
zw := zip.NewWriter(w)
defer zw.Close()
// Write file name
@ -200,13 +226,5 @@ func writeZip(archName, root, fileName, data string) error {
if err = zw.Close(); err != nil {
return err
}
// Flush buffered data to file
if err = wr.Flush(); err != nil {
return err
}
// Close zip file
if err = archFile.Close(); err != nil {
return err
}
return nil
}

View File

@ -18,17 +18,21 @@ const distVersions = "versions"
// LatestDistVersion returns the latest version, of the specified distribution,
// that is available on the distribution site.
func LatestDistVersion(ctx context.Context, dist string) (string, error) {
vs, err := DistVersions(ctx, dist, false)
func LatestDistVersion(ctx context.Context, fetcher Fetcher, dist string, stableOnly bool) (string, error) {
vs, err := DistVersions(ctx, fetcher, dist, false)
if err != nil {
return "", err
}
for i := len(vs) - 1; i >= 0; i-- {
ver := vs[i]
if !strings.Contains(ver, "-dev") {
return ver, nil
if stableOnly && strings.Contains(ver, "-rc") {
continue
}
if strings.Contains(ver, "-dev") {
continue
}
return ver, nil
}
return "", errors.New("could not find a non dev version")
}
@ -36,8 +40,8 @@ func LatestDistVersion(ctx context.Context, dist string) (string, error) {
// DistVersions returns all versions of the specified distribution, that are
// available on the distriburion site. List is in ascending order, unless
// sortDesc is true.
func DistVersions(ctx context.Context, dist string, sortDesc bool) ([]string, error) {
rc, err := fetch(ctx, path.Join(ipfsDistPath, dist, distVersions))
func DistVersions(ctx context.Context, fetcher Fetcher, dist string, sortDesc bool) ([]string, error) {
rc, err := fetcher.Fetch(ctx, path.Join(dist, distVersions))
if err != nil {
return nil, err
}

View File

@ -14,7 +14,12 @@ func TestDistVersions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vers, err := DistVersions(ctx, testDist, true)
fetcher := NewHttpFetcher()
ts := createTestServer()
defer ts.Close()
fetcher.SetGateway(ts.URL)
vers, err := DistVersions(ctx, fetcher, testDist, true)
if err != nil {
t.Fatal(err)
}
@ -29,7 +34,12 @@ func TestLatestDistVersion(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
latest, err := LatestDistVersion(ctx, testDist)
fetcher := NewHttpFetcher()
//ts := createTestServer()
//defer ts.Close()
//fetcher.SetGateway(ts.URL)
latest, err := LatestDistVersion(ctx, fetcher, testDist, false)
if err != nil {
t.Fatal(err)
}