ceremonyclient/node/app/data_worker_node.go
Cassandra Heart c64ab68ce2
v2.1.0.12
2025-11-26 03:22:07 -06:00

152 lines
3.4 KiB
Go

package app
import (
"fmt"
"os"
"sync"
"go.uber.org/zap"
consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time"
"source.quilibrium.com/quilibrium/monorepo/node/datarpc"
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
"source.quilibrium.com/quilibrium/monorepo/types/keys"
"source.quilibrium.com/quilibrium/monorepo/types/store"
)
type DataWorkerNode struct {
logger *zap.Logger
dataProofStore store.DataProofStore
clockStore store.ClockStore
coinStore store.TokenStore
keyManager keys.KeyManager
pebble store.KVDB
coreId uint
ipcServer *datarpc.DataWorkerIPCServer
frameProver crypto.FrameProver
globalTimeReel *consensustime.GlobalTimeReel
parentProcess int
quit chan struct{}
stopOnce sync.Once
}
func newDataWorkerNode(
logger *zap.Logger,
dataProofStore store.DataProofStore,
clockStore store.ClockStore,
coinStore store.TokenStore,
keyManager keys.KeyManager,
pebble store.KVDB,
frameProver crypto.FrameProver,
ipcServer *datarpc.DataWorkerIPCServer,
globalTimeReel *consensustime.GlobalTimeReel,
coreId uint,
parentProcess int,
) (*DataWorkerNode, error) {
logger = logger.With(zap.String("process", fmt.Sprintf("worker %d", coreId)))
return &DataWorkerNode{
logger: logger,
dataProofStore: dataProofStore,
clockStore: clockStore,
coinStore: coinStore,
keyManager: keyManager,
pebble: pebble,
coreId: coreId,
ipcServer: ipcServer,
frameProver: frameProver,
globalTimeReel: globalTimeReel,
parentProcess: parentProcess,
quit: make(chan struct{}),
}, nil
}
func (n *DataWorkerNode) Start(
done chan os.Signal,
quitCh chan struct{},
) error {
go func() {
err := n.ipcServer.Start()
if err != nil {
n.logger.Error(
"error while starting ipc server for core",
zap.Uint64("core", uint64(n.coreId)),
)
n.Stop()
}
}()
n.logger.Info("data worker node started", zap.Uint("core_id", n.coreId))
select {
case <-n.quit:
case <-done:
}
n.ipcServer.Stop()
err := n.pebble.Close()
if err != nil {
n.logger.Error(
"database shut down with errors",
zap.Error(err),
zap.Uint("core_id", n.coreId),
)
} else {
n.logger.Info(
"database stopped cleanly",
zap.Uint("core_id", n.coreId),
)
}
quitCh <- struct{}{}
return nil
}
func (n *DataWorkerNode) Stop() {
n.stopOnce.Do(func() {
n.logger.Info("stopping data worker node")
if n.quit != nil {
close(n.quit)
}
})
}
// GetQuitChannel returns the quit channel for external signaling
func (n *DataWorkerNode) GetQuitChannel() chan struct{} {
return n.quit
}
func (n *DataWorkerNode) GetLogger() *zap.Logger {
return n.logger
}
func (n *DataWorkerNode) GetClockStore() store.ClockStore {
return n.clockStore
}
func (n *DataWorkerNode) GetCoinStore() store.TokenStore {
return n.coinStore
}
func (n *DataWorkerNode) GetDataProofStore() store.DataProofStore {
return n.dataProofStore
}
func (n *DataWorkerNode) GetKeyManager() keys.KeyManager {
return n.keyManager
}
func (n *DataWorkerNode) GetGlobalTimeReel() *consensustime.GlobalTimeReel {
return n.globalTimeReel
}
func (n *DataWorkerNode) GetCoreId() uint {
return n.coreId
}
func (n *DataWorkerNode) GetFrameProver() crypto.FrameProver {
return n.frameProver
}
func (n *DataWorkerNode) GetIPCServer() *datarpc.DataWorkerIPCServer {
return n.ipcServer
}