mirror of https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 18:37:26 +08:00
977 lines · 23 KiB · Go
//go:build !js && !wasm

package main

import (
	"bytes"
	"context"
	"crypto/sha3"
	_ "embed"
	"encoding/hex"
	"flag"
	"fmt"
	"log"
	"math/big"
	"net"
	"net/http"
	npprof "net/http/pprof"
	"os"
	"os/signal"
	"path/filepath"
	"runtime"
	rdebug "runtime/debug"
	"runtime/pprof"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/cloudflare/circl/sign/ed448"
	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
	mn "github.com/multiformats/go-multiaddr/net"
	"github.com/pbnjay/memory"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"source.quilibrium.com/quilibrium/monorepo/config"
	"source.quilibrium.com/quilibrium/monorepo/node/app"
	qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc"
	"source.quilibrium.com/quilibrium/monorepo/node/p2p"
	"source.quilibrium.com/quilibrium/monorepo/node/rpc"
	"source.quilibrium.com/quilibrium/monorepo/node/store"
	"source.quilibrium.com/quilibrium/monorepo/protobufs"
	qruntime "source.quilibrium.com/quilibrium/monorepo/utils/runtime"
)

func init() {
	// Use the pure-Go DNS resolver to avoid SIGFPE crashes in cgo-based
	// system resolvers (observed on some glibc/musl configurations).
	net.DefaultResolver = &net.Resolver{PreferGo: true}
}

var (
	configDirectory = flag.String(
		"config",
		filepath.Join(".", ".config"),
		"the configuration directory",
	)
	peerId = flag.Bool(
		"peer-id",
		false,
		"print the peer id to stdout from the config and exit",
	)
	cpuprofile = flag.String(
		"cpuprofile",
		"",
		"write cpu profile to file",
	)
	memprofile = flag.String(
		"memprofile",
		"",
		"periodically (every 5 minutes) write a memory profile to this file",
	)
	pprofServer = flag.String(
		"pprof-server",
		"",
		"enable pprof server on specified address (e.g. localhost:6060)",
	)
	prometheusServer = flag.String(
		"prometheus-server",
		"",
		"enable prometheus server on specified address (e.g. localhost:8080)",
	)
	nodeInfo = flag.Bool(
		"node-info",
		false,
		"print node related information",
	)
	peerInfo = flag.Bool(
		"peer-info",
		false,
		"print peer info",
	)
	debug = flag.Bool(
		"debug",
		false,
		"sets log output to debug (verbose)",
	)
	dhtOnly = flag.Bool(
		"dht-only",
		false,
		"sets a node to run strictly as a dht bootstrap peer (not a full node)",
	)
	network = flag.Uint(
		"network",
		0,
		"sets the active network for the node (mainnet = 0, primary testnet = 1)",
	)
	signatureCheck = flag.Bool(
		"signature-check",
		signatureCheckDefault(),
		"enables or disables signature validation (default true or value of QUILIBRIUM_SIGNATURE_CHECK env var)",
	)
	core = flag.Int(
		"core",
		0,
		"specifies the core of the process (defaults to zero, the initial launcher)",
	)
	parentProcess = flag.Int(
		"parent-process",
		0,
		"specifies the parent process pid for a data worker",
	)
	compactDB = flag.Bool(
		"compact-db",
		false,
		"compacts the database and exits",
	)
	dbConsole = flag.Bool(
		"db-console",
		false,
		"starts the db console mode (does not run nodes)",
	)
	dangerClearPending = flag.Bool(
		"danger-clear-pending",
		false,
		"clears pending states (dangerous action)",
	)

	// defaults for the logo character and version banner flags
	blockchar = "█"
	bver      = "Bloom"
	char      *string = &blockchar
	ver       *string = &bver
)

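// A typical invocation wiring several of the flags above together might look
// like the following (binary name and paths are illustrative, not prescribed
// by this file):
//
//	node --config /var/lib/quilibrium/.config --network 0 \
//	     --prometheus-server localhost:8080 --pprof-server localhost:6060
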
var capabilityLabels = map[uint32]string{
	0x00010001: "Compute Protocol v1",
	0x00010101: "KZG Verify (BLS48581)",
	0x00010201: "Bulletproof Range Verify (Decaf448)",
	0x00010301: "Bulletproof Sum Verify (Decaf448)",
	0x00010401: "SECP256K1 ECDSA Verify",
	0x00010501: "ED25519 EdDSA Verify",
	0x00010601: "ED448 EdDSA Verify",
	0x00010701: "DECAF448 Schnorr Verify",
	0x00010801: "SECP256R1 ECDSA Verify",
	0x00020001: "Global Protocol v1",
	0x00030001: "Hypergraph Protocol v1",
	0x00040001: "Token Protocol v1",
	0x0101:     "Double Ratchet v1",
	0x0201:     "Triple Ratchet v1",
	0x0301:     "Onion Routing v1",
}

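// capabilityDescription (below) renders a known identifier as, for example,
// "Token Protocol v1 (0x00040001)", and an unknown one as its bare hex form
// such as "0x00099901" (example value).
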
func signatureCheckDefault() bool {
	envVarValue, envVarExists := os.LookupEnv("QUILIBRIUM_SIGNATURE_CHECK")
	if envVarExists {
		def, err := strconv.ParseBool(envVarValue)
		if err == nil {
			return def
		} else {
			fmt.Println(
				"Invalid environment variable QUILIBRIUM_SIGNATURE_CHECK, must be 'true' or 'false':",
				envVarValue,
			)
		}
	}

	return true
}

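// For example, running the node as (illustrative binary name):
//
//	QUILIBRIUM_SIGNATURE_CHECK=false ./node
//
// is equivalent to passing --signature-check=false on the command line.
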
// monitorParentProcess watches the parent process and stops the worker if the
// parent dies.
func monitorParentProcess(
	parentProcessId int,
	stopFunc func(),
	logger *zap.Logger,
) {
	for {
		time.Sleep(1 * time.Second)
		proc, err := os.FindProcess(parentProcessId)
		if err != nil {
			logger.Error("parent process not found, terminating")
			if stopFunc != nil {
				stopFunc()
			}
			return
		}

		// os.FindProcess only reports a dead process on Windows; on every
		// other platform probe it with signal 0 instead.
		if runtime.GOOS != "windows" {
			err := proc.Signal(syscall.Signal(0))
			if err != nil {
				logger.Error("parent process not found, terminating")
				if stopFunc != nil {
					stopFunc()
				}
				return
			}
		}
	}
}

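// A data worker is normally spawned as a child of the master process with
// something like (illustrative): node --core 3 --parent-process <master pid>.
// The worker then uses monitorParentProcess above to shut itself down if the
// master goes away.
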
func main() {
	config.Flags(&char, &ver)
	flag.Parse()

	nodeConfig, err := config.LoadConfig(*configDirectory, "", false)
	if err != nil {
		log.Fatal("failed to load config", err)
	}

	if *dbConsole {
		db, err := app.NewDBConsole(nodeConfig)
		if err != nil {
			log.Fatal(err)
		}
		db.Run()
		os.Exit(0)
	}

	logger, closer, err := nodeConfig.CreateLogger(uint(*core), *debug)
	if err != nil {
		log.Fatal("failed to create logger", err)
	}
	defer closer.Close()

	if *signatureCheck {
		if runtime.GOOS == "windows" {
			logger.Info("Signature check not available for windows yet, skipping...")
		} else {
			ex, err := os.Executable()
			if err != nil {
				logger.Panic(
					"Failed to get executable path",
					zap.Error(err),
					zap.String("executable", ex),
				)
			}

			b, err := os.ReadFile(ex)
			if err != nil {
				logger.Panic(
					"Error encountered during signature check – are you running this "+
						"from source? (use --signature-check=false)",
					zap.Error(err),
				)
			}

			checksum := sha3.Sum256(b)
			digest, err := os.ReadFile(ex + ".dgst")
			if err != nil {
				logger.Fatal("digest file not found", zap.Error(err))
			}

			parts := strings.Split(string(digest), " ")
			if len(parts) != 2 {
				logger.Fatal("Invalid digest file format")
			}

			digestBytes, err := hex.DecodeString(parts[1][:64])
			if err != nil {
				logger.Fatal("invalid digest file format", zap.Error(err))
			}

			if !bytes.Equal(checksum[:], digestBytes) {
				logger.Fatal("invalid digest for node")
			}

			count := 0

			for i := 1; i <= len(config.Signatories); i++ {
				signatureFile := fmt.Sprintf(ex+".dgst.sig.%d", i)
				sig, err := os.ReadFile(signatureFile)
				if err != nil {
					continue
				}

				pubkey, _ := hex.DecodeString(config.Signatories[i-1])
				if !ed448.Verify(pubkey, digest, sig, "") {
					logger.Fatal(
						"failed signature check for signatory",
						zap.Int("signatory", i),
					)
				}
				count++
			}

			if count < ((len(config.Signatories)-4)/2)+((len(config.Signatories)-4)%2) {
				logger.Fatal("quorum on signatures not met")
			}

			logger.Info("signature check passed")
		}
	} else {
		logger.Info("signature check disabled, skipping...")
	}

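	// The check above expects a "<executable>.dgst" file next to the binary,
	// with the SHA3-256 hex digest as the second space-separated field, plus
	// detached ed448 signatures named "<executable>.dgst.sig.N" for signatory
	// N. Quorum requires ceil((len(config.Signatories)-4)/2) valid signatures.
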
	if *core == 0 {
		logger = logger.With(zap.String("process", "master"))
	} else {
		logger = logger.With(zap.String("process", fmt.Sprintf("worker %d", *core)))
	}

	if *memprofile != "" && *core == 0 {
		go func() {
			for {
				time.Sleep(5 * time.Minute)
				f, err := os.Create(*memprofile)
				if err != nil {
					logger.Fatal("failed to create memory profile file", zap.Error(err))
				}
				pprof.WriteHeapProfile(f)
				f.Close()
			}
		}()
	}

	if *cpuprofile != "" && *core == 0 {
		f, err := os.Create(*cpuprofile)
		if err != nil {
			logger.Fatal("failed to create cpu profile file", zap.Error(err))
		}
		defer f.Close()
		pprof.StartCPUProfile(f)
		defer pprof.StopCPUProfile()
	}

	if *pprofServer != "" && *core == 0 {
		go func() {
			mux := http.NewServeMux()
			mux.HandleFunc("/debug/pprof/", npprof.Index)
			mux.HandleFunc("/debug/pprof/cmdline", npprof.Cmdline)
			mux.HandleFunc("/debug/pprof/profile", npprof.Profile)
			mux.HandleFunc("/debug/pprof/symbol", npprof.Symbol)
			mux.HandleFunc("/debug/pprof/trace", npprof.Trace)
			logger.Fatal(
				"Failed to start pprof server",
				zap.Error(http.ListenAndServe(*pprofServer, mux)),
			)
		}()
	}

	if *prometheusServer != "" && *core == 0 {
		go func() {
			mux := http.NewServeMux()
			mux.Handle("/metrics", promhttp.Handler())
			logger.Fatal(
				"Failed to start prometheus server",
				zap.Error(http.ListenAndServe(*prometheusServer, mux)),
			)
		}()
	}

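	// With --pprof-server localhost:6060 the profiles registered above can be
	// pulled with the standard tooling, e.g.:
	//
	//	go tool pprof http://localhost:6060/debug/pprof/heap
	//
	// and with --prometheus-server the /metrics endpoint is scrapeable by any
	// Prometheus-compatible collector. Addresses are illustrative.
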
	if *peerId {
		printPeerID(logger, nodeConfig.P2P)
		return
	}

	if *nodeInfo {
		config, err := config.LoadConfig(*configDirectory, "", false)
		if err != nil {
			logger.Fatal("failed to load config", zap.Error(err))
		}

		printNodeInfo(logger, config)
		return
	}

	if *peerInfo {
		config, err := config.LoadConfig(*configDirectory, "", false)
		if err != nil {
			logger.Fatal("failed to load config", zap.Error(err))
		}

		printPeerInfo(logger, config)
		return
	}

	if *dangerClearPending {
		db := store.NewPebbleDB(logger, nodeConfig, 0)
		defer db.Close()
		consensusStore := store.NewPebbleConsensusStore(db, logger)
		state, err := consensusStore.GetConsensusState(nil)
		if err != nil {
			fmt.Println(err.Error())
			return
		}

		clockStore := store.NewPebbleClockStore(db, logger)
		qc, err := clockStore.GetQuorumCertificate(nil, state.FinalizedRank)
		if err != nil {
			fmt.Println(err.Error())
			return
		}

		err = clockStore.DeleteGlobalClockFrameRange(
			qc.FrameNumber+1,
			qc.FrameNumber+10000,
		)
		if err != nil {
			fmt.Println(err.Error())
			return
		}

		err = clockStore.DeleteQuorumCertificateRange(
			nil,
			qc.Rank+1,
			qc.Rank+10000,
		)
		if err != nil {
			fmt.Println(err.Error())
			return
		}

		if state.LatestTimeout != nil {
			latestTCRank := state.LatestTimeout.Rank
			err = clockStore.DeleteTimeoutCertificateRange(
				nil,
				latestTCRank+1,
				latestTCRank+10000,
				latestTCRank,
			)
			if err != nil {
				fmt.Println(err.Error())
				return
			}
		}

		fmt.Println("pending entries cleared")
		return
	}

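	// --danger-clear-pending therefore rolls the local store back to the last
	// finalized quorum certificate by deleting up to 10000 global clock
	// frames, quorum certificates and timeout certificates beyond it.
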
	if *core == 0 {
		config.PrintLogo(*char)
		config.PrintVersion(uint8(*network), *char, *ver)
		fmt.Println(" ")
	}

	if *compactDB {
		db := store.NewPebbleDB(logger, nodeConfig, uint(*core))
		if err := db.CompactAll(); err != nil {
			logger.Fatal("failed to compact database", zap.Error(err))
		}
		if err := db.Close(); err != nil {
			logger.Fatal("failed to close database", zap.Error(err))
		}
		return
	}

	if *network != 0 {
		if nodeConfig.P2P.BootstrapPeers[0] == config.BootstrapPeers[0] {
			logger.Fatal(
				"node has specified to run outside of mainnet but is still " +
					"using default bootstrap list. this will fail. exiting.",
			)
		}

		nodeConfig.P2P.Network = uint8(*network)
		logger.Warn(
			"node is operating outside of mainnet – be sure you intended to do this.",
		)
	}

	if *dhtOnly {
		done := make(chan os.Signal, 1)
		signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
		dht, err := app.NewDHTNode(logger, nodeConfig, 0, p2p.ConfigDir(*configDirectory))
		if err != nil {
			// a nil dht node cannot be started below, so treat this as fatal
			logger.Fatal("failed to start dht node", zap.Error(err))
		}

		go func() {
			dht.Start()
		}()

		<-done
		dht.Stop()
		return
	}

	if len(nodeConfig.Engine.DataWorkerP2PMultiaddrs) == 0 {
		maxProcs, numCPU := runtime.GOMAXPROCS(0), runtime.NumCPU()
		if maxProcs > numCPU && !nodeConfig.Engine.AllowExcessiveGOMAXPROCS {
			logger.Fatal(
				"GOMAXPROCS is set higher than the number of available cpus.",
			)
		}
		nodeConfig.Engine.DataWorkerCount = qruntime.WorkerCount(
			nodeConfig.Engine.DataWorkerCount, true, true,
		)
	} else {
		nodeConfig.Engine.DataWorkerCount = len(
			nodeConfig.Engine.DataWorkerP2PMultiaddrs,
		)
	}

	if len(nodeConfig.Engine.DataWorkerP2PMultiaddrs) !=
		len(nodeConfig.Engine.DataWorkerStreamMultiaddrs) {
		logger.Fatal("mismatch of worker count for p2p and stream multiaddrs")
	}

	if *core != 0 {
		rdebug.SetMemoryLimit(nodeConfig.Engine.DataWorkerMemoryLimit)

		if *parentProcess == 0 &&
			len(nodeConfig.Engine.DataWorkerP2PMultiaddrs) == 0 {
			logger.Fatal("parent process pid not specified")
		}

		rpcMultiaddr := fmt.Sprintf(
			nodeConfig.Engine.DataWorkerBaseListenMultiaddr,
			int(nodeConfig.Engine.DataWorkerBaseStreamPort)+*core-1,
		)

		if len(nodeConfig.Engine.DataWorkerStreamMultiaddrs) != 0 {
			rpcMultiaddr = nodeConfig.Engine.DataWorkerStreamMultiaddrs[*core-1]
		}

		rpcMultiaddr = strings.Replace(rpcMultiaddr, "/0.0.0.0/", "/127.0.0.1/", 1)
		rpcMultiaddr = strings.Replace(rpcMultiaddr, "/0:0:0:0:0:0:0:0/", "/::1/", 1)
		rpcMultiaddr = strings.Replace(rpcMultiaddr, "/::/", "/::1/", 1)
		// force TCP as stream is not supported over UDP/QUIC
		rpcMultiaddr = strings.Replace(rpcMultiaddr, "/quic-v1", "", 1)
		rpcMultiaddr = strings.Replace(rpcMultiaddr, "udp", "tcp", 1)

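		// For example, a base listen multiaddr of /ip4/0.0.0.0/udp/%d/quic-v1
		// with a base stream port of 8340 and --core 1 becomes
		// /ip4/127.0.0.1/tcp/8340 after the rewrites above: loopback only,
		// TCP instead of QUIC. The concrete values are illustrative.
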
		dataWorkerNode, err := app.NewDataWorkerNode(
			logger,
			nodeConfig,
			uint(*core),
			rpcMultiaddr,
			*parentProcess,
			p2p.ConfigDir(*configDirectory),
		)
		if err != nil {
			logger.Panic("failed to create data worker node", zap.Error(err))
		}

		if *parentProcess != 0 {
			go monitorParentProcess(*parentProcess, dataWorkerNode.Stop, logger)
		}

		done := make(chan os.Signal, 1)
		signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
		quitCh := make(chan struct{})

		go func() {
			err = dataWorkerNode.Start(done, quitCh)
			if err != nil {
				logger.Panic("failed to start data worker node", zap.Error(err))
				close(quitCh)
			}
		}()

		diskFullCh := make(chan error, 1)
		monitor := store.NewDiskMonitor(
			uint(*core),
			*nodeConfig.DB,
			logger,
			diskFullCh,
		)

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		monitor.Start(ctx)

	loop:
		for {
			select {
			case <-diskFullCh:
				dataWorkerNode.Stop()
			case <-quitCh:
				dataWorkerNode.Stop()
				break loop
			}
		}

		return
	} else {
		totalMemory := int64(memory.TotalMemory())
		dataWorkerReservedMemory := int64(0)
		if len(nodeConfig.Engine.DataWorkerStreamMultiaddrs) == 0 {
			dataWorkerReservedMemory =
				nodeConfig.Engine.DataWorkerMemoryLimit * int64(
					nodeConfig.Engine.DataWorkerCount,
				)
		}
		switch availableOverhead := totalMemory - dataWorkerReservedMemory; {
		case totalMemory < dataWorkerReservedMemory:
			logger.Warn(
				"the memory allocated to data workers exceeds the total system memory",
				zap.Int64("total_memory", totalMemory),
				zap.Int64("data_worker_reserved_memory", dataWorkerReservedMemory),
			)
			logger.Warn("you are at risk of running out of memory during runtime")
		case availableOverhead < 2*1024*1024*1024:
			logger.Warn(
				"the memory available to the node, unallocated to "+
					"the data workers, is less than 2gb",
				zap.Int64("available_overhead", availableOverhead),
			)
			logger.Warn("you are at risk of running out of memory during runtime")
		default:
			// Use defaults if archive mode:
			if !nodeConfig.Engine.ArchiveMode {
				if _, limit := os.LookupEnv("GOMEMLIMIT"); !limit {
					rdebug.SetMemoryLimit(availableOverhead * 8 / 10)
				}
				if _, explicitGOGC := os.LookupEnv("GOGC"); !explicitGOGC {
					rdebug.SetGCPercent(10)
				}
			}
		}
	}

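	// Sizing example for the branch above (figures illustrative): with 8
	// workers at a 2 GiB DataWorkerMemoryLimit on a 32 GiB machine, 16 GiB is
	// reserved for workers; the remaining 16 GiB of overhead clears the 2 GiB
	// warning threshold, so the master caps its own heap at 80% of that
	// overhead (~12.8 GiB) and sets GOGC to 10, unless GOMEMLIMIT or GOGC are
	// already set in the environment or archive mode is enabled.
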
logger.Info("starting node...")
|
||
|
||
// Create MasterNode for core 0
|
||
masterNode, err := app.NewMasterNode(logger, nodeConfig, uint(*core), p2p.ConfigDir(*configDirectory))
|
||
if err != nil {
|
||
logger.Panic("failed to create master node", zap.Error(err))
|
||
}
|
||
|
||
// Start the master node
|
||
ctx, quit := context.WithCancel(context.Background())
|
||
errCh := masterNode.Start(ctx)
|
||
defer masterNode.Stop()
|
||
|
||
if nodeConfig.ListenGRPCMultiaddr != "" {
|
||
srv, err := rpc.NewRPCServer(
|
||
nodeConfig,
|
||
masterNode.GetLogger(),
|
||
masterNode.GetKeyManager(),
|
||
masterNode.GetPubSub(),
|
||
masterNode.GetPeerInfoManager(),
|
||
masterNode.GetWorkerManager(),
|
||
masterNode.GetProverRegistry(),
|
||
masterNode.GetExecutionEngineManager(),
|
||
)
|
||
if err != nil {
|
||
logger.Panic("failed to new rpc server", zap.Error(err))
|
||
}
|
||
if err := srv.Start(); err != nil {
|
||
logger.Panic("failed to start rpc server", zap.Error(err))
|
||
}
|
||
defer srv.Stop()
|
||
}
|
||
|
||
	diskFullCh := make(chan error, 1)
	monitor := store.NewDiskMonitor(
		uint(*core),
		*nodeConfig.DB,
		logger,
		diskFullCh,
	)

	monitor.Start(ctx)

	select {
	case <-diskFullCh:
		quit()
	case err := <-errCh:
		if err != nil {
			logger.Error("master node error", zap.Error(err))
		}
		quit()
	}
}

func getPeerID(logger *zap.Logger, p2pConfig *config.P2PConfig) peer.ID {
	peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey)
	if err != nil {
		logger.Panic("failed to decode peer private key",
			zap.Error(errors.Wrap(err, "error unmarshaling peerkey")))
	}

	privKey, err := crypto.UnmarshalEd448PrivateKey(peerPrivKey)
	if err != nil {
		logger.Panic("failed to unmarshal ed448 private key",
			zap.Error(errors.Wrap(err, "error unmarshaling peerkey")))
	}

	pub := privKey.GetPublic()
	id, err := peer.IDFromPublicKey(pub)
	if err != nil {
		logger.Panic("failed to get peer id", zap.Error(err))
	}

	return id
}

func printPeerID(logger *zap.Logger, p2pConfig *config.P2PConfig) {
	id := getPeerID(logger, p2pConfig)

	fmt.Println("Peer ID: " + id.String())
}

func printNodeInfo(logger *zap.Logger, cfg *config.Config) {
	if cfg.ListenGRPCMultiaddr == "" {
		logger.Fatal("gRPC Not Enabled, Please Configure")
	}

	printPeerID(logger, cfg.P2P)

	conn, err := ConnectToNode(logger, cfg)
	if err != nil {
		logger.Fatal(
			"could not connect to node. if it is still booting, please wait.",
			zap.Error(err),
		)
	}
	defer conn.Close()

	client := protobufs.NewNodeServiceClient(conn)

	nodeInfo, err := FetchNodeInfo(client)
	if err != nil {
		logger.Panic("failed to fetch node info", zap.Error(err))
	}

	if patch := formatPatchNumber(nodeInfo.PatchNumber); patch != "" {
		fmt.Println("Version:", config.FormatVersion(nodeInfo.Version)+"."+patch)
	} else {
		fmt.Println("Version:", config.FormatVersion(nodeInfo.Version))
	}

	fmt.Println("Seniority: " + new(big.Int).SetBytes(
		nodeInfo.PeerSeniority,
	).String())
	fmt.Println("Running Workers:", nodeInfo.RunningWorkers)
	fmt.Println("Active Workers:", nodeInfo.AllocatedWorkers)
}

func printPeerInfo(logger *zap.Logger, cfg *config.Config) {
	if cfg.ListenGRPCMultiaddr == "" {
		logger.Fatal("gRPC Not Enabled, Please Configure")
	}

	printPeerID(logger, cfg.P2P)

	conn, err := ConnectToNode(logger, cfg)
	if err != nil {
		logger.Fatal(
			"could not connect to node. if it is still booting, please wait.",
			zap.Error(err),
		)
	}
	defer conn.Close()

	client := protobufs.NewNodeServiceClient(conn)

	peerInfo, err := client.GetPeerInfo(
		context.Background(),
		&protobufs.GetPeerInfoRequest{},
	)
	if err != nil {
		logger.Panic("failed to fetch peer info", zap.Error(err))
	}

	for idx, p := range peerInfo.PeerInfo {
		if p == nil {
			continue
		}
		fmt.Printf("Peer %d:\n", idx+1)

		if peerID := formatPeerID(p.PeerId); peerID != "" {
			fmt.Println(" Peer ID:", peerID)
		}

		if len(p.Version) >= 3 {
			if patch := formatPatchNumber(p.PatchNumber); patch != "" {
				fmt.Println(" Version:", config.FormatVersion(p.Version)+"."+patch)
			} else {
				fmt.Println(" Version:", config.FormatVersion(p.Version))
			}
		}

		if p.Timestamp != 0 {
			fmt.Println(
				" Last Seen:",
				time.UnixMilli(p.Timestamp).UTC().Format(time.RFC3339),
			)
		}

		printReachability(p.Reachability)
		printCapabilities(p.Capabilities)

		if p.LastReceivedFrame != 0 {
			fmt.Printf(
				" Last Received Global Frame: %d\n",
				p.LastReceivedFrame,
			)
		}

		if p.LastGlobalHeadFrame != 0 {
			fmt.Printf(
				" Last Global Head Frame: %d\n",
				p.LastGlobalHeadFrame,
			)
		}

		if len(p.PublicKey) > 0 {
			fmt.Println(" Public Key:", hex.EncodeToString(p.PublicKey))
		}

		if len(p.Signature) > 0 {
			fmt.Println(" Signature:", hex.EncodeToString(p.Signature))
		}

		if idx < len(peerInfo.PeerInfo)-1 {
			fmt.Println()
		}
	}
}

func formatPeerID(raw []byte) string {
	if len(raw) == 0 {
		return ""
	}
	id, err := peer.IDFromBytes(raw)
	if err != nil {
		return hex.EncodeToString(raw)
	}
	return id.String()
}

func capabilityDescription(id uint32) string {
	if name, ok := capabilityLabels[id]; ok {
		return fmt.Sprintf("%s (0x%08X)", name, id)
	}
	return fmt.Sprintf("0x%08X", id)
}

func formatPatchNumber(raw []byte) string {
	if len(raw) == 0 {
		return ""
	}
	if len(raw) == 1 {
		return fmt.Sprintf("%d", raw[0])
	}
	return fmt.Sprintf("0x%s", hex.EncodeToString(raw))
}

func nonEmptyStrings(values []string) []string {
	out := make([]string, 0, len(values))
	for _, v := range values {
		if v != "" {
			out = append(out, v)
		}
	}
	return out
}

func printReachability(reach []*protobufs.Reachability) {
	printed := false
	for _, r := range reach {
		if r == nil {
			continue
		}
		filter := hex.EncodeToString(r.Filter)
		pubsub := nonEmptyStrings(r.PubsubMultiaddrs)
		stream := nonEmptyStrings(r.StreamMultiaddrs)
		if filter == "" && len(pubsub) == 0 && len(stream) == 0 {
			continue
		}
		if !printed {
			fmt.Println(" Reachability:")
			printed = true
		}
		//fmt.Println(" -")
		if filter != "" {
			fmt.Println(" - Filter:", filter)
		}
		if len(pubsub) > 0 {
			fmt.Println(" - Pubsub Multiaddrs:")
			for _, addr := range pubsub {
				fmt.Println(" - " + addr)
			}
		}
		if len(stream) > 0 {
			fmt.Println(" - Stream Multiaddrs:")
			for _, addr := range stream {
				fmt.Println(" - " + addr)
			}
		}
	}
}

func printCapabilities(list []*protobufs.Capability) {
	entries := make([]string, 0, len(list))
	for _, capability := range list {
		if capability == nil {
			continue
		}
		desc := capabilityDescription(capability.ProtocolIdentifier)
		if len(capability.AdditionalMetadata) > 0 {
			desc = fmt.Sprintf(
				"%s (metadata: %s)",
				desc,
				hex.EncodeToString(capability.AdditionalMetadata),
			)
		}
		entries = append(entries, desc)
	}
	if len(entries) == 0 {
		return
	}
	fmt.Println(" Capabilities:")
	for _, entry := range entries {
		fmt.Println(" - " + entry)
	}
}

var defaultGrpcAddress = "localhost:8337"

// ConnectToNode connects to the node's gRPC endpoint.
func ConnectToNode(logger *zap.Logger, nodeConfig *config.Config) (
	*grpc.ClientConn,
	error,
) {
	addr := defaultGrpcAddress
	if nodeConfig.ListenGRPCMultiaddr != "" {
		ma, err := multiaddr.NewMultiaddr(nodeConfig.ListenGRPCMultiaddr)
		if err != nil {
			logger.Panic("error parsing multiaddr", zap.Error(err))
		}

		_, addr, err = mn.DialArgs(ma)
		if err != nil {
			logger.Panic("error getting dial args", zap.Error(err))
		}
	}

	return qgrpc.DialContext(
		context.Background(),
		addr,
		grpc.WithTransportCredentials(
			insecure.NewCredentials(),
		),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallSendMsgSize(600*1024*1024),
			grpc.MaxCallRecvMsgSize(600*1024*1024),
		),
	)
}

type TokenBalance struct {
	Owned            *big.Int
	UnconfirmedOwned *big.Int
}

func FetchNodeInfo(
	client protobufs.NodeServiceClient,
) (*protobufs.NodeInfoResponse, error) {
	info, err := client.GetNodeInfo(
		context.Background(),
		&protobufs.GetNodeInfoRequest{},
	)
	if err != nil {
		return nil, errors.Wrap(err, "error getting node info")
	}

	return info, nil
}
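
// Example usage of ConnectToNode and FetchNodeInfo together, mirroring what
// printNodeInfo does above (error handling elided for brevity):
//
//	conn, _ := ConnectToNode(logger, cfg)
//	defer conn.Close()
//	info, _ := FetchNodeInfo(protobufs.NewNodeServiceClient(conn))
//	fmt.Println("Version:", config.FormatVersion(info.Version))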