From bc05a4d7b96235347499e76e82aec83ac83adb5c Mon Sep 17 00:00:00 2001
From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>
Date: Tue, 19 Nov 2024 23:51:14 +0100
Subject: [PATCH] Adaptive reserved cores (#363)

* Add adaptive data worker count
* Use runtime worker count for validation workers
* Reserve cores for networking during transition application
* Automatically set GOGC and GOMEMLIMIT
---
 node/config/engine.go                       |  4 +-
 .../data/data_clock_consensus_engine.go     |  8 +-
 .../token/application/token_application.go  |  4 +-
 node/internal/runtime/runtime.go            | 37 +++++++++
 node/main.go                                | 79 ++++++++++++-------
 node/p2p/blossomsub.go                      |  4 +-
 6 files changed, 95 insertions(+), 41 deletions(-)
 create mode 100644 node/internal/runtime/runtime.go

diff --git a/node/config/engine.go b/node/config/engine.go
index 707080f..3e15a6f 100644
--- a/node/config/engine.go
+++ b/node/config/engine.go
@@ -19,7 +19,9 @@ type EngineConfig struct {
 	DataWorkerBaseListenPort uint16 `yaml:"dataWorkerBaseListenPort"`
 	DataWorkerMemoryLimit    int64  `yaml:"dataWorkerMemoryLimit"`
 	// Alternative configuration path to manually specify data workers by multiaddr
-	DataWorkerMultiaddrs []string `yaml:"dataWorkerMultiaddrs"`
+	DataWorkerMultiaddrs []string `yaml:"dataWorkerMultiaddrs"`
+	// Number of data worker processes to spawn.
+	DataWorkerCount int `yaml:"dataWorkerCount"`
 	MultisigProverEnrollmentPaths []string `yaml:"multisigProverEnrollmentPaths"`
 	// Fully verifies execution, omit to enable light prover
 	FullProver bool `yaml:"fullProver"`
diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go
index 2593437..0fee9ad 100644
--- a/node/consensus/data/data_clock_consensus_engine.go
+++ b/node/consensus/data/data_clock_consensus_engine.go
@@ -509,12 +509,6 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 	go e.runPreMidnightProofWorker()
 
 	go func() {
-		parallelism := e.report.Cores - 1
-
-		if parallelism < 3 {
-			panic("invalid system configuration, minimum system configuration must be four cores")
-		}
-
 		if len(e.config.Engine.DataWorkerMultiaddrs) != 0 {
 			e.clients, err = e.createParallelDataClientsFromList()
 			if err != nil {
@@ -522,7 +516,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 			}
 		} else {
 			e.clients, err = e.createParallelDataClientsFromBaseMultiaddr(
-				int(parallelism),
+				e.config.Engine.DataWorkerCount,
 			)
 			if err != nil {
 				panic(err)
diff --git a/node/execution/intrinsics/token/application/token_application.go b/node/execution/intrinsics/token/application/token_application.go
index a05bf82..49d1703 100644
--- a/node/execution/intrinsics/token/application/token_application.go
+++ b/node/execution/intrinsics/token/application/token_application.go
@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"crypto"
 	"encoding/binary"
-	"runtime"
 	"sync"
 
 	"github.com/iden3/go-iden3-crypto/poseidon"
@@ -12,6 +11,7 @@ import (
 	"go.uber.org/zap"
 	"google.golang.org/protobuf/proto"
 	"source.quilibrium.com/quilibrium/monorepo/node/config"
+	qruntime "source.quilibrium.com/quilibrium/monorepo/node/internal/runtime"
 	"source.quilibrium.com/quilibrium/monorepo/node/p2p"
 	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
 	"source.quilibrium.com/quilibrium/monorepo/node/store"
@@ -366,7 +366,7 @@ func (a *TokenApplication) ApplyTransitions(
 	}
 
 	wg := sync.WaitGroup{}
-	throttle := make(chan struct{}, runtime.GOMAXPROCS(-1))
+	throttle := make(chan struct{}, qruntime.WorkerCount(0, false))
 	for i, transition := range set {
 		if transition == nil {
 			continue
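Note: qruntime.WorkerCount, used above to size the transition throttle, is the new helper introduced in node/internal/runtime/runtime.go below. The following standalone program is only a sketch that mirrors the helper's core-reservation tiers (the tier boundaries are taken from this patch; the program itself and its sample sizes are illustrative, assuming GOMAXPROCS reflects the machine's usable cores):

package main

import (
	"fmt"
	"runtime"
)

// reservedCores mirrors the tiering in node/internal/runtime.WorkerCount:
// larger machines hold back more cores for networking and other non-worker work.
func reservedCores(n int) int {
	switch {
	case n <= 1:
		return 0
	case n <= 4:
		return 1
	case n <= 16:
		return 2
	case n <= 32:
		return 3
	case n <= 64:
		return 4
	default:
		return 5
	}
}

func main() {
	// Print the default data worker count for a few machine sizes and this host.
	for _, n := range []int{1, 4, 8, 16, 32, 64, 128, runtime.GOMAXPROCS(0)} {
		fmt.Printf("GOMAXPROCS=%3d -> workers=%3d (reserved=%d)\n",
			n, n-reservedCores(n), reservedCores(n))
	}
}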
diff --git a/node/internal/runtime/runtime.go b/node/internal/runtime/runtime.go
new file mode 100644
index 0000000..a8aa545
--- /dev/null
+++ b/node/internal/runtime/runtime.go
@@ -0,0 +1,37 @@
+package runtime
+
+import "runtime"
+
+const minimumCores = 3
+
+// WorkerCount returns the number of workers to use for CPU bound tasks.
+// It will use GOMAXPROCS as a base, and then subtract a number of CPUs
+// which are meant to be left for other tasks, such as networking.
+func WorkerCount(requested int, validate bool) int {
+	n := runtime.GOMAXPROCS(0)
+	if validate {
+		if n < minimumCores {
+			panic("invalid system configuration, must have at least 3 cores")
+		}
+		if requested > 0 && requested < minimumCores {
+			panic("invalid worker count, must have at least 3 workers")
+		}
+	}
+	if requested > 0 {
+		return min(requested, n)
+	}
+	switch {
+	case n == 1:
+		return 1
+	case n <= 4:
+		return n - 1
+	case n <= 16:
+		return n - 2
+	case n <= 32:
+		return n - 3
+	case n <= 64:
+		return n - 4
+	default:
+		return n - 5
+	}
+}
diff --git a/node/main.go b/node/main.go
index c8e92c6..0003832 100644
--- a/node/main.go
+++ b/node/main.go
@@ -28,17 +28,6 @@ import (
 	"syscall"
 	"time"
 
-	"go.uber.org/zap"
-	"golang.org/x/crypto/sha3"
-	"google.golang.org/protobuf/proto"
-	"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token"
-	"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application"
-	"source.quilibrium.com/quilibrium/monorepo/node/p2p"
-	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
-	"source.quilibrium.com/quilibrium/monorepo/node/store"
-	"source.quilibrium.com/quilibrium/monorepo/node/tries"
-	"source.quilibrium.com/quilibrium/monorepo/node/utils"
-
 	"github.com/cloudflare/circl/sign/ed448"
 	"github.com/iden3/go-iden3-crypto/poseidon"
 	"github.com/libp2p/go-libp2p/core/crypto"
@@ -46,11 +35,22 @@ import (
 	"github.com/pbnjay/memory"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"go.uber.org/zap"
+	"golang.org/x/crypto/sha3"
+	"google.golang.org/protobuf/proto"
 	"source.quilibrium.com/quilibrium/monorepo/node/app"
 	"source.quilibrium.com/quilibrium/monorepo/node/config"
 	qcrypto "source.quilibrium.com/quilibrium/monorepo/node/crypto"
 	"source.quilibrium.com/quilibrium/monorepo/node/crypto/kzg"
+	"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token"
+	"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application"
+	qruntime "source.quilibrium.com/quilibrium/monorepo/node/internal/runtime"
+	"source.quilibrium.com/quilibrium/monorepo/node/p2p"
+	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
 	"source.quilibrium.com/quilibrium/monorepo/node/rpc"
+	"source.quilibrium.com/quilibrium/monorepo/node/store"
+	"source.quilibrium.com/quilibrium/monorepo/node/tries"
+	"source.quilibrium.com/quilibrium/monorepo/node/utils"
 )
 
 var (
@@ -384,24 +384,25 @@ func main() {
 		return
 	}
 
+	if nodeConfig.Engine.DataWorkerBaseListenMultiaddr == "" {
+		nodeConfig.Engine.DataWorkerBaseListenMultiaddr = "/ip4/127.0.0.1/tcp/%d"
+	}
+
+	if nodeConfig.Engine.DataWorkerBaseListenPort == 0 {
+		nodeConfig.Engine.DataWorkerBaseListenPort = 40000
+	}
+
+	if nodeConfig.Engine.DataWorkerMemoryLimit == 0 {
+		nodeConfig.Engine.DataWorkerMemoryLimit = 1792 * 1024 * 1024 // 1.75GiB
+	}
+
+	if len(nodeConfig.Engine.DataWorkerMultiaddrs) == 0 {
+		nodeConfig.Engine.DataWorkerCount = qruntime.WorkerCount(
+			nodeConfig.Engine.DataWorkerCount, true,
+		)
+	}
+
 	if *core != 0 {
-		// runtime.GOMAXPROCS(2)
-		rdebug.SetGCPercent(9999)
-
-		if nodeConfig.Engine.DataWorkerMemoryLimit == 0 {
-			nodeConfig.Engine.DataWorkerMemoryLimit = 1792 * 1024 * 1024 // 1.75GiB
-		}
-		rdebug.SetMemoryLimit(nodeConfig.Engine.DataWorkerMemoryLimit)
-
-		if nodeConfig.Engine.DataWorkerBaseListenMultiaddr == "" {
-			nodeConfig.Engine.DataWorkerBaseListenMultiaddr = "/ip4/127.0.0.1/tcp/%d"
-		}
-
-		if nodeConfig.Engine.DataWorkerBaseListenPort == 0 {
-			nodeConfig.Engine.DataWorkerBaseListenPort = 40000
-		}
-
 		if *parentProcess == 0 && len(nodeConfig.Engine.DataWorkerMultiaddrs) == 0 {
 			panic("parent process pid not specified")
 		}
@@ -437,6 +438,27 @@ func main() {
 			panic(err)
 		}
 		return
+	} else {
+		totalMemory := int64(memory.TotalMemory())
+		dataWorkerReservedMemory := int64(0)
+		if len(nodeConfig.Engine.DataWorkerMultiaddrs) == 0 {
+			dataWorkerReservedMemory = nodeConfig.Engine.DataWorkerMemoryLimit * int64(nodeConfig.Engine.DataWorkerCount)
+		}
+		switch availableOverhead := totalMemory - dataWorkerReservedMemory; {
+		case totalMemory < dataWorkerReservedMemory:
+			fmt.Println("The memory allocated to data workers exceeds the total system memory.")
+			fmt.Println("You are at risk of running out of memory during runtime.")
+		case availableOverhead < 8*1024*1024*1024:
+			fmt.Println("The memory available to the node, unallocated to the data workers, is less than 8GiB.")
+			fmt.Println("You are at risk of running out of memory during runtime.")
+		default:
+			if _, explicitGOGC := os.LookupEnv("GOGC"); !explicitGOGC {
+				rdebug.SetGCPercent(9999)
+			}
+			if _, explicitGOMEMLIMIT := os.LookupEnv("GOMEMLIMIT"); !explicitGOMEMLIMIT {
+				rdebug.SetMemoryLimit(availableOverhead * 8 / 10)
+			}
+		}
 	}
 
 	fmt.Println("Loading ceremony state and starting node...")
@@ -533,11 +555,10 @@ func spawnDataWorkers(nodeConfig *config.Config) {
 		panic(err)
 	}
 
-	cores := runtime.GOMAXPROCS(0)
-	dataWorkers = make([]*exec.Cmd, cores-1)
-	fmt.Printf("Spawning %d data workers...\n", cores-1)
+	dataWorkers = make([]*exec.Cmd, nodeConfig.Engine.DataWorkerCount)
+	fmt.Printf("Spawning %d data workers...\n", nodeConfig.Engine.DataWorkerCount)
 
-	for i := 1; i <= cores-1; i++ {
+	for i := 1; i <= nodeConfig.Engine.DataWorkerCount; i++ {
 		i := i
 		go func() {
 			for {
diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go
index b41f6c3..d079ab4 100644
--- a/node/p2p/blossomsub.go
+++ b/node/p2p/blossomsub.go
@@ -12,7 +12,6 @@ import (
 	"math/bits"
 	"net"
 	"net/http"
-	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -47,6 +46,7 @@ import (
 	"source.quilibrium.com/quilibrium/monorepo/node/config"
 	qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc"
 	"source.quilibrium.com/quilibrium/monorepo/node/internal/observability"
+	qruntime "source.quilibrium.com/quilibrium/monorepo/node/internal/runtime"
 	"source.quilibrium.com/quilibrium/monorepo/node/p2p/internal"
 	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
 )
@@ -1107,7 +1107,7 @@ func withDefaults(p2pConfig *config.P2PConfig) *config.P2PConfig {
 		p2pConfig.ValidateQueueSize = blossomsub.DefaultValidateQueueSize
 	}
 	if p2pConfig.ValidateWorkers == 0 {
-		p2pConfig.ValidateWorkers = runtime.NumCPU()
+		p2pConfig.ValidateWorkers = qruntime.WorkerCount(0, false)
 	}
 	return p2pConfig
 }
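Note: the else branch added to node/main.go only tunes the collector when the operator has not already chosen values through the standard Go environment variables. The following is a minimal standalone sketch of that guard pattern; the 16GiB overhead figure is a hypothetical stand-in for the computed availableOverhead, while the 9999 GC percent and the 80% memory-limit factor follow the patch:

package main

import (
	"fmt"
	"os"
	rdebug "runtime/debug"
)

func main() {
	// Hypothetical overhead; in the patch this is total system memory minus
	// the memory reserved for data worker processes.
	availableOverhead := int64(16) * 1024 * 1024 * 1024

	// Relax GC only when the operator has not set GOGC explicitly.
	if _, explicitGOGC := os.LookupEnv("GOGC"); !explicitGOGC {
		rdebug.SetGCPercent(9999)
	}

	// Cap the heap only when GOMEMLIMIT is not set; use 80% of the overhead.
	if _, explicitGOMEMLIMIT := os.LookupEnv("GOMEMLIMIT"); !explicitGOMEMLIMIT {
		rdebug.SetMemoryLimit(availableOverhead * 8 / 10)
	}

	fmt.Println("runtime GC tuning applied where not explicitly configured")
}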