//go:build !js && !wasm package main import ( "bytes" "context" "crypto/sha3" _ "embed" "encoding/hex" "flag" "fmt" "log" "math/big" "net/http" npprof "net/http/pprof" "os" "os/signal" "path/filepath" "runtime" rdebug "runtime/debug" "runtime/pprof" "strconv" "strings" "syscall" "time" "github.com/cloudflare/circl/sign/ed448" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" mn "github.com/multiformats/go-multiaddr/net" "github.com/pbnjay/memory" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "source.quilibrium.com/quilibrium/monorepo/config" "source.quilibrium.com/quilibrium/monorepo/node/app" qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc" "source.quilibrium.com/quilibrium/monorepo/node/rpc" "source.quilibrium.com/quilibrium/monorepo/node/store" "source.quilibrium.com/quilibrium/monorepo/protobufs" qruntime "source.quilibrium.com/quilibrium/monorepo/utils/runtime" ) var ( configDirectory = flag.String( "config", filepath.Join(".", ".config"), "the configuration directory", ) peerId = flag.Bool( "peer-id", false, "print the peer id to stdout from the config and exit", ) cpuprofile = flag.String( "cpuprofile", "", "write cpu profile to file", ) memprofile = flag.String( "memprofile", "", "write memory profile after 20m to this file", ) pprofServer = flag.String( "pprof-server", "", "enable pprof server on specified address (e.g. localhost:6060)", ) prometheusServer = flag.String( "prometheus-server", "", "enable prometheus server on specified address (e.g. localhost:8080)", ) nodeInfo = flag.Bool( "node-info", false, "print node related information", ) peerInfo = flag.Bool( "peer-info", false, "prints peer info", ) debug = flag.Bool( "debug", false, "sets log output to debug (verbose)", ) dhtOnly = flag.Bool( "dht-only", false, "sets a node to run strictly as a dht bootstrap peer (not full node)", ) network = flag.Uint( "network", 0, "sets the active network for the node (mainnet = 0, primary testnet = 1)", ) signatureCheck = flag.Bool( "signature-check", signatureCheckDefault(), "enables or disables signature validation (default true or value of QUILIBRIUM_SIGNATURE_CHECK env var)", ) core = flag.Int( "core", 0, "specifies the core of the process (defaults to zero, the initial launcher)", ) parentProcess = flag.Int( "parent-process", 0, "specifies the parent process pid for a data worker", ) compactDB = flag.Bool( "compact-db", false, "compacts the database and exits", ) dbConsole = flag.Bool( "db-console", false, "starts the db console mode (does not run nodes)", ) dangerClearPending = flag.Bool( "danger-clear-pending", false, "clears pending states (dangerous action)", ) // *char flags blockchar = "█" bver = "Bloom" char *string = &blockchar ver *string = &bver ) var capabilityLabels = map[uint32]string{ 0x00010001: "Compute Protocol v1", 0x00010101: "KZG Verify (BLS48581)", 0x00010201: "Bulletproof Range Verify (Decaf448)", 0x00010301: "Bulletproof Sum Verify (Decaf448)", 0x00010401: "SECP256K1 ECDSA Verify", 0x00010501: "ED25519 EdDSA Verify", 0x00010601: "ED448 EdDSA Verify", 0x00010701: "DECAF448 Schnorr Verify", 0x00010801: "SECP256R1 ECDSA Verify", 0x00020001: "Global Protocol v1", 0x00030001: "Hypergraph Protocol v1", 0x00040001: "Token Protocol v1", 0x0101: "Double Ratchet v1", 0x0201: "Triple Ratchet v1", 0x0301: "Onion Routing v1", } func signatureCheckDefault() bool { envVarValue, envVarExists := os.LookupEnv("QUILIBRIUM_SIGNATURE_CHECK") if envVarExists { def, err := strconv.ParseBool(envVarValue) if err == nil { return def } else { fmt.Println( "Invalid environment variable QUILIBRIUM_SIGNATURE_CHECK, must be 'true' or 'false':", envVarValue, ) } } return true } // monitorParentProcess watches parent process and stops the worker if parent dies func monitorParentProcess( parentProcessId int, stopFunc func(), logger *zap.Logger, ) { for { time.Sleep(1 * time.Second) proc, err := os.FindProcess(parentProcessId) if err != nil { logger.Error("parent process not found, terminating") if stopFunc != nil { stopFunc() } return } // Windows returns an error if the process is dead, nobody else does if runtime.GOOS != "windows" { err := proc.Signal(syscall.Signal(0)) if err != nil { logger.Error("parent process not found, terminating") if stopFunc != nil { stopFunc() } return } } } } func main() { config.Flags(&char, &ver) flag.Parse() nodeConfig, err := config.LoadConfig(*configDirectory, "", false) if err != nil { log.Fatal("failed to load config", err) } if *dbConsole { db, err := app.NewDBConsole(nodeConfig) if err != nil { log.Fatal(err) } db.Run() os.Exit(0) } logger, closer, err := nodeConfig.CreateLogger(uint(*core), *debug) if err != nil { log.Fatal("failed to create logger", err) } defer closer.Close() if *signatureCheck { if runtime.GOOS == "windows" { logger.Info("Signature check not available for windows yet, skipping...") } else { ex, err := os.Executable() if err != nil { logger.Panic( "Failed to get executable path", zap.Error(err), zap.String("executable", ex), ) } b, err := os.ReadFile(ex) if err != nil { logger.Panic( "Error encountered during signature check – are you running this "+ "from source? (use --signature-check=false)", zap.Error(err), ) } checksum := sha3.Sum256(b) digest, err := os.ReadFile(ex + ".dgst") if err != nil { logger.Fatal("digest file not found", zap.Error(err)) } parts := strings.Split(string(digest), " ") if len(parts) != 2 { logger.Fatal("Invalid digest file format") } digestBytes, err := hex.DecodeString(parts[1][:64]) if err != nil { logger.Fatal("invalid digest file format", zap.Error(err)) } if !bytes.Equal(checksum[:], digestBytes) { logger.Fatal("invalid digest for node") } count := 0 for i := 1; i <= len(config.Signatories); i++ { signatureFile := fmt.Sprintf(ex+".dgst.sig.%d", i) sig, err := os.ReadFile(signatureFile) if err != nil { continue } pubkey, _ := hex.DecodeString(config.Signatories[i-1]) if !ed448.Verify(pubkey, digest, sig, "") { logger.Fatal( "failed signature check for signatory", zap.Int("signatory", i), ) } count++ } if count < ((len(config.Signatories)-4)/2)+((len(config.Signatories)-4)%2) { logger.Fatal("quorum on signatures not met") } logger.Info("signature check passed") } } else { logger.Info("signature check disabled, skipping...") } if *core == 0 { logger = logger.With(zap.String("process", "master")) } else { logger = logger.With(zap.String("process", fmt.Sprintf("worker %d", *core))) } if *memprofile != "" && *core == 0 { go func() { for { time.Sleep(5 * time.Minute) f, err := os.Create(*memprofile) if err != nil { logger.Fatal("failed to create memory profile file", zap.Error(err)) } pprof.WriteHeapProfile(f) f.Close() } }() } if *cpuprofile != "" && *core == 0 { f, err := os.Create(*cpuprofile) if err != nil { logger.Fatal("failed to create cpu profile file", zap.Error(err)) } defer f.Close() pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() } if *pprofServer != "" && *core == 0 { go func() { mux := http.NewServeMux() mux.HandleFunc("/debug/pprof/", npprof.Index) mux.HandleFunc("/debug/pprof/cmdline", npprof.Cmdline) mux.HandleFunc("/debug/pprof/profile", npprof.Profile) mux.HandleFunc("/debug/pprof/symbol", npprof.Symbol) mux.HandleFunc("/debug/pprof/trace", npprof.Trace) logger.Fatal( "Failed to start pprof server", zap.Error(http.ListenAndServe(*pprofServer, mux)), ) }() } if *prometheusServer != "" && *core == 0 { go func() { mux := http.NewServeMux() mux.Handle("/metrics", promhttp.Handler()) logger.Fatal( "Failed to start prometheus server", zap.Error(http.ListenAndServe(*prometheusServer, mux)), ) }() } if *peerId { printPeerID(logger, nodeConfig.P2P) return } if *nodeInfo { config, err := config.LoadConfig(*configDirectory, "", false) if err != nil { logger.Fatal("failed to load config", zap.Error(err)) } printNodeInfo(logger, config) return } if *peerInfo { config, err := config.LoadConfig(*configDirectory, "", false) if err != nil { logger.Fatal("failed to load config", zap.Error(err)) } printPeerInfo(logger, config) return } if *dangerClearPending { db := store.NewPebbleDB(logger, nodeConfig.DB, 0) defer db.Close() consensusStore := store.NewPebbleConsensusStore(db, logger) state, err := consensusStore.GetConsensusState(nil) if err != nil { fmt.Println(err.Error()) return } clockStore := store.NewPebbleClockStore(db, logger) qc, err := clockStore.GetQuorumCertificate(nil, state.FinalizedRank) if err != nil { fmt.Println(err.Error()) return } err = clockStore.DeleteGlobalClockFrameRange( qc.FrameNumber+1, qc.FrameNumber+10000, ) if err != nil { fmt.Println(err.Error()) return } err = clockStore.DeleteQuorumCertificateRange( nil, qc.Rank+1, qc.Rank+10000, ) if err != nil { fmt.Println(err.Error()) return } if state.LatestTimeout != nil { latestTCRank := state.LatestTimeout.Rank err = clockStore.DeleteTimeoutCertificateRange( nil, latestTCRank+1, latestTCRank+10000, latestTCRank, ) if err != nil { fmt.Println(err.Error()) return } } fmt.Println("pending entries cleared") return } if *core == 0 { config.PrintLogo(*char) config.PrintVersion(uint8(*network), *char, *ver) fmt.Println(" ") } if *compactDB { db := store.NewPebbleDB(logger, nodeConfig.DB, uint(*core)) if err := db.CompactAll(); err != nil { logger.Fatal("failed to compact database", zap.Error(err)) } if err := db.Close(); err != nil { logger.Fatal("failed to close database", zap.Error(err)) } return } if *network != 0 { if nodeConfig.P2P.BootstrapPeers[0] == config.BootstrapPeers[0] { logger.Fatal( "node has specified to run outside of mainnet but is still " + "using default bootstrap list. this will fail. exiting.", ) } nodeConfig.P2P.Network = uint8(*network) logger.Warn( "node is operating outside of mainnet – be sure you intended to do this.", ) } if *dhtOnly { done := make(chan os.Signal, 1) signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) dht, err := app.NewDHTNode(logger, nodeConfig, 0) if err != nil { logger.Error("failed to start dht node", zap.Error(err)) } go func() { dht.Start() }() <-done dht.Stop() return } if len(nodeConfig.Engine.DataWorkerP2PMultiaddrs) == 0 { maxProcs, numCPU := runtime.GOMAXPROCS(0), runtime.NumCPU() if maxProcs > numCPU && !nodeConfig.Engine.AllowExcessiveGOMAXPROCS { logger.Fatal( "GOMAXPROCS is set higher than the number of available cpus.", ) } nodeConfig.Engine.DataWorkerCount = qruntime.WorkerCount( nodeConfig.Engine.DataWorkerCount, true, true, ) } else { nodeConfig.Engine.DataWorkerCount = len( nodeConfig.Engine.DataWorkerP2PMultiaddrs, ) } if len(nodeConfig.Engine.DataWorkerP2PMultiaddrs) != len(nodeConfig.Engine.DataWorkerStreamMultiaddrs) { logger.Fatal("mismatch of worker count for p2p and stream multiaddrs") } if *core != 0 { rdebug.SetMemoryLimit(nodeConfig.Engine.DataWorkerMemoryLimit) if *parentProcess == 0 && len(nodeConfig.Engine.DataWorkerP2PMultiaddrs) == 0 { logger.Fatal("parent process pid not specified") } rpcMultiaddr := fmt.Sprintf( nodeConfig.Engine.DataWorkerBaseListenMultiaddr, int(nodeConfig.Engine.DataWorkerBaseStreamPort)+*core-1, ) if len(nodeConfig.Engine.DataWorkerStreamMultiaddrs) != 0 { rpcMultiaddr = nodeConfig.Engine.DataWorkerStreamMultiaddrs[*core-1] } dataWorkerNode, err := app.NewDataWorkerNode( logger, nodeConfig, uint(*core), rpcMultiaddr, *parentProcess, ) if err != nil { logger.Panic("failed to create data worker node", zap.Error(err)) } if *parentProcess != 0 { go monitorParentProcess(*parentProcess, dataWorkerNode.Stop, logger) } done := make(chan os.Signal, 1) signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) quitCh := make(chan struct{}) go func() { err = dataWorkerNode.Start(done, quitCh) if err != nil { logger.Panic("failed to start data worker node", zap.Error(err)) close(quitCh) } }() diskFullCh := make(chan error, 1) monitor := store.NewDiskMonitor( uint(*core), *nodeConfig.DB, logger, diskFullCh, ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() monitor.Start(ctx) loop: for { select { case <-diskFullCh: dataWorkerNode.Stop() case <-quitCh: dataWorkerNode.Stop() break loop } } return } else { totalMemory := int64(memory.TotalMemory()) dataWorkerReservedMemory := int64(0) if len(nodeConfig.Engine.DataWorkerStreamMultiaddrs) == 0 { dataWorkerReservedMemory = nodeConfig.Engine.DataWorkerMemoryLimit * int64( nodeConfig.Engine.DataWorkerCount, ) } switch availableOverhead := totalMemory - dataWorkerReservedMemory; { case totalMemory < dataWorkerReservedMemory: logger.Warn( "the memory allocated to data workers exceeds the total system memory", zap.Int64("total_memory", totalMemory), zap.Int64("data_worker_reserved_memory", dataWorkerReservedMemory), ) logger.Warn("you are at risk of running out of memory during runtime") case availableOverhead < 2*1024*1024*1024: logger.Warn( "the memory available to the node, unallocated to "+ "the data workers, is less than 2gb", zap.Int64("available_overhead", availableOverhead), ) logger.Warn("you are at risk of running out of memory during runtime") default: if _, limit := os.LookupEnv("GOMEMLIMIT"); !limit { rdebug.SetMemoryLimit(availableOverhead * 8 / 10) } if _, explicitGOGC := os.LookupEnv("GOGC"); !explicitGOGC { rdebug.SetGCPercent(10) } } } logger.Info("starting node...") // Create MasterNode for core 0 masterNode, err := app.NewMasterNode(logger, nodeConfig, uint(*core)) if err != nil { logger.Panic("failed to create master node", zap.Error(err)) } // Start the master node ctx, quit := context.WithCancel(context.Background()) errCh := masterNode.Start(ctx) defer masterNode.Stop() if nodeConfig.ListenGRPCMultiaddr != "" { srv, err := rpc.NewRPCServer( nodeConfig, masterNode.GetLogger(), masterNode.GetKeyManager(), masterNode.GetPubSub(), masterNode.GetPeerInfoManager(), masterNode.GetWorkerManager(), masterNode.GetProverRegistry(), masterNode.GetExecutionEngineManager(), ) if err != nil { logger.Panic("failed to new rpc server", zap.Error(err)) } if err := srv.Start(); err != nil { logger.Panic("failed to start rpc server", zap.Error(err)) } defer srv.Stop() } diskFullCh := make(chan error, 1) monitor := store.NewDiskMonitor( uint(*core), *nodeConfig.DB, logger, diskFullCh, ) monitor.Start(ctx) select { case <-diskFullCh: quit() case err := <-errCh: if err != nil { logger.Error("master node error", zap.Error(err)) } quit() } } func getPeerID(logger *zap.Logger, p2pConfig *config.P2PConfig) peer.ID { peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey) if err != nil { logger.Panic("error to decode peer private key", zap.Error(errors.Wrap(err, "error unmarshaling peerkey"))) } privKey, err := crypto.UnmarshalEd448PrivateKey(peerPrivKey) if err != nil { logger.Panic("error to unmarshal ed448 private key", zap.Error(errors.Wrap(err, "error unmarshaling peerkey"))) } pub := privKey.GetPublic() id, err := peer.IDFromPublicKey(pub) if err != nil { logger.Panic("error to get peer id", zap.Error(err)) } return id } func printPeerID(logger *zap.Logger, p2pConfig *config.P2PConfig) { id := getPeerID(logger, p2pConfig) fmt.Println("Peer ID: " + id.String()) } func printNodeInfo(logger *zap.Logger, cfg *config.Config) { if cfg.ListenGRPCMultiaddr == "" { logger.Fatal("gRPC Not Enabled, Please Configure") } printPeerID(logger, cfg.P2P) conn, err := ConnectToNode(logger, cfg) if err != nil { logger.Fatal( "could not connect to node. if it is still booting, please wait.", zap.Error(err), ) } defer conn.Close() client := protobufs.NewNodeServiceClient(conn) nodeInfo, err := FetchNodeInfo(client) if err != nil { logger.Panic("failed to fetch node info", zap.Error(err)) } if patch := formatPatchNumber(nodeInfo.PatchNumber); patch != "" { fmt.Println("Version:", config.FormatVersion(nodeInfo.Version)+"."+patch) } else { fmt.Println("Version:", config.FormatVersion(nodeInfo.Version)) } fmt.Println("Seniority: " + new(big.Int).SetBytes( nodeInfo.PeerSeniority, ).String()) fmt.Println("Running Workers:", nodeInfo.RunningWorkers) fmt.Println("Active Workers:", nodeInfo.AllocatedWorkers) } func printPeerInfo(logger *zap.Logger, cfg *config.Config) { if cfg.ListenGRPCMultiaddr == "" { logger.Fatal("gRPC Not Enabled, Please Configure") } printPeerID(logger, cfg.P2P) conn, err := ConnectToNode(logger, cfg) if err != nil { logger.Fatal( "could not connect to node. if it is still booting, please wait.", zap.Error(err), ) } defer conn.Close() client := protobufs.NewNodeServiceClient(conn) peerInfo, err := client.GetPeerInfo( context.Background(), &protobufs.GetPeerInfoRequest{}, ) if err != nil { logger.Panic("failed to fetch node info", zap.Error(err)) } for idx, p := range peerInfo.PeerInfo { if p == nil { continue } fmt.Printf("Peer %d:\n", idx+1) if peerID := formatPeerID(p.PeerId); peerID != "" { fmt.Println(" Peer ID:", peerID) } if len(p.Version) >= 3 { if patch := formatPatchNumber(p.PatchNumber); patch != "" { fmt.Println(" Version:", config.FormatVersion(p.Version)+"."+patch) } else { fmt.Println(" Version:", config.FormatVersion(p.Version)) } } if p.Timestamp != 0 { fmt.Println( " Last Seen:", time.UnixMilli(p.Timestamp).UTC().Format(time.RFC3339), ) } printReachability(p.Reachability) printCapabilities(p.Capabilities) if p.LastReceivedFrame != 0 { fmt.Printf( " Last Received Global Frame: %d\n", p.LastReceivedFrame, ) } if p.LastGlobalHeadFrame != 0 { fmt.Printf( " Last Global Head Frame: %d\n", p.LastGlobalHeadFrame, ) } if len(p.PublicKey) > 0 { fmt.Println(" Public Key:", hex.EncodeToString(p.PublicKey)) } if len(p.Signature) > 0 { fmt.Println(" Signature:", hex.EncodeToString(p.Signature)) } if idx < len(peerInfo.PeerInfo)-1 { fmt.Println() } } } func formatPeerID(raw []byte) string { if len(raw) == 0 { return "" } id, err := peer.IDFromBytes(raw) if err != nil { return hex.EncodeToString(raw) } return id.String() } func capabilityDescription(id uint32) string { if name, ok := capabilityLabels[id]; ok { return fmt.Sprintf("%s (0x%08X)", name, id) } return fmt.Sprintf("0x%08X", id) } func formatPatchNumber(raw []byte) string { if len(raw) == 0 { return "" } if len(raw) == 1 { return fmt.Sprintf("%d", raw[0]) } return fmt.Sprintf("0x%s", hex.EncodeToString(raw)) } func nonEmptyStrings(values []string) []string { out := make([]string, 0, len(values)) for _, v := range values { if v != "" { out = append(out, v) } } return out } func printReachability(reach []*protobufs.Reachability) { printed := false for _, r := range reach { if r == nil { continue } filter := hex.EncodeToString(r.Filter) pubsub := nonEmptyStrings(r.PubsubMultiaddrs) stream := nonEmptyStrings(r.StreamMultiaddrs) if filter == "" && len(pubsub) == 0 && len(stream) == 0 { continue } if !printed { fmt.Println(" Reachability:") printed = true } //fmt.Println(" -") if filter != "" { fmt.Println(" - Filter:", filter) } if len(pubsub) > 0 { fmt.Println(" - Pubsub Multiaddrs:") for _, addr := range pubsub { fmt.Println(" - " + addr) } } if len(stream) > 0 { fmt.Println(" - Stream Multiaddrs:") for _, addr := range stream { fmt.Println(" - " + addr) } } } } func printCapabilities(list []*protobufs.Capability) { entries := make([]string, 0, len(list)) for _, capability := range list { if capability == nil { continue } desc := capabilityDescription(capability.ProtocolIdentifier) if len(capability.AdditionalMetadata) > 0 { desc = fmt.Sprintf( "%s (metadata: %s)", desc, hex.EncodeToString(capability.AdditionalMetadata), ) } entries = append(entries, desc) } if len(entries) == 0 { return } fmt.Println(" Capabilities:") for _, entry := range entries { fmt.Println(" - " + entry) } } var defaultGrpcAddress = "localhost:8337" // Connect to the node via GRPC func ConnectToNode(logger *zap.Logger, nodeConfig *config.Config) ( *grpc.ClientConn, error, ) { addr := defaultGrpcAddress if nodeConfig.ListenGRPCMultiaddr != "" { ma, err := multiaddr.NewMultiaddr(nodeConfig.ListenGRPCMultiaddr) if err != nil { logger.Panic("error parsing multiaddr", zap.Error(err)) } _, addr, err = mn.DialArgs(ma) if err != nil { logger.Panic("error getting dial args", zap.Error(err)) } } return qgrpc.DialContext( context.Background(), addr, grpc.WithTransportCredentials( insecure.NewCredentials(), ), grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(600*1024*1024), grpc.MaxCallRecvMsgSize(600*1024*1024), ), ) } type TokenBalance struct { Owned *big.Int UnconfirmedOwned *big.Int } func FetchNodeInfo( client protobufs.NodeServiceClient, ) (*protobufs.NodeInfoResponse, error) { info, err := client.GetNodeInfo( context.Background(), &protobufs.GetNodeInfoRequest{}, ) if err != nil { return nil, errors.Wrap(err, "error getting node info") } return info, nil }