ceremonyclient/node/main.go
Cassandra Heart 3dd9a0c5f3
get develop caught up (#322)
* Update qcommander.sh bootrap (#304)

* v2.0.1 (#308)

* roll up v2.0.1-b2 to develop

* b2-fixed

* adjust return data of fast sync so it doesn't return the earliest frame

* -b3

* fix: announce peer based on leading frame, not initial frame; fix: looping bug

* fix: last batch fails due to underflow; qol: make logging chattier

* -b4

* resolve frame cache issue

* fix: mint loop + re-migrate

* fix: register execution panic

* fix: mint loop, other side

* fix: handle unexpected return of nil status

* final -b4

* handle subtle change to migration

* qol: add heuristic to handle corruption scenario

* bump genesis

* qol: use separate channel for worker

* final parameterization, parallelize streams

* deprecate signers 10, 11, 14, 17

* adjust signatory check size to match rotated out signers

* V2.0.2.3 (#321)

* roll up v2.0.1-b2 to develop

* b2-fixed

* adjust return data of fast sync so it doesn't return the earliest frame

* -b3

* fix: announce peer based on leading frame, not initial frame; fix: looping bug

* fix: last batch fails due to underflow; qol: make logging chattier

* -b4

* resolve frame cache issue

* fix: mint loop + re-migrate

* fix: register execution panic

* fix: mint loop, other side

* fix: handle unexpected return of nil status

* final -b4

* handle subtle change to migration

* qol: add heuristic to handle corruption scenario

* bump genesis

* qol: use separate channel for worker

* final parameterization, parallelize streams

* Add direct peers to blossomsub (#309)

Co-authored-by: Tyler Sturos <tyler.john@qcommander.sh>

* chore(docker): add ca-certificates to fix x509 error. (#307)

* Update qcommander.sh bootrap (#304)

* chore(docker): add ca-certificates to fix x509 error.

---------

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>

* deprecate signers 10, 11, 14, 17

* adjust signatory check size to match rotated out signers

* qol: sync by rebroadcast

* upgrade version

* more small adjustments

* wait a little longer

* fix: don't use iterator for frame directly until iterator is fixed

* change iterator, genesis for testnet

* adjust to previous sync handling

* adjust: don't grab the very latest while it's already being broadcasted

* ok, ready for testnet

* handle rebroadcast quirks

* more adjustments from testing

* faster

* temporarily bulk process on frame candidates

* resolve separate frames

* don't loop

* make worker reset resume to check where it should continue

* move window

* reduce signature count now that supermajority signed last

* resolve bottlenecks

* remove GOMAXPROCS limit for now

* revisions for v2.0.2.1

* bump version

* bulk import

* reintroduce sync

* small adustments to make life better

* check bitmask for peers and keep alive

* adjust reconnect

* ensure peer doesn't fall off address list

* adjust blossomsub to background discovery

* bump version

* remove dev check

* remove debug log line

* further adjustments

* a little more logic around connection management

* v2.0.2.3

* Fix peer discovery (#319)

* Fix peer discovery

* Make peer discovery connections parallel

* Monitor peers via pings (#317)

* Support QUILIBRIUM_SIGNATURE_CHECK in client (#314)

* Ensure direct peers are not pruned by resource limits (#315)

* Support pprof profiling via HTTP (#313)

* Fix CPU profiling

* Add pprof server support

* Additional peering connection improvements (#320)

* Lookup peers if not enough external peers are available

* Make bootstrap peer discovery sensitive to a lack of bootstrappers

---------

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>
Co-authored-by: Tyler Sturos <tyler.john@qcommander.sh>
Co-authored-by: linquanisaac <33619994+linquanisaac@users.noreply.github.com>
Co-authored-by: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>

---------

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>
Co-authored-by: Tyler Sturos <tyler.john@qcommander.sh>
Co-authored-by: linquanisaac <33619994+linquanisaac@users.noreply.github.com>
Co-authored-by: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>
2024-10-31 16:46:58 -05:00

701 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//go:build !js && !wasm
package main
import (
"bytes"
"encoding/binary"
"encoding/hex"
"flag"
"fmt"
"io/fs"
"log"
"math/big"
"net/http"
npprof "net/http/pprof"
"os"
"os/exec"
"os/signal"
"path/filepath"
"runtime"
rdebug "runtime/debug"
"runtime/pprof"
"strconv"
"strings"
"syscall"
"time"
"go.uber.org/zap"
"golang.org/x/crypto/sha3"
"google.golang.org/protobuf/proto"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
"source.quilibrium.com/quilibrium/monorepo/node/utils"
"github.com/cloudflare/circl/sign/ed448"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pbnjay/memory"
"github.com/pkg/errors"
"source.quilibrium.com/quilibrium/monorepo/node/app"
"source.quilibrium.com/quilibrium/monorepo/node/config"
qcrypto "source.quilibrium.com/quilibrium/monorepo/node/crypto"
"source.quilibrium.com/quilibrium/monorepo/node/crypto/kzg"
"source.quilibrium.com/quilibrium/monorepo/node/rpc"
)
var (
configDirectory = flag.String(
"config",
filepath.Join(".", ".config"),
"the configuration directory",
)
balance = flag.Bool(
"balance",
false,
"print the node's confirmed token balance to stdout and exit",
)
dbConsole = flag.Bool(
"db-console",
false,
"starts the node in database console mode",
)
importPrivKey = flag.String(
"import-priv-key",
"",
"creates a new config using a specific key from the phase one ceremony",
)
peerId = flag.Bool(
"peer-id",
false,
"print the peer id to stdout from the config and exit",
)
cpuprofile = flag.String(
"cpuprofile",
"",
"write cpu profile to file",
)
memprofile = flag.String(
"memprofile",
"",
"write memory profile after 20m to this file",
)
pprofServer = flag.String(
"pprof-server",
"",
"enable pprof server on specified address (e.g. localhost:6060)",
)
nodeInfo = flag.Bool(
"node-info",
false,
"print node related information",
)
debug = flag.Bool(
"debug",
false,
"sets log output to debug (verbose)",
)
dhtOnly = flag.Bool(
"dht-only",
false,
"sets a node to run strictly as a dht bootstrap peer (not full node)",
)
network = flag.Uint(
"network",
0,
"sets the active network for the node (mainnet = 0, primary testnet = 1)",
)
signatureCheck = flag.Bool(
"signature-check",
signatureCheckDefault(),
"enables or disables signature validation (default true or value of QUILIBRIUM_SIGNATURE_CHECK env var)",
)
core = flag.Int(
"core",
0,
"specifies the core of the process (defaults to zero, the initial launcher)",
)
parentProcess = flag.Int(
"parent-process",
0,
"specifies the parent process pid for a data worker",
)
integrityCheck = flag.Bool(
"integrity-check",
false,
"runs an integrity check on the store, helpful for confirming backups are not corrupted (defaults to false)",
)
)
func signatureCheckDefault() bool {
envVarValue, envVarExists := os.LookupEnv("QUILIBRIUM_SIGNATURE_CHECK")
if envVarExists {
def, err := strconv.ParseBool(envVarValue)
if err == nil {
return def
} else {
fmt.Println("Invalid environment variable QUILIBRIUM_SIGNATURE_CHECK, must be 'true' or 'false'. Got: " + envVarValue)
}
}
return true
}
func main() {
flag.Parse()
if *signatureCheck {
if runtime.GOOS == "windows" {
fmt.Println("Signature check not available for windows yet, skipping...")
} else {
ex, err := os.Executable()
if err != nil {
panic(err)
}
b, err := os.ReadFile(ex)
if err != nil {
fmt.Println(
"Error encountered during signature check are you running this " +
"from source? (use --signature-check=false)",
)
panic(err)
}
checksum := sha3.Sum256(b)
digest, err := os.ReadFile(ex + ".dgst")
if err != nil {
fmt.Println("Digest file not found")
os.Exit(1)
}
parts := strings.Split(string(digest), " ")
if len(parts) != 2 {
fmt.Println("Invalid digest file format")
os.Exit(1)
}
digestBytes, err := hex.DecodeString(parts[1][:64])
if err != nil {
fmt.Println("Invalid digest file format")
os.Exit(1)
}
if !bytes.Equal(checksum[:], digestBytes) {
fmt.Println("Invalid digest for node")
os.Exit(1)
}
count := 0
for i := 1; i <= len(config.Signatories); i++ {
signatureFile := fmt.Sprintf(ex+".dgst.sig.%d", i)
sig, err := os.ReadFile(signatureFile)
if err != nil {
continue
}
pubkey, _ := hex.DecodeString(config.Signatories[i-1])
if !ed448.Verify(pubkey, digest, sig, "") {
fmt.Printf("Failed signature check for signatory #%d\n", i)
os.Exit(1)
}
count++
}
if count < ((len(config.Signatories)-4)/2)+((len(config.Signatories)-4)%2) {
fmt.Printf("Quorum on signatures not met")
os.Exit(1)
}
fmt.Println("Signature check passed")
}
} else {
fmt.Println("Signature check disabled, skipping...")
}
if *memprofile != "" && *core == 0 {
go func() {
for {
time.Sleep(5 * time.Minute)
f, err := os.Create(*memprofile)
if err != nil {
log.Fatal(err)
}
pprof.WriteHeapProfile(f)
f.Close()
}
}()
}
if *cpuprofile != "" && *core == 0 {
f, err := os.Create(*cpuprofile)
if err != nil {
log.Fatal(err)
}
defer f.Close()
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
if *pprofServer != "" && *core == 0 {
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/debug/pprof/", npprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", npprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", npprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", npprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", npprof.Trace)
log.Fatal(http.ListenAndServe(*pprofServer, mux))
}()
}
if *balance {
config, err := config.LoadConfig(*configDirectory, "", false)
if err != nil {
panic(err)
}
printBalance(config)
return
}
if *peerId {
config, err := config.LoadConfig(*configDirectory, "", false)
if err != nil {
panic(err)
}
printPeerID(config.P2P)
return
}
if *importPrivKey != "" {
config, err := config.LoadConfig(*configDirectory, *importPrivKey, false)
if err != nil {
panic(err)
}
printPeerID(config.P2P)
fmt.Println("Import completed, you are ready for the launch.")
return
}
if *nodeInfo {
config, err := config.LoadConfig(*configDirectory, "", false)
if err != nil {
panic(err)
}
printNodeInfo(config)
return
}
if !*dbConsole && *core == 0 {
config.PrintLogo()
config.PrintVersion(uint8(*network))
fmt.Println(" ")
}
nodeConfig, err := config.LoadConfig(*configDirectory, "", false)
if err != nil {
panic(err)
}
if *network != 0 {
if nodeConfig.P2P.BootstrapPeers[0] == config.BootstrapPeers[0] {
fmt.Println(
"Node has specified to run outside of mainnet but is still " +
"using default bootstrap list. This will fail. Exiting.",
)
os.Exit(1)
}
nodeConfig.Engine.GenesisSeed = fmt.Sprintf(
"%02x%s",
byte(*network),
nodeConfig.Engine.GenesisSeed,
)
nodeConfig.P2P.Network = uint8(*network)
fmt.Println(
"Node is operating outside of mainnet be sure you intended to do this.",
)
}
clearIfTestData(*configDirectory, nodeConfig)
if *dbConsole {
console, err := app.NewDBConsole(nodeConfig)
if err != nil {
panic(err)
}
console.Run()
return
}
if *dhtOnly {
done := make(chan os.Signal, 1)
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
dht, err := app.NewDHTNode(nodeConfig)
if err != nil {
panic(err)
}
go func() {
dht.Start()
}()
<-done
dht.Stop()
return
}
if *core != 0 {
// runtime.GOMAXPROCS(2)
rdebug.SetGCPercent(9999)
if nodeConfig.Engine.DataWorkerMemoryLimit == 0 {
nodeConfig.Engine.DataWorkerMemoryLimit = 1792 * 1024 * 1024 // 1.75GiB
}
rdebug.SetMemoryLimit(nodeConfig.Engine.DataWorkerMemoryLimit)
if nodeConfig.Engine.DataWorkerBaseListenMultiaddr == "" {
nodeConfig.Engine.DataWorkerBaseListenMultiaddr = "/ip4/127.0.0.1/tcp/%d"
}
if nodeConfig.Engine.DataWorkerBaseListenPort == 0 {
nodeConfig.Engine.DataWorkerBaseListenPort = 40000
}
if *parentProcess == 0 && len(nodeConfig.Engine.DataWorkerMultiaddrs) == 0 {
panic("parent process pid not specified")
}
l, err := zap.NewProduction()
if err != nil {
panic(err)
}
rpcMultiaddr := fmt.Sprintf(
nodeConfig.Engine.DataWorkerBaseListenMultiaddr,
int(nodeConfig.Engine.DataWorkerBaseListenPort)+*core-1,
)
if len(nodeConfig.Engine.DataWorkerMultiaddrs) != 0 {
rpcMultiaddr = nodeConfig.Engine.DataWorkerMultiaddrs[*core-1]
}
srv, err := rpc.NewDataWorkerIPCServer(
rpcMultiaddr,
l,
uint32(*core)-1,
qcrypto.NewWesolowskiFrameProver(l),
nodeConfig,
*parentProcess,
)
if err != nil {
panic(err)
}
err = srv.Start()
if err != nil {
panic(err)
}
return
}
fmt.Println("Loading ceremony state and starting node...")
if !*integrityCheck {
go spawnDataWorkers(nodeConfig)
}
kzg.Init()
report := RunSelfTestIfNeeded(*configDirectory, nodeConfig)
if *core == 0 {
for {
genesis, err := config.DownloadAndVerifyGenesis(*network)
if err != nil {
time.Sleep(10 * time.Minute)
continue
}
nodeConfig.Engine.GenesisSeed = genesis.GenesisSeedHex
break
}
}
done := make(chan os.Signal, 1)
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
var node *app.Node
if *debug {
node, err = app.NewDebugNode(nodeConfig, report)
} else {
node, err = app.NewNode(nodeConfig, report)
}
if err != nil {
panic(err)
}
if *integrityCheck {
fmt.Println("Running integrity check...")
node.VerifyProofIntegrity()
fmt.Println("Integrity check passed!")
return
}
// runtime.GOMAXPROCS(1)
if nodeConfig.ListenGRPCMultiaddr != "" {
srv, err := rpc.NewRPCServer(
nodeConfig.ListenGRPCMultiaddr,
nodeConfig.ListenRestMultiaddr,
node.GetLogger(),
node.GetDataProofStore(),
node.GetClockStore(),
node.GetCoinStore(),
node.GetKeyManager(),
node.GetPubSub(),
node.GetMasterClock(),
node.GetExecutionEngines(),
)
if err != nil {
panic(err)
}
go func() {
err := srv.Start()
if err != nil {
panic(err)
}
}()
}
node.Start()
<-done
stopDataWorkers()
node.Stop()
}
var dataWorkers []*exec.Cmd
func spawnDataWorkers(nodeConfig *config.Config) {
if len(nodeConfig.Engine.DataWorkerMultiaddrs) != 0 {
fmt.Println(
"Data workers configured by multiaddr, be sure these are running...",
)
return
}
process, err := os.Executable()
if err != nil {
panic(err)
}
cores := runtime.GOMAXPROCS(0)
dataWorkers = make([]*exec.Cmd, cores-1)
fmt.Printf("Spawning %d data workers...\n", cores-1)
for i := 1; i <= cores-1; i++ {
i := i
go func() {
for {
args := []string{
fmt.Sprintf("--core=%d", i),
fmt.Sprintf("--parent-process=%d", os.Getpid()),
}
args = append(args, os.Args[1:]...)
cmd := exec.Command(process, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stdout
err := cmd.Start()
if err != nil {
panic(err)
}
dataWorkers[i-1] = cmd
cmd.Wait()
time.Sleep(25 * time.Millisecond)
fmt.Printf("Data worker %d stopped, restarting...\n", i)
}
}()
}
}
func stopDataWorkers() {
for i := 0; i < len(dataWorkers); i++ {
err := dataWorkers[i].Process.Signal(os.Kill)
if err != nil {
fmt.Printf(
"fatal: unable to kill worker with pid %d, please kill this process!\n",
dataWorkers[i].Process.Pid,
)
}
}
}
func RunSelfTestIfNeeded(
configDir string,
nodeConfig *config.Config,
) *protobufs.SelfTestReport {
logger, _ := zap.NewProduction()
cores := runtime.GOMAXPROCS(0)
if len(nodeConfig.Engine.DataWorkerMultiaddrs) != 0 {
cores = len(nodeConfig.Engine.DataWorkerMultiaddrs) + 1
}
memory := memory.TotalMemory()
d, err := os.Stat(filepath.Join(configDir, "store"))
if d == nil {
err := os.Mkdir(filepath.Join(configDir, "store"), 0755)
if err != nil {
panic(err)
}
}
report := &protobufs.SelfTestReport{}
report.Cores = uint32(cores)
report.Memory = binary.BigEndian.AppendUint64([]byte{}, memory)
disk := utils.GetDiskSpace(nodeConfig.DB.Path)
report.Storage = binary.BigEndian.AppendUint64([]byte{}, disk)
logger.Info("writing report")
report.Capabilities = []*protobufs.Capability{
{
ProtocolIdentifier: 0x020000,
},
}
reportBytes, err := proto.Marshal(report)
if err != nil {
panic(err)
}
err = os.WriteFile(
filepath.Join(configDir, "SELF_TEST"),
reportBytes,
fs.FileMode(0600),
)
if err != nil {
panic(err)
}
return report
}
func clearIfTestData(configDir string, nodeConfig *config.Config) {
_, err := os.Stat(filepath.Join(configDir, "RELEASE_VERSION"))
if os.IsNotExist(err) {
fmt.Println("Clearing test data...")
err := os.RemoveAll(nodeConfig.DB.Path)
if err != nil {
panic(err)
}
versionFile, err := os.OpenFile(
filepath.Join(configDir, "RELEASE_VERSION"),
os.O_CREATE|os.O_RDWR,
fs.FileMode(0600),
)
if err != nil {
panic(err)
}
_, err = versionFile.Write([]byte{0x01, 0x00, 0x00})
if err != nil {
panic(err)
}
err = versionFile.Close()
if err != nil {
panic(err)
}
}
}
func printBalance(config *config.Config) {
if config.ListenGRPCMultiaddr == "" {
_, _ = fmt.Fprintf(os.Stderr, "gRPC Not Enabled, Please Configure\n")
os.Exit(1)
}
conn, err := app.ConnectToNode(config)
if err != nil {
panic(err)
}
defer conn.Close()
client := protobufs.NewNodeServiceClient(conn)
balance, err := app.FetchTokenBalance(client)
if err != nil {
panic(err)
}
conversionFactor, _ := new(big.Int).SetString("1DCD65000", 16)
r := new(big.Rat).SetFrac(balance.Owned, conversionFactor)
fmt.Println("Owned balance:", r.FloatString(12), "QUIL")
fmt.Println("Note: bridged balance is not reflected here, you must bridge back to QUIL to use QUIL on mainnet.")
}
func getPeerID(p2pConfig *config.P2PConfig) peer.ID {
peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey)
if err != nil {
panic(errors.Wrap(err, "error unmarshaling peerkey"))
}
privKey, err := crypto.UnmarshalEd448PrivateKey(peerPrivKey)
if err != nil {
panic(errors.Wrap(err, "error unmarshaling peerkey"))
}
pub := privKey.GetPublic()
id, err := peer.IDFromPublicKey(pub)
if err != nil {
panic(errors.Wrap(err, "error getting peer id"))
}
return id
}
func printPeerID(p2pConfig *config.P2PConfig) {
id := getPeerID(p2pConfig)
fmt.Println("Peer ID: " + id.String())
}
func printNodeInfo(cfg *config.Config) {
if cfg.ListenGRPCMultiaddr == "" {
_, _ = fmt.Fprintf(os.Stderr, "gRPC Not Enabled, Please Configure\n")
os.Exit(1)
}
printPeerID(cfg.P2P)
conn, err := app.ConnectToNode(cfg)
if err != nil {
fmt.Println("Could not connect to node. If it is still booting, please wait.")
os.Exit(1)
}
defer conn.Close()
client := protobufs.NewNodeServiceClient(conn)
nodeInfo, err := app.FetchNodeInfo(client)
if err != nil {
panic(err)
}
fmt.Println("Version: " + config.FormatVersion(nodeInfo.Version))
fmt.Println("Max Frame: " + strconv.FormatUint(nodeInfo.GetMaxFrame(), 10))
fmt.Println("Peer Score: " + strconv.FormatUint(nodeInfo.GetPeerScore(), 10))
printBalance(cfg)
}