ceremonyclient/node/consensus/data/frame_importer.go
Cassandra Heart 3dd9a0c5f3
get develop caught up (#322)
* Update qcommander.sh bootstrap (#304)

* v2.0.1 (#308)

* roll up v2.0.1-b2 to develop

* b2-fixed

* adjust return data of fast sync so it doesn't return the earliest frame

* -b3

* fix: announce peer based on leading frame, not initial frame; fix: looping bug

* fix: last batch fails due to underflow; qol: make logging chattier

* -b4

* resolve frame cache issue

* fix: mint loop + re-migrate

* fix: register execution panic

* fix: mint loop, other side

* fix: handle unexpected return of nil status

* final -b4

* handle subtle change to migration

* qol: add heuristic to handle corruption scenario

* bump genesis

* qol: use separate channel for worker

* final parameterization, parallelize streams

* deprecate signers 10, 11, 14, 17

* adjust signatory check size to match rotated out signers

* V2.0.2.3 (#321)

* roll up v2.0.1-b2 to develop

* b2-fixed

* adjust return data of fast sync so it doesn't return the earliest frame

* -b3

* fix: announce peer based on leading frame, not initial frame; fix: looping bug

* fix: last batch fails due to underflow; qol: make logging chattier

* -b4

* resolve frame cache issue

* fix: mint loop + re-migrate

* fix: register execution panic

* fix: mint loop, other side

* fix: handle unexpected return of nil status

* final -b4

* handle subtle change to migration

* qol: add heuristic to handle corruption scenario

* bump genesis

* qol: use separate channel for worker

* final parameterization, parallelize streams

* Add direct peers to blossomsub (#309)

Co-authored-by: Tyler Sturos <tyler.john@qcommander.sh>

* chore(docker): add ca-certificates to fix x509 error. (#307)

* Update qcommander.sh bootstrap (#304)

* chore(docker): add ca-certificates to fix x509 error.

---------

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>

* deprecate signers 10, 11, 14, 17

* adjust signatory check size to match rotated out signers

* qol: sync by rebroadcast

* upgrade version

* more small adjustments

* wait a little longer

* fix: don't use iterator for frame directly until iterator is fixed

* change iterator, genesis for testnet

* adjust to previous sync handling

* adjust: don't grab the very latest while it's already being broadcasted

* ok, ready for testnet

* handle rebroadcast quirks

* more adjustments from testing

* faster

* temporarily bulk process on frame candidates

* resolve separate frames

* don't loop

* make worker reset resume to check where it should continue

* move window

* reduce signature count now that supermajority signed last

* resolve bottlenecks

* remove GOMAXPROCS limit for now

* revisions for v2.0.2.1

* bump version

* bulk import

* reintroduce sync

* small adjustments to make life better

* check bitmask for peers and keep alive

* adjust reconnect

* ensure peer doesn't fall off address list

* adjust blossomsub to background discovery

* bump version

* remove dev check

* remove debug log line

* further adjustments

* a little more logic around connection management

* v2.0.2.3

* Fix peer discovery (#319)

* Fix peer discovery

* Make peer discovery connections parallel

* Monitor peers via pings (#317)

* Support QUILIBRIUM_SIGNATURE_CHECK in client (#314)

* Ensure direct peers are not pruned by resource limits (#315)

* Support pprof profiling via HTTP (#313)

* Fix CPU profiling

* Add pprof server support

* Additional peering connection improvements (#320)

* Lookup peers if not enough external peers are available

* Make bootstrap peer discovery sensitive to a lack of bootstrappers

---------

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>
Co-authored-by: Tyler Sturos <tyler.john@qcommander.sh>
Co-authored-by: linquanisaac <33619994+linquanisaac@users.noreply.github.com>
Co-authored-by: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>

---------

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>
Co-authored-by: Tyler Sturos <tyler.john@qcommander.sh>
Co-authored-by: linquanisaac <33619994+linquanisaac@users.noreply.github.com>
Co-authored-by: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>
2024-10-31 16:46:58 -05:00

273 lines
5.4 KiB
Go

package data
import (
"archive/zip"
"bufio"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"time"
"github.com/pkg/errors"
"source.quilibrium.com/quilibrium/monorepo/node/config"
"source.quilibrium.com/quilibrium/monorepo/node/store"
)
// downloadSnapshot fetches the most recently published frame snapshot for
// network and unpacks it into the "snapshot" directory beneath dbPath.
//
// The steps are:
//  1. Read the latest local data clock frame; if its Timestamp is greater
//     than the Unix-millisecond time six hours ago, refuse to download.
//  2. Fetch the "latest-backup" metadata document (line one: archive name,
//     line two: expected hex-encoded SHA-256 of the archive).
//  3. Stream the archive into a temporary file while hashing it, and compare
//     the digest with the expected one.
//  4. Extract every zip entry beneath the snapshot directory.
//
// The temporary archive is removed before returning. Every returned error is
// wrapped with "download snapshot".
func (e *DataClockConsensusEngine) downloadSnapshot(
	dbPath string,
	network uint8,
) error {
	frame, _, err := e.clockStore.GetLatestDataClockFrame(e.filter)
	if err != nil {
		return errors.Wrap(err, "download snapshot")
	}

	if frame.Timestamp > time.Now().Add(-6*time.Hour).UnixMilli() {
		return errors.Wrap(
			errors.New("synced higher than recent snapshot"),
			"download snapshot",
		)
	}

	zipURL, expectedHash, err := fetchSnapshotMetadata(network)
	if err != nil {
		return errors.Wrap(err, "download snapshot")
	}

	resp, err := http.Get(
		fmt.Sprintf(
			"https://frame-snapshots.quilibrium.com/%d/%s",
			network,
			zipURL,
		),
	)
	if err != nil {
		return errors.Wrap(err, "download snapshot")
	}
	defer resp.Body.Close()

	// Without this check an error page would be hashed and reported as a
	// hash mismatch, hiding the real cause.
	if resp.StatusCode != http.StatusOK {
		return errors.Wrap(
			fmt.Errorf(
				"unexpected status fetching snapshot archive: %s",
				resp.Status,
			),
			"download snapshot",
		)
	}

	snapshotDir := path.Join(dbPath, "snapshot")
	if err := os.MkdirAll(snapshotDir, 0755); err != nil {
		return errors.Wrap(
			fmt.Errorf("failed to create extraction directory: %w", err),
			"download snapshot",
		)
	}

	tempFile, err := os.CreateTemp(snapshotDir, "snapshot.zip")
	if err != nil {
		return errors.Wrap(err, "download snapshot")
	}
	// Deferred calls run last-in-first-out: the file is closed, then removed.
	defer os.Remove(tempFile.Name())
	defer tempFile.Close()

	// Hash the archive while it is written so it is only read once.
	hasher := sha256.New()
	if _, err := io.Copy(
		io.MultiWriter(tempFile, hasher),
		resp.Body,
	); err != nil {
		return errors.Wrap(err, "download snapshot")
	}

	// Close explicitly so a failed flush is reported before the archive is
	// reopened for reading; the deferred Close then becomes a no-op error.
	if err := tempFile.Close(); err != nil {
		return errors.Wrap(err, "download snapshot")
	}

	actualHash := hex.EncodeToString(hasher.Sum(nil))
	if actualHash != expectedHash {
		return errors.Wrap(
			fmt.Errorf(
				"hash mismatch: expected %s, got %s",
				expectedHash,
				actualHash,
			),
			"download snapshot",
		)
	}

	zipReader, err := zip.OpenReader(tempFile.Name())
	if err != nil {
		return errors.Wrap(
			fmt.Errorf("failed to open zip file: %w", err),
			"download snapshot",
		)
	}
	defer zipReader.Close()

	for _, file := range zipReader.File {
		if err := extractSnapshotFile(file, snapshotDir); err != nil {
			return errors.Wrap(err, "download snapshot")
		}
	}

	return nil
}

// fetchSnapshotMetadata downloads the "latest-backup" document for network
// and returns its first two lines, whitespace-trimmed: the archive name and
// the expected hex-encoded SHA-256 of that archive.
func fetchSnapshotMetadata(network uint8) (string, string, error) {
	resp, err := http.Get(
		fmt.Sprintf(
			"https://frame-snapshots.quilibrium.com/%d/latest-backup",
			network,
		),
	)
	if err != nil {
		return "", "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", "", fmt.Errorf(
			"unexpected status fetching snapshot metadata: %s",
			resp.Status,
		)
	}

	scanner := bufio.NewScanner(resp.Body)
	if !scanner.Scan() {
		// Scan also returns false on a read error; report that instead of
		// claiming the document was empty.
		if err := scanner.Err(); err != nil {
			return "", "", err
		}
		return "", "", errors.New("metadata file is empty")
	}
	zipURL := strings.TrimSpace(scanner.Text())

	if !scanner.Scan() {
		if err := scanner.Err(); err != nil {
			return "", "", err
		}
		return "", "", errors.New("metadata file missing hash")
	}
	expectedHash := strings.TrimSpace(scanner.Text())

	return zipURL, expectedHash, nil
}

// extractSnapshotFile writes one zip entry beneath destRoot. Entries whose
// joined, cleaned path does not sit under destRoot are rejected so an archive
// cannot write outside the extraction directory.
func extractSnapshotFile(file *zip.File, destRoot string) error {
	destPath := filepath.Join(destRoot, file.Name)
	if !strings.HasPrefix(
		destPath,
		filepath.Clean(destRoot)+string(os.PathSeparator),
	) {
		return fmt.Errorf("invalid file path in zip: %s", file.Name)
	}

	if file.FileInfo().IsDir() {
		if err := os.MkdirAll(destPath, file.Mode()); err != nil {
			return fmt.Errorf(
				"failed to create directory %s: %w",
				file.Name,
				err,
			)
		}
		return nil
	}

	if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
		return fmt.Errorf(
			"failed to create directory for file %s: %w",
			file.Name,
			err,
		)
	}

	destFile, err := os.OpenFile(
		destPath,
		os.O_WRONLY|os.O_CREATE|os.O_TRUNC,
		file.Mode(),
	)
	if err != nil {
		return fmt.Errorf(
			"failed to create destination file %s: %w",
			file.Name,
			err,
		)
	}

	srcFile, err := file.Open()
	if err != nil {
		destFile.Close()
		return fmt.Errorf(
			"failed to open file in zip %s: %w",
			file.Name,
			err,
		)
	}

	_, copyErr := io.Copy(destFile, srcFile)
	srcFile.Close()
	// A write error may only surface when the file is closed, so the Close
	// result is checked rather than discarded.
	closeErr := destFile.Close()
	if copyErr != nil {
		return fmt.Errorf("failed to extract file %s: %w", file.Name, copyErr)
	}
	if closeErr != nil {
		return fmt.Errorf("failed to extract file %s: %w", file.Name, closeErr)
	}

	return nil
}
// applySnapshot imports frames from a snapshot previously extracted into the
// "snapshot" directory beneath dbPath.
//
// It looks for a sub-directory whose name starts with "exporter" (the last
// match wins), opens it as a temporary Pebble store, and, starting at the
// frame after the latest local data clock frame, passes each consecutive
// frame found in the snapshot to e.handleClockFrame. Import stops at the
// first frame number the snapshot store cannot return.
//
// It returns nil without importing anything when no "exporter" directory
// exists, and an error when the snapshot does not contain the frame
// immediately after the latest local one. Once the directory listing has
// succeeded, the snapshot directory is removed on every return path. Errors
// are wrapped with "apply snapshot".
func (e *DataClockConsensusEngine) applySnapshot(
	dbPath string,
) error {
	snapshotDir := path.Join(dbPath, "snapshot")

	dirEntries, err := os.ReadDir(snapshotDir)
	if err != nil {
		return errors.Wrap(err, "apply snapshot")
	}
	defer os.RemoveAll(snapshotDir)

	snapshotDBPath := ""
	for _, entry := range dirEntries {
		if entry.IsDir() && strings.HasPrefix(entry.Name(), "exporter") {
			snapshotDBPath = path.Join(snapshotDir, entry.Name())
		}
	}

	if snapshotDBPath == "" {
		return nil
	}

	temporaryStore := store.NewPebbleDB(&config.DBConfig{
		Path: snapshotDBPath,
	})
	// Registered after the RemoveAll above, so it runs first: the store is
	// closed before its files are deleted.
	defer temporaryStore.Close()

	temporaryClockStore := store.NewPebbleClockStore(temporaryStore, e.logger)

	latest, _, err := e.clockStore.GetLatestDataClockFrame(e.filter)
	if err != nil {
		return errors.Wrap(err, "apply snapshot")
	}

	// NOTE(review): key is built but never read afterwards and looks like a
	// leftover. It is kept because it is this file's only visible use of
	// encoding/binary; remove both together.
	key := []byte{store.CLOCK_FRAME, store.CLOCK_DATA_FRAME_DATA}
	key = binary.BigEndian.AppendUint64(key, 0)
	key = append(key, e.filter...)

	next := latest.FrameNumber + 1

	// Probe for the first required frame so a snapshot that cannot extend the
	// local chain is reported as an error rather than a silent no-op.
	_, _, err = temporaryClockStore.GetDataClockFrame(
		e.filter,
		next,
		false,
	)
	if err != nil {
		return errors.Wrap(
			errors.Wrap(
				err,
				fmt.Sprintf("frame %d not available in snapshot", next),
			),
			"apply snapshot",
		)
	}

	for i := next; ; i++ {
		frame, _, err := temporaryClockStore.GetDataClockFrame(
			e.filter,
			i,
			false,
		)
		if err != nil {
			// Any lookup failure is treated as the end of the snapshot.
			break
		}

		if err := e.handleClockFrame([]byte{}, []byte{}, frame); err != nil {
			return errors.Wrap(err, "apply snapshot")
		}
	}

	e.logger.Info("imported snapshot")

	return nil
}