feat: refactor Fetcher interface used for downloading migrations (#8728)

* feat: refactor Fetcher interface used for downloading migrations
* feat: add RetryFetcher for migration downloads
* feat: 3 retries for each HTTP migration download

(cherry picked from commit b1ffc870d5)
Adin Schmahmann 2022-02-11 14:57:51 -05:00
parent 418bc6e709
commit bf76ebe667
10 changed files with 83 additions and 44 deletions

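The net effect of the refactor, before the per-file hunks: Fetcher.Fetch now returns the downloaded bytes directly instead of an io.ReadCloser, so callers no longer have to manage and close a stream. The snippet below is not part of the commit; it is a minimal caller-side sketch of the new signature (the helper name and the "/versions" path are illustrative):

package migrations

import (
	"bufio"
	"bytes"
	"context"
)

// readVersionLines is a hypothetical helper showing the caller-side effect of
// the change: Fetch returns the whole payload as a []byte, so there is no
// ReadCloser to defer-close and no io.Copy/ReadAll step at the call site.
func readVersionLines(ctx context.Context, f Fetcher) ([]string, error) {
	out, err := f.Fetch(ctx, "/versions")
	if err != nil {
		return nil, err
	}
	var lines []string
	scan := bufio.NewScanner(bytes.NewReader(out))
	for scan.Scan() {
		lines = append(lines, scan.Text())
	}
	return lines, scan.Err()
}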

@ -111,15 +111,14 @@ func FetchBinary(ctx context.Context, fetcher Fetcher, dist, ver, binName, out s
}
defer arcFile.Close()
// Open connection to download archive from ipfs path
rc, err := fetcher.Fetch(ctx, arcDistPath)
// Open connection to download archive from ipfs path and write to file
arcBytes, err := fetcher.Fetch(ctx, arcDistPath)
if err != nil {
return "", err
}
defer rc.Close()
// Write download data
_, err = io.Copy(arcFile, rc)
_, err = io.Copy(arcFile, bytes.NewReader(arcBytes))
if err != nil {
return "", err
}


@ -2,10 +2,10 @@ package migrations
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
@ -96,14 +96,13 @@ func TestHttpFetch(t *testing.T) {
fetcher := NewHttpFetcher("", ts.URL, "", 0)
rc, err := fetcher.Fetch(ctx, "/versions")
out, err := fetcher.Fetch(ctx, "/versions")
if err != nil {
t.Fatal(err)
}
defer rc.Close()
var lines []string
scan := bufio.NewScanner(rc)
scan := bufio.NewScanner(bytes.NewReader(out))
for scan.Scan() {
lines = append(lines, scan.Text())
}
@ -232,16 +231,11 @@ func TestMultiFetcher(t *testing.T) {
mf := NewMultiFetcher(badFetcher, fetcher)
rc, err := mf.Fetch(ctx, "/versions")
vers, err := mf.Fetch(ctx, "/versions")
if err != nil {
t.Fatal(err)
}
defer rc.Close()
vers, err := ioutil.ReadAll(rc)
if err != nil {
t.Fatal("could not read versions:", err)
}
if len(vers) < 45 {
fmt.Println("unexpected more data")
}


@ -21,8 +21,7 @@ const (
type Fetcher interface {
// Fetch attempts to fetch the file at the given ipfs path.
// Returns io.ReadCloser on success, which caller must close.
Fetch(ctx context.Context, filePath string) (io.ReadCloser, error)
Fetch(ctx context.Context, filePath string) ([]byte, error)
// Close performs any cleanup after the fetcher is no longer needed.
Close() error
}
@ -50,13 +49,12 @@ func NewMultiFetcher(f ...Fetcher) *MultiFetcher {
}
// Fetch attempts to fetch the file at each of its fetchers until one succeeds.
// Returns io.ReadCloser on success, which caller must close.
func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) (io.ReadCloser, error) {
func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) ([]byte, error) {
var errs error
for _, fetcher := range f.fetchers {
rc, err := fetcher.Fetch(ctx, ipfsPath)
out, err := fetcher.Fetch(ctx, ipfsPath)
if err == nil {
return rc, nil
return out, nil
}
fmt.Printf("Error fetching: %s\n", err.Error())
errs = multierror.Append(errs, err)

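For implementers, the new contract is simply "return the whole file or an error". The type below is not in the commit; it is a hypothetical in-memory implementation of the updated interface, in the same spirit as the mockIpfsFetcher added to the tests further down:

package migrations

import (
	"context"
	"fmt"
)

// mapFetcher is an illustrative Fetcher that serves files from a map instead
// of the network; it exists only to show the shape of the new interface.
type mapFetcher struct {
	files map[string][]byte
}

var _ Fetcher = (*mapFetcher)(nil)

func (m *mapFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	data, ok := m.files[filePath]
	if !ok {
		return nil, fmt.Errorf("%s: not found", filePath)
	}
	return data, nil
}

// Close is a no-op; there is nothing to clean up for an in-memory fetcher.
func (m *mapFetcher) Close() error { return nil }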

@ -60,9 +60,8 @@ func NewHttpFetcher(distPath, gateway, userAgent string, fetchLimit int64) *Http
}
// Fetch attempts to fetch the file at the given path, from the distribution
// site configured for this HttpFetcher. Returns io.ReadCloser on success,
// which caller must close.
func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
// site configured for this HttpFetcher.
func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
gwURL := f.gateway + path.Join(f.distPath, filePath)
fmt.Printf("Fetching with HTTP: %q\n", gwURL)
@ -89,10 +88,15 @@ func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser
return nil, fmt.Errorf("GET %s error: %s: %s", gwURL, resp.Status, string(mes))
}
var rc io.ReadCloser
if f.limit != 0 {
return NewLimitReadCloser(resp.Body, f.limit), nil
rc = NewLimitReadCloser(resp.Body, f.limit)
} else {
rc = resp.Body
}
return resp.Body, nil
defer rc.Close()
return ioutil.ReadAll(rc)
}
func (f *HttpFetcher) Close() error {


@ -52,6 +52,8 @@ type IpfsFetcher struct {
addrInfo peer.AddrInfo
}
var _ migrations.Fetcher = (*IpfsFetcher)(nil)
// NewIpfsFetcher creates a new IpfsFetcher
//
// Specifying "" for distPath sets the default IPNS path.
@ -85,9 +87,8 @@ func NewIpfsFetcher(distPath string, fetchLimit int64, repoRoot *string) *IpfsFe
}
// Fetch attempts to fetch the file at the given path, from the distribution
// site configured for this HttpFetcher. Returns io.ReadCloser on success,
// which caller must close.
func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
// site configured for this HttpFetcher.
func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
// Initialize and start IPFS node on first call to Fetch, since the fetcher
// may be created but not used.
f.openOnce.Do(func() {
@ -123,10 +124,15 @@ func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser
return nil, fmt.Errorf("%q is not a file", filePath)
}
var rc io.ReadCloser
if f.limit != 0 {
return migrations.NewLimitReadCloser(fileNode, f.limit), nil
rc = migrations.NewLimitReadCloser(fileNode, f.limit)
} else {
rc = fileNode
}
return fileNode, nil
defer rc.Close()
return ioutil.ReadAll(rc)
}
func (f *IpfsFetcher) Close() error {

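As an aside, this is roughly how an IpfsFetcher gets handed to the migrations code as a factory. The wiring below is not part of this diff, and the package name and import paths are assumptions:

package wiring // hypothetical glue package outside migrations and ipfsfetcher

import (
	"github.com/ipfs/go-ipfs/repo/fsrepo/migrations"
	"github.com/ipfs/go-ipfs/repo/fsrepo/migrations/ipfsfetcher"
)

// newIpfsFetcher adapts NewIpfsFetcher to the func(string) Fetcher shape that
// GetMigrationFetcher (later in this diff) expects; the 0 and nil arguments
// mirror the NewIpfsFetcher("", 0, nil) call in the test hunk that follows.
func newIpfsFetcher(distPath string) migrations.Fetcher {
	return ipfsfetcher.NewIpfsFetcher(distPath, 0, nil)
}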

@ -2,6 +2,7 @@ package ipfsfetcher
import (
"bufio"
"bytes"
"context"
"fmt"
"os"
@ -28,14 +29,13 @@ func TestIpfsFetcher(t *testing.T) {
fetcher := NewIpfsFetcher("", 0, nil)
defer fetcher.Close()
rc, err := fetcher.Fetch(ctx, "go-ipfs/versions")
out, err := fetcher.Fetch(ctx, "go-ipfs/versions")
if err != nil {
t.Fatal(err)
}
defer rc.Close()
var lines []string
scan := bufio.NewScanner(rc)
scan := bufio.NewScanner(bytes.NewReader(out))
for scan.Scan() {
lines = append(lines, scan.Text())
}
@ -52,8 +52,7 @@ func TestIpfsFetcher(t *testing.T) {
}
// Check not found
_, err = fetcher.Fetch(ctx, "/no_such_file")
if err == nil {
if _, err = fetcher.Fetch(ctx, "/no_such_file"); err == nil {
t.Fatal("expected error 404")
}


@ -155,13 +155,14 @@ func ReadMigrationConfig(repoRoot string) (*config.Migration, error) {
// downloadSources,
func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetcher func(string) Fetcher) (Fetcher, error) {
const httpUserAgent = "go-ipfs"
const numTriesPerHTTP = 3
var fetchers []Fetcher
for _, src := range downloadSources {
src := strings.TrimSpace(src)
switch src {
case "HTTPS", "https", "HTTP", "http":
fetchers = append(fetchers, NewHttpFetcher(distPath, "", httpUserAgent, 0))
fetchers = append(fetchers, &RetryFetcher{NewHttpFetcher(distPath, "", httpUserAgent, 0), numTriesPerHTTP})
case "IPFS", "ipfs":
if newIpfsFetcher != nil {
fetchers = append(fetchers, newIpfsFetcher(distPath))
@ -178,7 +179,7 @@ func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetch
default:
return nil, errors.New("bad gateway address: url scheme must be http or https")
}
fetchers = append(fetchers, NewHttpFetcher(distPath, u.String(), httpUserAgent, 0))
fetchers = append(fetchers, &RetryFetcher{NewHttpFetcher(distPath, u.String(), httpUserAgent, 0), numTriesPerHTTP})
case "":
// Ignore empty string
}

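With the retry wrapper in place, every HTTP(S) source is tried up to numTriesPerHTTP (3) times. A hedged example of driving GetMigrationFetcher follows; the helper function and its source list are illustrative, not taken from the commit:

package migrations

// newConfiguredFetcher is a hypothetical call site. Each "https" source comes
// back wrapped in a RetryFetcher; the newIpfsFetcher factory is only invoked
// for "ipfs" sources and may be nil, in which case they are skipped.
func newConfiguredFetcher(distPath string, newIpfsFetcher func(string) Fetcher) (Fetcher, error) {
	return GetMigrationFetcher([]string{"https", "ipfs"}, distPath, newIpfsFetcher)
}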

@ -3,7 +3,6 @@ package migrations
import (
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
@ -290,7 +289,9 @@ func TestReadMigrationConfig(t *testing.T) {
type mockIpfsFetcher struct{}
func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
var _ Fetcher = (*mockIpfsFetcher)(nil)
func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
return nil, nil
}
@ -323,7 +324,9 @@ func TestGetMigrationFetcher(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, ok := f.(*HttpFetcher); !ok {
if rf, ok := f.(*RetryFetcher); !ok {
t.Fatal("expected RetryFetcher")
} else if _, ok := rf.Fetcher.(*HttpFetcher); !ok {
t.Fatal("expected HttpFetcher")
}
@ -341,7 +344,9 @@ func TestGetMigrationFetcher(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, ok := f.(*HttpFetcher); !ok {
if rf, ok := f.(*RetryFetcher); !ok {
t.Fatal("expected RetryFetcher")
} else if _, ok := rf.Fetcher.(*HttpFetcher); !ok {
t.Fatal("expected HttpFetcher")
}


@ -0,0 +1,33 @@
package migrations
import (
"context"
"fmt"
)
type RetryFetcher struct {
Fetcher
MaxTries int
}
var _ Fetcher = (*RetryFetcher)(nil)
func (r *RetryFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
var lastErr error
for i := 0; i < r.MaxTries; i++ {
out, err := r.Fetcher.Fetch(ctx, filePath)
if err == nil {
return out, nil
}
if ctx.Err() != nil {
return nil, ctx.Err()
}
lastErr = err
}
return nil, fmt.Errorf("exceeded number of retries. last error was %w", lastErr)
}
func (r *RetryFetcher) Close() error {
return r.Fetcher.Close()
}

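The wrapper composes with any Fetcher, and the ctx.Err() check above makes cancellation abort immediately rather than burning the remaining tries. A usage sketch, equivalent to what GetMigrationFetcher does for HTTP sources (the helper and its arguments are illustrative):

package migrations

import "context"

// fetchWithRetries wraps an HttpFetcher so transient gateway errors are
// retried up to three times, mirroring the RetryFetcher construction in
// GetMigrationFetcher.
func fetchWithRetries(ctx context.Context, gateway, ipfsPath string) ([]byte, error) {
	rf := &RetryFetcher{
		Fetcher:  NewHttpFetcher("", gateway, "go-ipfs", 0),
		MaxTries: 3,
	}
	defer rf.Close()
	return rf.Fetch(ctx, ipfsPath)
}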

@ -2,6 +2,7 @@ package migrations
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
@ -39,16 +40,15 @@ func LatestDistVersion(ctx context.Context, fetcher Fetcher, dist string, stable
// available on the distribution site. List is in ascending order, unless
// sortDesc is true.
func DistVersions(ctx context.Context, fetcher Fetcher, dist string, sortDesc bool) ([]string, error) {
rc, err := fetcher.Fetch(ctx, path.Join(dist, distVersions))
versionBytes, err := fetcher.Fetch(ctx, path.Join(dist, distVersions))
if err != nil {
return nil, err
}
defer rc.Close()
prefix := "v"
var vers []semver.Version
scan := bufio.NewScanner(rc)
scan := bufio.NewScanner(bytes.NewReader(versionBytes))
for scan.Scan() {
ver, err := semver.Make(strings.TrimLeft(scan.Text(), prefix))
if err != nil {