Adaptive reserved cores (#363)

* Add adaptive data worker count

* Use runtime worker count for validation workers

* Reserve cores for networking during transition application

* Automatically set GOGC and GOMEMLIMIT
This commit is contained in:
petricadaipegsp 2024-11-19 23:51:14 +01:00 committed by GitHub
parent d6234aa328
commit bc05a4d7b9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 95 additions and 41 deletions

View File

@ -19,7 +19,9 @@ type EngineConfig struct {
DataWorkerBaseListenPort uint16 `yaml:"dataWorkerBaseListenPort"`
DataWorkerMemoryLimit int64 `yaml:"dataWorkerMemoryLimit"`
// Alternative configuration path to manually specify data workers by multiaddr
DataWorkerMultiaddrs []string `yaml:"dataWorkerMultiaddrs"`
DataWorkerMultiaddrs []string `yaml:"dataWorkerMultiaddrs"`
// Number of data worker processes to spawn.
DataWorkerCount int `yaml:"dataWorkerCount"`
MultisigProverEnrollmentPaths []string `yaml:"multisigProverEnrollmentPaths"`
// Fully verifies execution, omit to enable light prover
FullProver bool `yaml:"fullProver"`

View File

@ -509,12 +509,6 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
go e.runPreMidnightProofWorker()
go func() {
parallelism := e.report.Cores - 1
if parallelism < 3 {
panic("invalid system configuration, minimum system configuration must be four cores")
}
if len(e.config.Engine.DataWorkerMultiaddrs) != 0 {
e.clients, err = e.createParallelDataClientsFromList()
if err != nil {
@ -522,7 +516,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
}
} else {
e.clients, err = e.createParallelDataClientsFromBaseMultiaddr(
int(parallelism),
e.config.Engine.DataWorkerCount,
)
if err != nil {
panic(err)

View File

@ -4,7 +4,6 @@ import (
"bytes"
"crypto"
"encoding/binary"
"runtime"
"sync"
"github.com/iden3/go-iden3-crypto/poseidon"
@ -12,6 +11,7 @@ import (
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"source.quilibrium.com/quilibrium/monorepo/node/config"
qruntime "source.quilibrium.com/quilibrium/monorepo/node/internal/runtime"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
"source.quilibrium.com/quilibrium/monorepo/node/store"
@ -366,7 +366,7 @@ func (a *TokenApplication) ApplyTransitions(
}
wg := sync.WaitGroup{}
throttle := make(chan struct{}, runtime.GOMAXPROCS(-1))
throttle := make(chan struct{}, qruntime.WorkerCount(0, false))
for i, transition := range set {
if transition == nil {
continue

View File

@ -0,0 +1,37 @@
package runtime
import "runtime"
// minimumCores is the smallest CPU count accepted as a valid
// configuration when validation is requested.
const minimumCores = 3

// WorkerCount returns the number of workers to use for CPU bound tasks.
// It will use GOMAXPROCS as a base, and then subtract a number of CPUs
// which are meant to be left for other tasks, such as networking.
func WorkerCount(requested int, validate bool) int {
	available := runtime.GOMAXPROCS(0)
	if validate {
		if available < minimumCores {
			panic("invalid system configuration, must have at least 3 cores")
		}
		if requested > 0 && requested < minimumCores {
			panic("invalid worker count, must have at least 3 workers")
		}
	}

	// An explicit request wins, capped at the schedulable CPU count.
	if requested > 0 {
		return min(requested, available)
	}

	// A single-CPU machine keeps its one worker; otherwise reserve
	// progressively more CPUs for other duties as the count grows
	// (1 reserved up to 4 CPUs, 2 up to 16, 3 up to 32, 4 up to 64,
	// and 5 beyond that).
	if available == 1 {
		return 1
	}
	reserved := 1
	for _, ceiling := range []int{4, 16, 32} {
		if available <= ceiling {
			break
		}
		reserved++
	}
	if available > 64 {
		reserved++
	}
	return available - reserved
}

View File

@ -28,17 +28,6 @@ import (
"syscall"
"time"
"go.uber.org/zap"
"golang.org/x/crypto/sha3"
"google.golang.org/protobuf/proto"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
"source.quilibrium.com/quilibrium/monorepo/node/store"
"source.quilibrium.com/quilibrium/monorepo/node/tries"
"source.quilibrium.com/quilibrium/monorepo/node/utils"
"github.com/cloudflare/circl/sign/ed448"
"github.com/iden3/go-iden3-crypto/poseidon"
"github.com/libp2p/go-libp2p/core/crypto"
@ -46,11 +35,22 @@ import (
"github.com/pbnjay/memory"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"golang.org/x/crypto/sha3"
"google.golang.org/protobuf/proto"
"source.quilibrium.com/quilibrium/monorepo/node/app"
"source.quilibrium.com/quilibrium/monorepo/node/config"
qcrypto "source.quilibrium.com/quilibrium/monorepo/node/crypto"
"source.quilibrium.com/quilibrium/monorepo/node/crypto/kzg"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application"
qruntime "source.quilibrium.com/quilibrium/monorepo/node/internal/runtime"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
"source.quilibrium.com/quilibrium/monorepo/node/rpc"
"source.quilibrium.com/quilibrium/monorepo/node/store"
"source.quilibrium.com/quilibrium/monorepo/node/tries"
"source.quilibrium.com/quilibrium/monorepo/node/utils"
)
var (
@ -384,24 +384,25 @@ func main() {
return
}
if nodeConfig.Engine.DataWorkerBaseListenMultiaddr == "" {
nodeConfig.Engine.DataWorkerBaseListenMultiaddr = "/ip4/127.0.0.1/tcp/%d"
}
if nodeConfig.Engine.DataWorkerBaseListenPort == 0 {
nodeConfig.Engine.DataWorkerBaseListenPort = 40000
}
if nodeConfig.Engine.DataWorkerMemoryLimit == 0 {
nodeConfig.Engine.DataWorkerMemoryLimit = 1792 * 1024 * 1024 // 1.75GiB
}
if len(nodeConfig.Engine.DataWorkerMultiaddrs) == 0 {
nodeConfig.Engine.DataWorkerCount = qruntime.WorkerCount(
nodeConfig.Engine.DataWorkerCount, true,
)
}
if *core != 0 {
// runtime.GOMAXPROCS(2)
rdebug.SetGCPercent(9999)
if nodeConfig.Engine.DataWorkerMemoryLimit == 0 {
nodeConfig.Engine.DataWorkerMemoryLimit = 1792 * 1024 * 1024 // 1.75GiB
}
rdebug.SetMemoryLimit(nodeConfig.Engine.DataWorkerMemoryLimit)
if nodeConfig.Engine.DataWorkerBaseListenMultiaddr == "" {
nodeConfig.Engine.DataWorkerBaseListenMultiaddr = "/ip4/127.0.0.1/tcp/%d"
}
if nodeConfig.Engine.DataWorkerBaseListenPort == 0 {
nodeConfig.Engine.DataWorkerBaseListenPort = 40000
}
if *parentProcess == 0 && len(nodeConfig.Engine.DataWorkerMultiaddrs) == 0 {
panic("parent process pid not specified")
}
@ -437,6 +438,27 @@ func main() {
panic(err)
}
return
} else {
totalMemory := int64(memory.TotalMemory())
dataWorkerReservedMemory := int64(0)
if len(nodeConfig.Engine.DataWorkerMultiaddrs) == 0 {
dataWorkerReservedMemory = nodeConfig.Engine.DataWorkerMemoryLimit * int64(nodeConfig.Engine.DataWorkerCount)
}
switch availableOverhead := totalMemory - dataWorkerReservedMemory; {
case totalMemory < dataWorkerReservedMemory:
fmt.Println("The memory allocated to data workers exceeds the total system memory.")
fmt.Println("You are at risk of running out of memory during runtime.")
case availableOverhead < 8*1024*1024*1024:
fmt.Println("The memory available to the node, unallocated to the data workers, is less than 8GiB.")
fmt.Println("You are at risk of running out of memory during runtime.")
default:
if _, explicitGOGC := os.LookupEnv("GOGC"); !explicitGOGC {
rdebug.SetGCPercent(9999)
}
if _, explicitGOMEMLIMIT := os.LookupEnv("GOMEMLIMIT"); !explicitGOMEMLIMIT {
rdebug.SetMemoryLimit(availableOverhead * 8 / 10)
}
}
}
fmt.Println("Loading ceremony state and starting node...")
@ -533,11 +555,10 @@ func spawnDataWorkers(nodeConfig *config.Config) {
panic(err)
}
cores := runtime.GOMAXPROCS(0)
dataWorkers = make([]*exec.Cmd, cores-1)
fmt.Printf("Spawning %d data workers...\n", cores-1)
dataWorkers = make([]*exec.Cmd, nodeConfig.Engine.DataWorkerCount)
fmt.Printf("Spawning %d data workers...\n", nodeConfig.Engine.DataWorkerCount)
for i := 1; i <= cores-1; i++ {
for i := 1; i <= nodeConfig.Engine.DataWorkerCount; i++ {
i := i
go func() {
for {

View File

@ -12,7 +12,6 @@ import (
"math/bits"
"net"
"net/http"
"runtime"
"strconv"
"strings"
"sync"
@ -47,6 +46,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/config"
qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc"
"source.quilibrium.com/quilibrium/monorepo/node/internal/observability"
qruntime "source.quilibrium.com/quilibrium/monorepo/node/internal/runtime"
"source.quilibrium.com/quilibrium/monorepo/node/p2p/internal"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
@ -1107,7 +1107,7 @@ func withDefaults(p2pConfig *config.P2PConfig) *config.P2PConfig {
p2pConfig.ValidateQueueSize = blossomsub.DefaultValidateQueueSize
}
if p2pConfig.ValidateWorkers == 0 {
p2pConfig.ValidateWorkers = runtime.NumCPU()
p2pConfig.ValidateWorkers = qruntime.WorkerCount(0, false)
}
return p2pConfig
}