ceremonyclient/node/worker/manager.go

package worker

import (
	"context"
	"fmt"
	"os"
	"os/exec"
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"

	"source.quilibrium.com/quilibrium/monorepo/config"
	"source.quilibrium.com/quilibrium/monorepo/node/store"
	typesStore "source.quilibrium.com/quilibrium/monorepo/types/store"
	typesWorker "source.quilibrium.com/quilibrium/monorepo/types/worker"
)
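
// WorkerManager tracks data worker processes and their filter assignments.
// It persists worker records through a typesStore.WorkerStore, keeps
// in-memory caches for filter and allocation lookups, and, when workers are
// not configured by multiaddr, spawns local data worker processes itself.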
type WorkerManager struct {
	store       typesStore.WorkerStore
	logger      *zap.Logger
	ctx         context.Context
	cancel      context.CancelFunc
	mu          sync.RWMutex
	started     bool
	config      *config.Config
	proposeFunc func(coreId uint, filter []byte) error
	// When automatic, hold reference to the workers
	dataWorkers []*exec.Cmd
	// In-memory cache for quick lookups
	workersByFilter  map[string]uint // filter hash -> worker id
	filtersByWorker  map[uint][]byte // worker id -> filter
	allocatedWorkers map[uint]bool   // worker id -> allocated status
}
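
// Compile-time assertion that *WorkerManager satisfies the
// typesWorker.WorkerManager interface.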
var _ typesWorker.WorkerManager = (*WorkerManager)(nil)
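
// NewWorkerManager constructs a WorkerManager backed by the given store,
// logger, and config. The proposeFunc callback is invoked by
// ProposeAllocation to hand an allocation proposal to the manager's parent.
//
// Illustrative wiring only (a hedged sketch, not code from this repository;
// workerStore, logger, cfg, ctx, and the callback body are assumptions):
//
//	mgr := NewWorkerManager(workerStore, logger, cfg,
//		func(coreId uint, filter []byte) error {
//			// forward the (coreId, filter) proposal to the parent component
//			return nil
//		},
//	)
//	if err := mgr.Start(ctx); err != nil {
//		logger.Fatal("worker manager failed to start", zap.Error(err))
//	}
//	defer mgr.Stop()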
func NewWorkerManager(
	store typesStore.WorkerStore,
	logger *zap.Logger,
	config *config.Config,
	proposeFunc func(coreId uint, filter []byte) error,
) typesWorker.WorkerManager {
	return &WorkerManager{
		store:            store,
		logger:           logger.Named("worker_manager"),
		workersByFilter:  make(map[string]uint),
		filtersByWorker:  make(map[uint][]byte),
		allocatedWorkers: make(map[uint]bool),
		config:           config,
		proposeFunc:      proposeFunc,
	}
}
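
// Start loads persisted workers into the in-memory caches, spawns local data
// workers in a background goroutine, and marks the manager as started. It
// returns an error if the manager is already running or the store cannot be
// read.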
func (w *WorkerManager) Start(ctx context.Context) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.started {
		return errors.New("worker manager already started")
	}
	w.logger.Info("starting worker manager")
	w.ctx, w.cancel = context.WithCancel(ctx)
	// Load existing workers from the store
	if err := w.loadWorkersFromStore(); err != nil {
		w.logger.Error("failed to load workers from store", zap.Error(err))
		return errors.Wrap(err, "start")
	}
	go w.spawnDataWorkers()
	w.started = true
	w.logger.Info("worker manager started successfully")
	return nil
}
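
// Stop cancels the manager's context, kills any spawned data worker
// processes, clears the in-memory caches, and resets the worker metrics.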
func (w *WorkerManager) Stop() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.started {
		return errors.New("worker manager not started")
	}
	w.logger.Info("stopping worker manager")
	if w.cancel != nil {
		w.cancel()
	}
	w.stopDataWorkers()
	// Clear in-memory caches
	w.workersByFilter = make(map[string]uint)
	w.filtersByWorker = make(map[uint][]byte)
	w.allocatedWorkers = make(map[uint]bool)
	w.started = false
	// Reset metrics
	activeWorkersGauge.Set(0)
	allocatedWorkersGauge.Set(0)
	totalStorageGauge.Set(0)
	w.logger.Info("worker manager stopped")
	return nil
}
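
// RegisterWorker persists a new worker record and indexes it in the
// in-memory caches. Registering a core ID that already exists returns an
// error.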
func (w *WorkerManager) RegisterWorker(info *typesStore.WorkerInfo) error {
	timer := prometheus.NewTimer(
		workerOperationDuration.WithLabelValues("register"),
	)
	defer timer.ObserveDuration()
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.started {
		workerOperationsTotal.WithLabelValues("register", "error").Inc()
		return errors.New("worker manager not started")
	}
	w.logger.Info("registering worker",
		zap.Uint("core_id", info.CoreId),
		zap.String("listen_addr", info.ListenMultiaddr),
		zap.Uint("total_storage", info.TotalStorage),
		zap.Bool("automatic", info.Automatic),
	)
	// Check if worker already exists
	existingWorker, err := w.store.GetWorker(info.CoreId)
	if err == nil && existingWorker != nil {
		workerOperationsTotal.WithLabelValues("register", "error").Inc()
		return errors.New("worker already registered")
	}
	// Save to store
	txn, err := w.store.NewTransaction(false)
	if err != nil {
		workerOperationsTotal.WithLabelValues("register", "error").Inc()
		return errors.Wrap(err, "register worker")
	}
	if err := w.store.PutWorker(txn, info); err != nil {
		txn.Abort()
		workerOperationsTotal.WithLabelValues("register", "error").Inc()
		return errors.Wrap(err, "register worker")
	}
	if err := txn.Commit(); err != nil {
		workerOperationsTotal.WithLabelValues("register", "error").Inc()
		return errors.Wrap(err, "register worker")
	}
	// Update in-memory cache
	if len(info.Filter) > 0 {
		filterKey := string(info.Filter)
		w.workersByFilter[filterKey] = info.CoreId
	}
	w.filtersByWorker[info.CoreId] = info.Filter
	// Update metrics
	activeWorkersGauge.Inc()
	totalStorageGauge.Add(float64(info.TotalStorage))
	workerOperationsTotal.WithLabelValues("register", "success").Inc()
	w.logger.Info(
		"worker registered successfully",
		zap.Uint("core_id", info.CoreId),
	)
	return nil
}
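
// AllocateWorker marks an existing worker as allocated, optionally replacing
// its filter, and persists the change before updating the caches and
// metrics.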
func (w *WorkerManager) AllocateWorker(coreId uint, filter []byte) error {
	timer := prometheus.NewTimer(
		workerOperationDuration.WithLabelValues("allocate"),
	)
	defer timer.ObserveDuration()
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.started {
		workerOperationsTotal.WithLabelValues("allocate", "error").Inc()
		return errors.Wrap(
			errors.New("worker manager not started"),
			"allocate worker",
		)
	}
	w.logger.Info("allocating worker",
		zap.Uint("core_id", coreId),
		zap.Binary("filter", filter),
	)
	// Check if worker exists
	worker, err := w.store.GetWorker(coreId)
	if err != nil {
		workerOperationsTotal.WithLabelValues("allocate", "error").Inc()
		if errors.Is(err, store.ErrNotFound) {
			return errors.Wrap(
				errors.New("worker not found"),
				"allocate worker",
			)
		}
		return errors.Wrap(err, "allocate worker")
	}
	// Check if already allocated
	if worker.Allocated {
		workerOperationsTotal.WithLabelValues("allocate", "error").Inc()
		return errors.New("worker already allocated")
	}
	// Update worker filter if provided
	if len(filter) > 0 && string(worker.Filter) != string(filter) {
		// Remove old filter mapping from cache
		if len(worker.Filter) > 0 {
			delete(w.workersByFilter, string(worker.Filter))
		}
		worker.Filter = filter
	}
	// Update allocation status
	worker.Allocated = true
	// Save to store
	txn, err := w.store.NewTransaction(false)
	if err != nil {
		workerOperationsTotal.WithLabelValues("allocate", "error").Inc()
		return errors.Wrap(err, "allocate worker")
	}
	if err := w.store.PutWorker(txn, worker); err != nil {
		txn.Abort()
		workerOperationsTotal.WithLabelValues("allocate", "error").Inc()
		return errors.Wrap(err, "allocate worker")
	}
	if err := txn.Commit(); err != nil {
		workerOperationsTotal.WithLabelValues("allocate", "error").Inc()
		return errors.Wrap(err, "allocate worker")
	}
	// Update cache
	if len(worker.Filter) > 0 {
		filterKey := string(worker.Filter)
		w.workersByFilter[filterKey] = coreId
	}
	w.filtersByWorker[coreId] = worker.Filter
	w.allocatedWorkers[coreId] = true
	// Update metrics
	allocatedWorkersGauge.Inc()
	workerOperationsTotal.WithLabelValues("allocate", "success").Inc()
	w.logger.Info("worker allocated successfully", zap.Uint("core_id", coreId))
	return nil
}
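
// DeallocateWorker clears the allocated flag on an existing worker, persists
// the change, and removes the worker from the allocation cache.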
func (w *WorkerManager) DeallocateWorker(coreId uint) error {
	timer := prometheus.NewTimer(
		workerOperationDuration.WithLabelValues("deallocate"),
	)
	defer timer.ObserveDuration()
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.started {
		workerOperationsTotal.WithLabelValues("deallocate", "error").Inc()
		return errors.Wrap(
			errors.New("worker manager not started"),
			"deallocate worker",
		)
	}
	w.logger.Info("deallocating worker", zap.Uint("core_id", coreId))
	// Check if worker exists
	worker, err := w.store.GetWorker(coreId)
	if err != nil {
		workerOperationsTotal.WithLabelValues("deallocate", "error").Inc()
		if errors.Is(err, store.ErrNotFound) {
			return errors.New("worker not found")
		}
		return errors.Wrap(err, "deallocate worker")
	}
	// Check if allocated
	if !worker.Allocated {
		workerOperationsTotal.WithLabelValues("deallocate", "error").Inc()
		return errors.Wrap(
			errors.New("worker not allocated"),
			"deallocate worker",
		)
	}
	// Update allocation status
	worker.Allocated = false
	// Save to store
	txn, err := w.store.NewTransaction(false)
	if err != nil {
		workerOperationsTotal.WithLabelValues("deallocate", "error").Inc()
		return errors.Wrap(err, "deallocate worker")
	}
	if err := w.store.PutWorker(txn, worker); err != nil {
		txn.Abort()
		workerOperationsTotal.WithLabelValues("deallocate", "error").Inc()
		return errors.Wrap(err, "deallocate worker")
	}
	if err := txn.Commit(); err != nil {
		workerOperationsTotal.WithLabelValues("deallocate", "error").Inc()
		return errors.Wrap(err, "deallocate worker")
	}
	// Mark as deallocated in cache
	delete(w.allocatedWorkers, coreId)
	// Update metrics
	allocatedWorkersGauge.Dec()
	workerOperationsTotal.WithLabelValues("deallocate", "success").Inc()
	w.logger.Info("worker deallocated successfully", zap.Uint("core_id", coreId))
	return nil
}
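
// GetWorkerIdByFilter resolves a filter to its worker's core ID, consulting
// the in-memory cache before falling back to the store.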
func (w *WorkerManager) GetWorkerIdByFilter(filter []byte) (uint, error) {
	timer := prometheus.NewTimer(
		workerOperationDuration.WithLabelValues("lookup"),
	)
	defer timer.ObserveDuration()
	w.mu.RLock()
	defer w.mu.RUnlock()
	if !w.started {
		return 0, errors.Wrap(
			errors.New("worker manager not started"),
			"get worker id by filter",
		)
	}
	if len(filter) == 0 {
		return 0, errors.Wrap(
			errors.New("filter cannot be empty"),
			"get worker id by filter",
		)
	}
	// Check in-memory cache first
	filterKey := string(filter)
	if coreId, exists := w.workersByFilter[filterKey]; exists {
		return coreId, nil
	}
	// Fallback to store
	worker, err := w.store.GetWorkerByFilter(filter)
	if err != nil {
		if errors.Is(err, store.ErrNotFound) {
			return 0, errors.Wrap(
				errors.New("no worker found for filter"),
				"get worker id by filter",
			)
		}
		return 0, errors.Wrap(err, "get worker id by filter")
	}
	return worker.CoreId, nil
}
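
// GetFilterByWorkerId returns the filter assigned to a worker, consulting
// the in-memory cache before falling back to the store.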
func (w *WorkerManager) GetFilterByWorkerId(coreId uint) ([]byte, error) {
	timer := prometheus.NewTimer(
		workerOperationDuration.WithLabelValues("lookup"),
	)
	defer timer.ObserveDuration()
	w.mu.RLock()
	defer w.mu.RUnlock()
	if !w.started {
		return nil, errors.Wrap(
			errors.New("worker manager not started"),
			"get filter by worker id",
		)
	}
	// Check in-memory cache first
	if filter, exists := w.filtersByWorker[coreId]; exists {
		return filter, nil
	}
	// Fallback to store
	worker, err := w.store.GetWorker(coreId)
	if err != nil {
		if errors.Is(err, store.ErrNotFound) {
			return nil, errors.Wrap(
				errors.New("worker not found"),
				"get filter by worker id",
			)
		}
		return nil, errors.Wrap(err, "get filter by worker id")
	}
	return worker.Filter, nil
}
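
// RangeWorkers returns every worker record currently held in the store.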
func (w *WorkerManager) RangeWorkers() ([]*typesStore.WorkerInfo, error) {
	return w.store.RangeWorkers()
}

// ProposeAllocation invokes a proposal function set by the parent of the
// manager.
func (w *WorkerManager) ProposeAllocation(coreId uint, filter []byte) error {
	return w.proposeFunc(coreId, filter)
}

// loadWorkersFromStore loads all workers from persistent storage into memory
func (w *WorkerManager) loadWorkersFromStore() error {
	workers, err := w.store.RangeWorkers()
	if err != nil {
		return errors.Wrap(err, "load workers from store")
	}
	var totalStorage uint64
	var allocatedCount int
	for _, worker := range workers {
		// Update cache
		if len(worker.Filter) > 0 {
			filterKey := string(worker.Filter)
			w.workersByFilter[filterKey] = worker.CoreId
		}
		w.filtersByWorker[worker.CoreId] = worker.Filter
		if worker.Allocated {
			w.allocatedWorkers[worker.CoreId] = true
			allocatedCount++
		}
		totalStorage += uint64(worker.TotalStorage)
	}
	// Update metrics
	activeWorkersGauge.Set(float64(len(workers)))
	allocatedWorkersGauge.Set(float64(allocatedCount))
	totalStorageGauge.Set(float64(totalStorage))
	w.logger.Info(fmt.Sprintf("loaded %d workers from store", len(workers)))
	return nil
}
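
// spawnDataWorkers launches one child process per configured data worker
// core, re-executing the current binary with --core and --parent-process
// flags, and restarts each worker whenever it exits. If data workers are
// configured by multiaddr, nothing is spawned.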
func (w *WorkerManager) spawnDataWorkers() {
	if len(w.config.Engine.DataWorkerStreamMultiaddrs) != 0 {
		w.logger.Warn(
			"data workers configured by multiaddr, be sure these are running...",
		)
		return
	}
	process, err := os.Executable()
	if err != nil {
		w.logger.Panic("failed to get executable path", zap.Error(err))
	}
	w.dataWorkers = make([]*exec.Cmd, w.config.Engine.DataWorkerCount)
	w.logger.Info(
		"spawning data workers",
		zap.Int("count", w.config.Engine.DataWorkerCount),
	)
	for i := 1; i <= w.config.Engine.DataWorkerCount; i++ {
		i := i
		go func() {
			for {
				args := []string{
					fmt.Sprintf("--core=%d", i),
					fmt.Sprintf("--parent-process=%d", os.Getpid()),
				}
				args = append(args, os.Args[1:]...)
				cmd := exec.Command(process, args...)
				cmd.Stdout = os.Stdout
				cmd.Stderr = os.Stdout
				err := cmd.Start()
				if err != nil {
					w.logger.Panic(
						"failed to start data worker",
						zap.String("cmd", cmd.String()),
						zap.Error(err),
					)
				}
				w.dataWorkers[i-1] = cmd
				cmd.Wait()
				time.Sleep(25 * time.Millisecond)
				w.logger.Info(
					"Data worker stopped, restarting...",
					zap.Int("worker_number", i),
				)
			}
		}()
	}
}
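
// stopDataWorkers sends a kill signal to every spawned data worker process.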
func (w *WorkerManager) stopDataWorkers() {
	for i := 0; i < len(w.dataWorkers); i++ {
		err := w.dataWorkers[i].Process.Signal(os.Kill)
		if err != nil {
			w.logger.Info(
				"unable to kill worker",
				zap.Int("pid", w.dataWorkers[i].Process.Pid),
				zap.Error(err),
			)
		}
	}
}