diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index 92374dfd8..c9b05497b 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -288,9 +288,10 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment return fmt.Errorf("fs-repo requires migration") } - // Fetch migrations using the current distribution - migrations.SetIpfsDistPath(migrations.CurrentIpfsDist) - err = migrations.RunMigration(cctx.Context(), fsrepo.RepoVersion, "") + fetcher := migrations.NewHttpFetcher() + // Fetch migrations from current distribution, or location from environ + fetcher.SetDistPath(migrations.GetDistPathEnv(migrations.CurrentIpfsDist)) + err = migrations.RunMigration(cctx.Context(), fetcher, fsrepo.RepoVersion, "", false) if err != nil { fmt.Println("The migrations of fs-repo failed:") fmt.Printf(" %s\n", err) diff --git a/repo/fsrepo/migrations/fetch.go b/repo/fsrepo/migrations/fetch.go index 0814bd0c6..282978fe3 100644 --- a/repo/fsrepo/migrations/fetch.go +++ b/repo/fsrepo/migrations/fetch.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "io/ioutil" - "net/http" "os" "os/exec" "path" @@ -15,68 +14,22 @@ import ( "strings" ) -const ( - // Current dirstibution to fetch migrations from - CurrentIpfsDist = "/ipfs/Qme8pJhBidEUXRdpcWLGR2fkG5kdwVnaMh3kabjfP8zz7Y" - - envIpfsDistPath = "IPFS_DIST_PATH" - - // Distribution - gatewayURL = "https://ipfs.io" - ipfsDist = "/ipns/dist.ipfs.io" - - // Maximum download size - fetchSizeLimit = 1024 * 1024 * 512 -) - -type limitReadCloser struct { - io.Reader - io.Closer -} - -var ipfsDistPath string - -func init() { - SetIpfsDistPath("") -} - -// SetIpfsDistPath sets the ipfs path to the distribution site. If an empty -// string is given, then the path is set using the IPFS_DIST_PATH environ -// veriable, or the default dns link value if that is not defined. -func SetIpfsDistPath(distPath string) { - if distPath != "" { - ipfsDistPath = distPath - return - } - - if dist := os.Getenv(envIpfsDistPath); dist != "" { - ipfsDistPath = dist - } else { - ipfsDistPath = ipfsDist - } -} - // FetchBinary downloads an archive from the distribution site and unpacks it. // -// The base name of the archive file, inside the distribution directory on -// distribution site, may differ from the distribution name. If it does, then -// specify arcName. -// // The base name of the binary inside the archive may differ from the base // archive name. If it does, then specify binName. For example, the following // is needed because the archive "go-ipfs_v0.7.0_linux-amd64.tar.gz" contains a // binary named "ipfs" // -// FetchBinary(ctx, "go-ipfs", "v0.7.0", "go-ipfs", "ipfs", tmpDir) +// FetchBinary(ctx, fetcher, "go-ipfs", "v0.7.0", "ipfs", tmpDir) // // If out is a directory, then the binary is written to that directory with the // same name it has inside the archive. Otherwise, the binary file is written // to the file named by out. -func FetchBinary(ctx context.Context, dist, ver, arcName, binName, out string) (string, error) { - // If archive base name not specified, then it is same as dist. - if arcName == "" { - arcName = dist - } +func FetchBinary(ctx context.Context, fetcher Fetcher, dist, ver, binName, out string) (string, error) { + // The archive file name is the base of dist to support possible subdir in + // dist, for example: "ipfs-repo-migrations/ipfs-11-to-12" + arcName := path.Base(dist) // If binary base name is not specified, then it is same as archive base name. if binName == "" { binName = arcName @@ -101,6 +54,18 @@ func FetchBinary(ctx context.Context, dist, ver, arcName, binName, out string) ( } // out exists and is a directory, so compose final name out = path.Join(out, binName) + // Check if the binary already exists in the directory + fi, err = os.Stat(out) + if !os.IsNotExist(err) { + if err != nil { + return "", err + } + return "", &os.PathError{ + Op: "FetchBinary", + Path: out, + Err: os.ErrExist, + } + } } // Create temp directory to store download @@ -115,11 +80,10 @@ func FetchBinary(ctx context.Context, dist, ver, arcName, binName, out string) ( atype = "zip" } - arcName = makeArchiveName(arcName, ver, atype) - arcIpfsPath := makeIpfsPath(dist, ver, arcName) + arcDistPath, arcFullName := makeArchivePath(dist, arcName, ver, atype) // Create a file to write the archive data to - arcPath := path.Join(tmpDir, arcName) + arcPath := path.Join(tmpDir, arcFullName) arcFile, err := os.Create(arcPath) if err != nil { return "", err @@ -127,7 +91,7 @@ func FetchBinary(ctx context.Context, dist, ver, arcName, binName, out string) ( defer arcFile.Close() // Open connection to download archive from ipfs path - rc, err := fetch(ctx, arcIpfsPath) + rc, err := fetcher.Fetch(ctx, arcDistPath) if err != nil { return "", err } @@ -155,73 +119,6 @@ func FetchBinary(ctx context.Context, dist, ver, arcName, binName, out string) ( return out, nil } -// fetch attempts to fetch the file at the given ipfs path, first using the -// local ipfs api if available, then using http. Returns io.ReadCloser on -// success, which caller must close. -func fetch(ctx context.Context, ipfsPath string) (io.ReadCloser, error) { - // Try fetching via ipfs daemon - rc, err := ipfsFetch(ctx, ipfsPath) - if err == nil { - // Transferred using local ipfs daemon - return rc, nil - } - // Try fetching via HTTP - return httpFetch(ctx, gatewayURL+ipfsPath) -} - -// ipfsFetch attempts to fetch the file at the given ipfs path using the local -// ipfs api. Returns io.ReadCloser on success, which caller must close. -func ipfsFetch(ctx context.Context, ipfsPath string) (io.ReadCloser, error) { - sh, _, err := ApiShell("") - if err != nil { - return nil, err - } - - resp, err := sh.Request("cat", ipfsPath).Send(ctx) - if err != nil { - return nil, err - } - if resp.Error != nil { - return nil, resp.Error - } - - return newLimitReadCloser(resp.Output, fetchSizeLimit), nil -} - -// httpFetch attempts to fetch the file at the given URL. Returns -// io.ReadCloser on success, which caller must close. -func httpFetch(ctx context.Context, url string) (io.ReadCloser, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - return nil, fmt.Errorf("http.NewRequest error: %s", err) - } - - req.Header.Set("User-Agent", "go-ipfs") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, fmt.Errorf("http.DefaultClient.Do error: %s", err) - } - - if resp.StatusCode >= 400 { - defer resp.Body.Close() - mes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("error reading error body: %s", err) - } - return nil, fmt.Errorf("GET %s error: %s: %s", url, resp.Status, string(mes)) - } - - return newLimitReadCloser(resp.Body, fetchSizeLimit), nil -} - -func newLimitReadCloser(rc io.ReadCloser, limit int64) io.ReadCloser { - return limitReadCloser{ - Reader: io.LimitReader(rc, limit), - Closer: rc, - } -} - // osWithVariant returns the OS name with optional variant. // Currently returns either runtime.GOOS, or "linux-musl". func osWithVariant() (string, error) { @@ -255,18 +152,21 @@ func osWithVariant() (string, error) { return "linux", nil } -// makeArchiveName composes the name of a migration binary archive. +// makeArchivePath composes the path, relative to the distribution site, from which to +// download a binary. The path returned does not contain the distribution site path, +// e.g. "/ipns/dist.ipfs.io/", since that is know to the fetcher. // -// The archive name is in the format: name_version_osv-GOARCH.atype -// Example: ipfs-10-to-11_v1.8.0_darwin-amd64.tar.gz -func makeArchiveName(name, ver, atype string) string { - return fmt.Sprintf("%s_%s_%s-%s.%s", name, ver, runtime.GOOS, runtime.GOARCH, atype) -} - -// makeIpfsPath composes the name ipfs path location to download a migration -// binary from the distribution site. +// Returns the archive path and the base name. // -// The ipfs path format: distBaseCID/rootdir/version/name/archive -func makeIpfsPath(dist, ver, arcName string) string { - return fmt.Sprintf("%s/%s/%s/%s", ipfsDistPath, dist, ver, arcName) +// The ipfs path format is: distribution/version/archiveName +// - distribution is the name of a distribution, such as "go-ipfs" +// - version is the version to fetch, such as "v0.8.0-rc2" +// - archiveName is formatted as name_version_osv-GOARCH.atype, such as +// "go-ipfs_v0.8.0-rc2_linux-amd64.tar.gz" +// +// This would form the path: +// go-ipfs/v0.8.0/go-ipfs_v0.8.0_linux-amd64.tar.gz +func makeArchivePath(dist, name, ver, atype string) (string, string) { + arcName := fmt.Sprintf("%s_%s_%s-%s.%s", name, ver, runtime.GOOS, runtime.GOARCH, atype) + return fmt.Sprintf("%s/%s/%s", dist, ver, arcName), arcName } diff --git a/repo/fsrepo/migrations/fetch_test.go b/repo/fsrepo/migrations/fetch_test.go index 18bb25955..694e22e2b 100644 --- a/repo/fsrepo/migrations/fetch_test.go +++ b/repo/fsrepo/migrations/fetch_test.go @@ -3,17 +3,63 @@ package migrations import ( "bufio" "context" + "fmt" + "io" "io/ioutil" + "net/http" + "net/http/httptest" "os" "path" "strings" "testing" ) -func TestSetIpfsDistPath(t *testing.T) { +func createTestServer() *httptest.Server { + reqHandler := func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + if strings.Contains(r.URL.Path, "not-here") { + http.NotFound(w, r) + } else if strings.HasSuffix(r.URL.Path, "versions") { + fmt.Fprint(w, "v1.0.0\nv1.1.0\nv1.1.2\nv2.0.0-rc1\n2.0.0\nv2.0.1\n") + } else if strings.HasSuffix(r.URL.Path, ".tar.gz") { + createFakeArchive(r.URL.Path, false, w) + } else if strings.HasSuffix(r.URL.Path, "zip") { + createFakeArchive(r.URL.Path, true, w) + } else { + http.NotFound(w, r) + } + } + return httptest.NewServer(http.HandlerFunc(reqHandler)) +} + +func createFakeArchive(name string, archZip bool, w io.Writer) { + fileName := strings.Split(path.Base(name), "_")[0] + root := path.Base(path.Dir(path.Dir(name))) + + // Simulate fetching go-ipfs, which has "ipfs" as the name in the archive. + if fileName == "go-ipfs" { + fileName = "ipfs" + } + + var err error + if archZip { + err = writeZip(root, fileName, "FAKE DATA", w) + } else { + err = writeTarGzip(root, fileName, "FAKE DATA", w) + } + if err != nil { + panic(err) + } +} + +func TestSetDistPath(t *testing.T) { + f1 := NewHttpFetcher() + f2 := NewHttpFetcher() + mf := NewMultiFetcher(f1, f2) + os.Unsetenv(envIpfsDistPath) - SetIpfsDistPath("") - if ipfsDistPath != ipfsDist { + mf.SetDistPath(GetDistPathEnv("")) + if f1.distPath != IpnsIpfsDist { t.Error("did not set default dist path") } @@ -24,17 +70,30 @@ func TestSetIpfsDistPath(t *testing.T) { } defer func() { os.Unsetenv(envIpfsDistPath) - SetIpfsDistPath("") }() - SetIpfsDistPath("") - if ipfsDistPath != testDist { + mf.SetDistPath(GetDistPathEnv("")) + if f1.distPath != testDist { + t.Error("did not set dist path from environ") + } + if f2.distPath != testDist { + t.Error("did not set dist path from environ") + } + + mf.SetDistPath(GetDistPathEnv("ignored")) + if f1.distPath != testDist { + t.Error("did not set dist path from environ") + } + if f2.distPath != testDist { t.Error("did not set dist path from environ") } testDist = "/unit/test/dist2" - SetIpfsDistPath(testDist) - if ipfsDistPath != testDist { + mf.SetDistPath(testDist) + if f1.distPath != testDist { + t.Error("did not set dist path") + } + if f2.distPath != testDist { t.Error("did not set dist path") } } @@ -43,8 +102,12 @@ func TestHttpFetch(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - url := gatewayURL + path.Join(ipfsDistPath, distFSRM, distVersions) - rc, err := httpFetch(ctx, url) + fetcher := NewHttpFetcher() + ts := createTestServer() + defer ts.Close() + fetcher.SetGateway(ts.URL) + + rc, err := fetcher.Fetch(ctx, "/versions") if err != nil { t.Fatal(err) } @@ -60,73 +123,18 @@ func TestHttpFetch(t *testing.T) { t.Fatal("could not read versions:", err) } - if len(out) < 14 { + if len(out) < 6 { t.Fatal("do not get all expected data") } if out[0] != "v1.0.0" { t.Fatal("expected v1.0.0 as first line, got", out[0]) } - // Check bad URL - _, err = httpFetch(ctx, "") - if err == nil { - t.Fatal("expected error") - } - - // Check unreachable URL - _, err = httpFetch(ctx, "http://127.0.0.123:65510") - if err == nil || !strings.HasSuffix(err.Error(), "connection refused") { - t.Fatal("expected 'connection refused' error") - } - // Check not found - url = gatewayURL + path.Join(ipfsDistPath, distFSRM, "no_such_file") - _, err = httpFetch(ctx, url) + _, err = fetcher.Fetch(ctx, "/no_such_file") if err == nil || !strings.Contains(err.Error(), "404") { t.Fatal("expected error 404") } - -} - -func TestIpfsFetch(t *testing.T) { - _, err := ApiEndpoint("") - if err != nil { - t.Skip("skipped - local ipfs daemon not available") - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - url := path.Join(ipfsDistPath, distFSRM, distVersions) - rc, err := ipfsFetch(ctx, url) - if err != nil { - t.Fatal(err) - } - defer rc.Close() - - var out []string - scan := bufio.NewScanner(rc) - for scan.Scan() { - out = append(out, scan.Text()) - } - err = scan.Err() - if err != nil { - t.Fatal("could not read versions:", err) - } - - if len(out) < 14 { - t.Fatal("do not get all expected data") - } - if out[0] != "v1.0.0" { - t.Fatal("expected v1.0.0 as first line, got", out[0]) - } - - // Check bad URL - url = path.Join(ipfsDistPath, distFSRM, "no_such_file") - _, err = ipfsFetch(ctx, url) - if err == nil || !strings.Contains(err.Error(), "no link") { - t.Fatal("expected 'no link' error, got:", err) - } } func TestFetchBinary(t *testing.T) { @@ -139,13 +147,18 @@ func TestFetchBinary(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vers, err := DistVersions(ctx, distFSRM, false) + fetcher := NewHttpFetcher() + ts := createTestServer() + defer ts.Close() + fetcher.SetGateway(ts.URL) + + vers, err := DistVersions(ctx, fetcher, distFSRM, false) if err != nil { t.Fatal(err) } t.Log("latest version of", distFSRM, "is", vers[len(vers)-1]) - bin, err := FetchBinary(ctx, distFSRM, vers[0], distFSRM, "", tmpDir) + bin, err := FetchBinary(ctx, fetcher, distFSRM, vers[0], "", tmpDir) if err != nil { t.Fatal(err) } @@ -157,7 +170,7 @@ func TestFetchBinary(t *testing.T) { t.Log("downloaded and unpacked", fi.Size(), "byte file:", fi.Name()) - bin, err = FetchBinary(ctx, "go-ipfs", "v0.3.5", "", "ipfs", tmpDir) + bin, err = FetchBinary(ctx, fetcher, "go-ipfs", "v0.3.5", "ipfs", tmpDir) if err != nil { t.Fatal(err) } @@ -170,11 +183,18 @@ func TestFetchBinary(t *testing.T) { t.Log("downloaded and unpacked", fi.Size(), "byte file:", fi.Name()) // Check error is destination already exists and is not directory - _, err = FetchBinary(ctx, "go-ipfs", "v0.3.5", "", "ipfs", bin) + _, err = FetchBinary(ctx, fetcher, "go-ipfs", "v0.3.5", "ipfs", bin) if !os.IsExist(err) { - t.Fatal("expected 'exists' error") + t.Fatal("expected 'exists' error, got", err) } + _, err = FetchBinary(ctx, fetcher, "go-ipfs", "v0.3.5", "ipfs", tmpDir) + if !os.IsExist(err) { + t.Error("expected 'exists' error, got:", err) + } + + os.Remove(path.Join(tmpDir, "ipfs")) + // Check error creating temp download directory err = os.Chmod(tmpDir, 0555) if err != nil { @@ -184,9 +204,9 @@ func TestFetchBinary(t *testing.T) { if err != nil { panic(err) } - _, err = FetchBinary(ctx, "go-ipfs", "v0.3.5", "", "ipfs", tmpDir) + _, err = FetchBinary(ctx, fetcher, "go-ipfs", "v0.3.5", "ipfs", tmpDir) if !os.IsPermission(err) { - t.Error("expected 'permission'error") + t.Error("expected 'permission' error, got:", err) } err = os.Setenv("TMPDIR", "/tmp") if err != nil { @@ -198,13 +218,13 @@ func TestFetchBinary(t *testing.T) { } // Check error if failure to fetch due to bad dist - _, err = FetchBinary(ctx, "no-such-dist", "v0.3.5", "", "ipfs", tmpDir) + _, err = FetchBinary(ctx, fetcher, "not-here", "v0.3.5", "ipfs", tmpDir) if err == nil || !strings.Contains(err.Error(), "Not Found") { - t.Error("expected 'Not Found' error") + t.Error("expected 'Not Found' error, got:", err) } // Check error if failure to unpack archive - _, err = FetchBinary(ctx, "go-ipfs", "v0.3.5", "", "not-such-bin", tmpDir) + _, err = FetchBinary(ctx, fetcher, "go-ipfs", "v0.3.5", "not-such-bin", tmpDir) if err == nil || err.Error() != "no binary found in archive" { t.Error("expected 'no binary found in archive' error") } diff --git a/repo/fsrepo/migrations/fetcher.go b/repo/fsrepo/migrations/fetcher.go new file mode 100644 index 000000000..6338b75fc --- /dev/null +++ b/repo/fsrepo/migrations/fetcher.go @@ -0,0 +1,96 @@ +package migrations + +import ( + "context" + "io" + "os" + "strings" +) + +const ( + // Current dirstibution to fetch migrations from + CurrentIpfsDist = "/ipfs/Qme8pJhBidEUXRdpcWLGR2fkG5kdwVnaMh3kabjfP8zz7Y" + // Distribution IPNS path. Default for fetchers. + IpnsIpfsDist = "/ipns/dist.ipfs.io" + + // Distribution environ variable + envIpfsDistPath = "IPFS_DIST_PATH" +) + +type Fetcher interface { + // Fetch attempts to fetch the file at the given ipfs path. + // Returns io.ReadCloser on success, which caller must close. + Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) + // SetDistPath sets the path to the distribution site for a Fetcher + SetDistPath(distPath string) +} + +// MultiFetcher holds multiple Fetchers and provides a Fetch that tries each +// until one succeeds. +type MultiFetcher struct { + fetchers []Fetcher +} + +type limitReadCloser struct { + io.Reader + io.Closer +} + +// NewMultiFetcher creates a MultiFetcher with the given Fetchers. The +// Fetchers are tried in order ther passed to this function. +func NewMultiFetcher(f ...Fetcher) Fetcher { + mf := &MultiFetcher{ + fetchers: make([]Fetcher, len(f)), + } + copy(mf.fetchers, f) + return mf +} + +// Fetch attempts to fetch the file at each of its fetchers until one succeeds. +// Returns io.ReadCloser on success, which caller must close. +func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) (rc io.ReadCloser, err error) { + for _, fetcher := range f.fetchers { + rc, err = fetcher.Fetch(ctx, ipfsPath) + if err == nil { + // Transferred using this fetcher + return + } + } + return +} + +// SetDistPath sets the path to the distribution site for all fetchers +func (f *MultiFetcher) SetDistPath(distPath string) { + if !strings.HasPrefix(distPath, "/") { + distPath = "/" + distPath + } + for _, fetcher := range f.fetchers { + fetcher.SetDistPath(distPath) + } +} + +// NewLimitReadCloser returns a new io.ReadCloser with the reader wrappen in a +// io.LimitedReader limited to reading the amount specified. +func NewLimitReadCloser(rc io.ReadCloser, limit int64) io.ReadCloser { + return limitReadCloser{ + Reader: io.LimitReader(rc, limit), + Closer: rc, + } +} + +// GetDistPathEnv returns the IPFS path to the distribution site, using +// the value of environ variable specified by envIpfsDistPath. If the environ +// variable is not set, then returns the provided distPath, and if that is not set +// then returns the IPNS path. +// +// To get the IPFS path of the latest distribution, if not overriddin by the +// environ variable: GetDistPathEnv(CurrentIpfsDist) +func GetDistPathEnv(distPath string) string { + if dist := os.Getenv(envIpfsDistPath); dist != "" { + return dist + } + if distPath == "" { + return IpnsIpfsDist + } + return distPath +} diff --git a/repo/fsrepo/migrations/httpfetcher.go b/repo/fsrepo/migrations/httpfetcher.go new file mode 100644 index 000000000..723747025 --- /dev/null +++ b/repo/fsrepo/migrations/httpfetcher.go @@ -0,0 +1,91 @@ +package migrations + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "path" + "strings" +) + +const ( + defaultGatewayURL = "https://ipfs.io" + defaultFetchLimit = 1024 * 1024 * 512 +) + +// HttpFetcher fetches files over HTTP +type HttpFetcher struct { + gateway string + distPath string + limit int64 +} + +var _ Fetcher = (*HttpFetcher)(nil) + +// NewHttpFetcher creates a new HttpFetcher +func NewHttpFetcher() *HttpFetcher { + return &HttpFetcher{ + gateway: defaultGatewayURL, + distPath: IpnsIpfsDist, + limit: defaultFetchLimit, + } +} + +// SetGateway sets the gateway URL +func (f *HttpFetcher) SetGateway(gatewayURL string) error { + gwURL, err := url.Parse(gatewayURL) + if err != nil { + return err + } + f.gateway = gwURL.String() + return nil +} + +// SetDistPath sets the path to the distribution site. +func (f *HttpFetcher) SetDistPath(distPath string) { + if !strings.HasPrefix(distPath, "/") { + distPath = "/" + distPath + } + f.distPath = distPath +} + +// SetFetchLimit sets the download size limit. A value of 0 means no limit. +func (f *HttpFetcher) SetFetchLimit(limit int64) { + f.limit = limit +} + +// Fetch attempts to fetch the file at the given path, from the distribution +// site configured for this HttpFetcher. Returns io.ReadCloser on success, +// which caller must close. +func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) { + gwURL := f.gateway + path.Join(f.distPath, filePath) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, gwURL, nil) + if err != nil { + return nil, fmt.Errorf("http.NewRequest error: %s", err) + } + + req.Header.Set("User-Agent", "go-ipfs") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("http.DefaultClient.Do error: %s", err) + } + + if resp.StatusCode >= 400 { + defer resp.Body.Close() + mes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("error reading error body: %s", err) + } + return nil, fmt.Errorf("GET %s error: %s: %s", gwURL, resp.Status, string(mes)) + } + + if f.limit != 0 { + return NewLimitReadCloser(resp.Body, f.limit), nil + } + return resp.Body, nil +} diff --git a/repo/fsrepo/migrations/ipfsdir.go b/repo/fsrepo/migrations/ipfsdir.go index bcc3853b1..cbc407ca2 100644 --- a/repo/fsrepo/migrations/ipfsdir.go +++ b/repo/fsrepo/migrations/ipfsdir.go @@ -10,7 +10,6 @@ import ( "strings" "time" - api "github.com/ipfs/go-ipfs-api" "github.com/mitchellh/go-homedir" ) @@ -52,23 +51,6 @@ func ApiEndpoint(ipfsDir string) (string, error) { return parts[2] + ":" + parts[4], nil } -// ApiShell creates a new ipfs api shell and checks that it is up. If the shell -// is available, then the shell and ipfs version are returned. -func ApiShell(ipfsDir string) (*api.Shell, string, error) { - apiEp, err := ApiEndpoint("") - if err != nil { - return nil, "", err - } - sh := api.NewShell(apiEp) - sh.SetTimeout(shellUpTimeout) - ver, _, err := sh.Version() - if err != nil { - return nil, "", errors.New("ipfs api shell not up") - } - sh.SetTimeout(0) - return sh, ver, nil -} - // IpfsDir returns the path of the ipfs directory. If dir specified, then // returns the expanded version dir. If dir is "", then return the directory // set by IPFS_PATH, or if IPFS_PATH is not set, then return the default diff --git a/repo/fsrepo/migrations/ipfsdir_test.go b/repo/fsrepo/migrations/ipfsdir_test.go index 59c14a164..a1003bc7d 100644 --- a/repo/fsrepo/migrations/ipfsdir_test.go +++ b/repo/fsrepo/migrations/ipfsdir_test.go @@ -212,11 +212,4 @@ func TestApiEndpoint(t *testing.T) { if val2 != val { t.Fatal("expected", val, "got", val2) } - - _, _, err = ApiShell(fakeIpfs) - if err != nil { - if err.Error() != "ipfs api shell not up" { - t.Fatal("expected 'ipfs api shell not up' error") - } - } } diff --git a/repo/fsrepo/migrations/migrations.go b/repo/fsrepo/migrations/migrations.go index 2fd7c7eca..1833dff30 100644 --- a/repo/fsrepo/migrations/migrations.go +++ b/repo/fsrepo/migrations/migrations.go @@ -21,7 +21,7 @@ const ( // RunMigration finds, downloads, and runs the individual migrations needed to // migrate the repo from its current version to the target version. -func RunMigration(ctx context.Context, targetVer int, ipfsDir string) error { +func RunMigration(ctx context.Context, fetcher Fetcher, targetVer int, ipfsDir string, allowDowngrade bool) error { ipfsDir, err := CheckIpfsDir(ipfsDir) if err != nil { return err @@ -34,6 +34,9 @@ func RunMigration(ctx context.Context, targetVer int, ipfsDir string) error { // repo already at target version number return nil } + if fromVer > targetVer && !allowDowngrade { + return fmt.Errorf("downgrade not allowed from %d to %d", fromVer, targetVer) + } log.Print("Looking for suitable migration binaries.") @@ -59,7 +62,7 @@ func RunMigration(ctx context.Context, targetVer int, ipfsDir string) error { } defer os.RemoveAll(tmpDir) - fetched, err := fetchMigrations(ctx, missing, tmpDir) + fetched, err := fetchMigrations(ctx, fetcher, missing, tmpDir) if err != nil { log.Print("Failed to download migrations.") return err @@ -156,7 +159,7 @@ func runMigration(ctx context.Context, binPath, ipfsDir string, revert bool) err // fetchMigrations downloads the requested migrations, and returns a slice with // the paths of each binary, in the same order specified by needed. -func fetchMigrations(ctx context.Context, needed []string, destDir string) ([]string, error) { +func fetchMigrations(ctx context.Context, fetcher Fetcher, needed []string, destDir string) ([]string, error) { osv, err := osWithVariant() if err != nil { return nil, err @@ -173,13 +176,13 @@ func fetchMigrations(ctx context.Context, needed []string, destDir string) ([]st log.Printf("Downloading migration: %s...", name) go func(i int, name string) { defer wg.Done() - distDir := path.Join(distMigsRoot, name) - ver, err := LatestDistVersion(ctx, distDir) + dist := path.Join(distMigsRoot, name) + ver, err := LatestDistVersion(ctx, fetcher, dist, false) if err != nil { log.Printf("could not get latest version of migration %s: %s", name, err) return } - loc, err := FetchBinary(ctx, distDir, ver, name, name, destDir) + loc, err := FetchBinary(ctx, fetcher, dist, ver, name, destDir) if err != nil { log.Printf("could not download %s: %s", name, err) return diff --git a/repo/fsrepo/migrations/migrations_test.go b/repo/fsrepo/migrations/migrations_test.go index d86ddbb93..2293d775a 100644 --- a/repo/fsrepo/migrations/migrations_test.go +++ b/repo/fsrepo/migrations/migrations_test.go @@ -3,15 +3,12 @@ package migrations import ( "context" "io/ioutil" - "net/http" "os" "path" "strings" "testing" ) -const testIPFSDistPath = "/ipfs/Qme8pJhBidEUXRdpcWLGR2fkG5kdwVnaMh3kabjfP8zz7Y" - func TestFindMigrations(t *testing.T) { tmpDir, err := ioutil.TempDir("", "migratetest") if err != nil { @@ -118,14 +115,11 @@ func TestFetchMigrations(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - SetIpfsDistPath(testIPFSDistPath) - _, err := LatestDistVersion(ctx, "ipfs-1-to-2") - if err != nil { - if strings.Contains(err.Error(), http.StatusText(http.StatusNotFound)) { - t.Skip("skip - migrations not yet available on distribution site") - } - t.Fatal(err) - } + fetcher := NewHttpFetcher() + fetcher.SetDistPath(CurrentIpfsDist) + ts := createTestServer() + defer ts.Close() + fetcher.SetGateway(ts.URL) tmpDir, err := ioutil.TempDir("", "migratetest") if err != nil { @@ -134,7 +128,7 @@ func TestFetchMigrations(t *testing.T) { defer os.RemoveAll(tmpDir) needed := []string{"ipfs-1-to-2", "ipfs-2-to-3"} - fetched, err := fetchMigrations(ctx, needed, tmpDir) + fetched, err := fetchMigrations(ctx, fetcher, needed, tmpDir) if err != nil { t.Fatal(err) } @@ -147,6 +141,52 @@ func TestFetchMigrations(t *testing.T) { } } +func TestRunMigrations(t *testing.T) { + var err error + fakeHome, err = ioutil.TempDir("", "testhome") + if err != nil { + panic(err) + } + defer os.RemoveAll(fakeHome) + + os.Setenv("HOME", fakeHome) + fakeIpfs := path.Join(fakeHome, ".ipfs") + + err = os.Mkdir(fakeIpfs, os.ModePerm) + if err != nil { + panic(err) + } + + testVer := 11 + err = WriteRepoVersion(fakeIpfs, testVer) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fetcher := NewHttpFetcher() + fetcher.SetDistPath(CurrentIpfsDist) + ts := createTestServer() + defer ts.Close() + fetcher.SetGateway(ts.URL) + + targetVer := 9 + + err = RunMigration(ctx, fetcher, targetVer, fakeIpfs, false) + if err == nil || !strings.HasPrefix(err.Error(), "downgrade not allowed") { + t.Fatal("expected 'downgrade not alloed' error") + } + + err = RunMigration(ctx, fetcher, targetVer, fakeIpfs, true) + if err != nil { + if !strings.HasPrefix(err.Error(), "migration ipfs-10-to-11 failed") { + t.Fatal(err) + } + } +} + func createFakeBin(from, to int, tmpDir string) { migPath := path.Join(tmpDir, ExeName(migrationName(from, to))) emptyFile, err := os.Create(migPath) diff --git a/repo/fsrepo/migrations/unpack.go b/repo/fsrepo/migrations/unpack.go index f08a19885..485d983cf 100644 --- a/repo/fsrepo/migrations/unpack.go +++ b/repo/fsrepo/migrations/unpack.go @@ -30,13 +30,13 @@ func unpackArchive(arcPath, atype, root, name, out string) error { func unpackTgz(arcPath, root, name, out string) error { fi, err := os.Open(arcPath) if err != nil { - return fmt.Errorf("cannot open archive file: %s", err) + return fmt.Errorf("cannot open archive file: %w", err) } defer fi.Close() gzr, err := gzip.NewReader(fi) if err != nil { - return fmt.Errorf("error opening gzip reader: %s", err) + return fmt.Errorf("error opening gzip reader: %w", err) } defer gzr.Close() @@ -50,7 +50,7 @@ func unpackTgz(arcPath, root, name, out string) error { if err == io.EOF { break } - return fmt.Errorf("cannot read archive: %s", err) + return fmt.Errorf("cannot read archive: %w", err) } if th.Name == lookFor { @@ -69,7 +69,7 @@ func unpackTgz(arcPath, root, name, out string) error { func unpackZip(arcPath, root, name, out string) error { zipr, err := zip.OpenReader(arcPath) if err != nil { - return fmt.Errorf("error opening zip reader: %s", err) + return fmt.Errorf("error opening zip reader: %w", err) } defer zipr.Close() @@ -79,7 +79,7 @@ func unpackZip(arcPath, root, name, out string) error { if fis.Name == lookFor { rc, err := fis.Open() if err != nil { - return fmt.Errorf("error extracting binary from archive: %s", err) + return fmt.Errorf("error extracting binary from archive: %w", err) } bin = rc @@ -97,7 +97,7 @@ func unpackZip(arcPath, root, name, out string) error { func writeToPath(rc io.Reader, out string) error { binfi, err := os.Create(out) if err != nil { - return fmt.Errorf("error opening tmp bin path '%s': %s", out, err) + return fmt.Errorf("error creating output file '%s': %w", out, err) } defer binfi.Close() diff --git a/repo/fsrepo/migrations/unpack_test.go b/repo/fsrepo/migrations/unpack_test.go index cc3ec96a6..e772f00b1 100644 --- a/repo/fsrepo/migrations/unpack_test.go +++ b/repo/fsrepo/migrations/unpack_test.go @@ -5,6 +5,7 @@ import ( "archive/zip" "bufio" "compress/gzip" + "io" "io/ioutil" "os" "path" @@ -49,7 +50,7 @@ func TestUnpackTgz(t *testing.T) { testTarGzip := path.Join(tmpDir, "test.tar.gz") testData := "some data" - err = writeTarGzip(testTarGzip, "testroot", "testfile", testData) + err = writeTarGzipFile(testTarGzip, "testroot", "testfile", testData) if err != nil { panic(err) } @@ -97,7 +98,7 @@ func TestUnpackZip(t *testing.T) { testZip := path.Join(tmpDir, "test.zip") testData := "some data" - err = writeZip(testZip, "testroot", "testfile", testData) + err = writeZipFile(testZip, "testroot", "testfile", testData) if err != nil { panic(err) } @@ -125,21 +126,38 @@ func TestUnpackZip(t *testing.T) { } } -func writeTarGzip(archName, root, fileName, data string) error { +func writeTarGzipFile(archName, root, fileName, data string) error { archFile, err := os.Create(archName) if err != nil { return err } defer archFile.Close() - wr := bufio.NewWriter(archFile) + w := bufio.NewWriter(archFile) + err = writeTarGzip(root, fileName, data, w) + if err != nil { + return err + } + // Flush buffered data to file + if err = w.Flush(); err != nil { + return err + } + // Close tar file + if err = archFile.Close(); err != nil { + return err + } + return nil +} + +func writeTarGzip(root, fileName, data string, w io.Writer) error { // gzip writer writes to buffer - gzw := gzip.NewWriter(wr) + gzw := gzip.NewWriter(w) defer gzw.Close() // tar writer writes to gzip tw := tar.NewWriter(gzw) defer tw.Close() + var err error if fileName != "" { hdr := &tar.Header{ Name: path.Join(root, fileName), @@ -163,26 +181,34 @@ func writeTarGzip(archName, root, fileName, data string) error { if err = gzw.Close(); err != nil { return err } - // Flush buffered data to file - if err = wr.Flush(); err != nil { + return nil +} + +func writeZipFile(archName, root, fileName, data string) error { + archFile, err := os.Create(archName) + if err != nil { return err } - // Close tar file + defer archFile.Close() + w := bufio.NewWriter(archFile) + + err = writeZip(root, fileName, data, w) + if err != nil { + return err + } + // Flush buffered data to file + if err = w.Flush(); err != nil { + return err + } + // Close zip file if err = archFile.Close(); err != nil { return err } return nil } -func writeZip(archName, root, fileName, data string) error { - archFile, err := os.Create(archName) - if err != nil { - return err - } - defer archFile.Close() - wr := bufio.NewWriter(archFile) - - zw := zip.NewWriter(wr) +func writeZip(root, fileName, data string, w io.Writer) error { + zw := zip.NewWriter(w) defer zw.Close() // Write file name @@ -200,13 +226,5 @@ func writeZip(archName, root, fileName, data string) error { if err = zw.Close(); err != nil { return err } - // Flush buffered data to file - if err = wr.Flush(); err != nil { - return err - } - // Close zip file - if err = archFile.Close(); err != nil { - return err - } return nil } diff --git a/repo/fsrepo/migrations/versions.go b/repo/fsrepo/migrations/versions.go index fd45f2180..7d9fda835 100644 --- a/repo/fsrepo/migrations/versions.go +++ b/repo/fsrepo/migrations/versions.go @@ -18,17 +18,21 @@ const distVersions = "versions" // LatestDistVersion returns the latest version, of the specified distribution, // that is available on the distribution site. -func LatestDistVersion(ctx context.Context, dist string) (string, error) { - vs, err := DistVersions(ctx, dist, false) +func LatestDistVersion(ctx context.Context, fetcher Fetcher, dist string, stableOnly bool) (string, error) { + vs, err := DistVersions(ctx, fetcher, dist, false) if err != nil { return "", err } for i := len(vs) - 1; i >= 0; i-- { ver := vs[i] - if !strings.Contains(ver, "-dev") { - return ver, nil + if stableOnly && strings.Contains(ver, "-rc") { + continue } + if strings.Contains(ver, "-dev") { + continue + } + return ver, nil } return "", errors.New("could not find a non dev version") } @@ -36,8 +40,8 @@ func LatestDistVersion(ctx context.Context, dist string) (string, error) { // DistVersions returns all versions of the specified distribution, that are // available on the distriburion site. List is in ascending order, unless // sortDesc is true. -func DistVersions(ctx context.Context, dist string, sortDesc bool) ([]string, error) { - rc, err := fetch(ctx, path.Join(ipfsDistPath, dist, distVersions)) +func DistVersions(ctx context.Context, fetcher Fetcher, dist string, sortDesc bool) ([]string, error) { + rc, err := fetcher.Fetch(ctx, path.Join(dist, distVersions)) if err != nil { return nil, err } diff --git a/repo/fsrepo/migrations/versions_test.go b/repo/fsrepo/migrations/versions_test.go index a97254049..af1b11505 100644 --- a/repo/fsrepo/migrations/versions_test.go +++ b/repo/fsrepo/migrations/versions_test.go @@ -14,7 +14,12 @@ func TestDistVersions(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vers, err := DistVersions(ctx, testDist, true) + fetcher := NewHttpFetcher() + ts := createTestServer() + defer ts.Close() + fetcher.SetGateway(ts.URL) + + vers, err := DistVersions(ctx, fetcher, testDist, true) if err != nil { t.Fatal(err) } @@ -29,7 +34,12 @@ func TestLatestDistVersion(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - latest, err := LatestDistVersion(ctx, testDist) + fetcher := NewHttpFetcher() + //ts := createTestServer() + //defer ts.Close() + //fetcher.SetGateway(ts.URL) + + latest, err := LatestDistVersion(ctx, fetcher, testDist, false) if err != nil { t.Fatal(err) }