ceremonyclient/node/app/node.go
Cassandra Heart c797d482f9
v2.1.0.5 (#457)
* wip: conversion of hotstuff from flow into Q-oriented model

* bulk of tests

* remaining non-integration tests

* add integration test, adjust log interface, small tweaks

* further adjustments, restore full pacemaker shape

* add component lifecycle management+supervisor

* further refinements

* resolve timeout hanging

* mostly finalized state for consensus

* bulk of engine swap out

* lifecycle-ify most types

* wiring nearly complete, missing needed hooks for proposals

* plugged in, vetting message validation paths

* global consensus, plugged in and verified

* app shard now wired in too

* do not decode empty keys.yml (#456)

* remove obsolete engine.maxFrames config parameter (#454)

* default to Info log level unless debug is enabled (#453)

* respect config's  "logging" section params, remove obsolete single-file logging (#452)

* Trivial code cleanup aiming to reduce Go compiler warnings (#451)

* simplify range traversal

* simplify channel read for single select case

* delete rand.Seed() deprecated in Go 1.20 and no-op as of Go 1.24

* simplify range traversal

* simplify channel read for single select case

* remove redundant type from array

* simplify range traversal

* simplify channel read for single select case

* RC slate

* finalize 2.1.0.5

* Update comments in StrictMonotonicCounter

Fix comment formatting and clarify description.

---------

Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
2025-11-11 05:00:17 -06:00

194 lines
4.5 KiB
Go

package app
import (
"context"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/global"
consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time"
"source.quilibrium.com/quilibrium/monorepo/node/execution/manager"
"source.quilibrium.com/quilibrium/monorepo/types/consensus"
"source.quilibrium.com/quilibrium/monorepo/types/keys"
"source.quilibrium.com/quilibrium/monorepo/types/p2p"
"source.quilibrium.com/quilibrium/monorepo/types/store"
"source.quilibrium.com/quilibrium/monorepo/types/worker"
)
type DHTNode struct {
pubSub p2p.PubSub
quit chan struct{}
}
type MasterNode struct {
logger *zap.Logger
dataProofStore store.DataProofStore
clockStore store.ClockStore
coinStore store.TokenStore
keyManager keys.KeyManager
pubSub p2p.PubSub
peerInfoManager p2p.PeerInfoManager
globalConsensus *global.GlobalConsensusEngine
globalTimeReel *consensustime.GlobalTimeReel
pebble store.KVDB
coreId uint
quit chan struct{}
}
func newDHTNode(
pubSub p2p.PubSub,
) (*DHTNode, error) {
return &DHTNode{
pubSub: pubSub,
quit: make(chan struct{}),
}, nil
}
func newMasterNode(
logger *zap.Logger,
dataProofStore store.DataProofStore,
clockStore store.ClockStore,
coinStore store.TokenStore,
keyManager keys.KeyManager,
pubSub p2p.PubSub,
peerInfoManager p2p.PeerInfoManager,
globalConsensus *global.GlobalConsensusEngine,
globalTimeReel *consensustime.GlobalTimeReel,
pebble store.KVDB,
coreId uint,
) (*MasterNode, error) {
logger = logger.With(zap.String("process", "master"))
return &MasterNode{
logger: logger,
dataProofStore: dataProofStore,
clockStore: clockStore,
coinStore: coinStore,
keyManager: keyManager,
pubSub: pubSub,
peerInfoManager: peerInfoManager,
globalConsensus: globalConsensus,
globalTimeReel: globalTimeReel,
pebble: pebble,
coreId: coreId,
quit: make(chan struct{}),
}, nil
}
func (d *DHTNode) Start() {
<-d.quit
}
func (d *DHTNode) Stop() {
go func() {
d.quit <- struct{}{}
}()
}
func (m *MasterNode) Start(ctx context.Context) <-chan error {
errChan := make(chan error)
// Start the global consensus engine
supervisor, err := lifecycle.NewSupervisor(
[]*lifecycle.Node{
&lifecycle.Node{
Name: "master node",
Factory: func() (lifecycle.Component, error) {
return m.globalConsensus, nil
},
OnError: func(err error) lifecycle.ErrorHandlingBehavior {
return lifecycle.ErrorShouldShutdown
},
},
},
)
if err != nil {
go func() {
errChan <- err
}()
return errChan
}
go func() {
errChan <- supervisor.Start(ctx)
}()
m.logger.Info("master node started", zap.Uint("core_id", m.coreId))
return errChan
}
func (m *MasterNode) Stop() {
m.logger.Info("stopping master node")
// Stop the global consensus engine
if err := <-m.globalConsensus.Stop(false); err != nil {
m.logger.Error("error stopping global consensus", zap.Error(err))
}
defer func() {
// Close database
if m.pebble != nil {
err := m.pebble.Close()
if err != nil {
m.logger.Error("database shut down with errors", zap.Error(err))
} else {
m.logger.Info("database stopped cleanly")
}
}
}()
}
func (m *MasterNode) GetLogger() *zap.Logger {
return m.logger
}
func (m *MasterNode) GetClockStore() store.ClockStore {
return m.clockStore
}
func (m *MasterNode) GetCoinStore() store.TokenStore {
return m.coinStore
}
func (m *MasterNode) GetDataProofStore() store.DataProofStore {
return m.dataProofStore
}
func (m *MasterNode) GetKeyManager() keys.KeyManager {
return m.keyManager
}
func (m *MasterNode) GetPubSub() p2p.PubSub {
return m.pubSub
}
func (m *MasterNode) GetGlobalConsensusEngine() *global.GlobalConsensusEngine {
return m.globalConsensus
}
func (m *MasterNode) GetGlobalTimeReel() *consensustime.GlobalTimeReel {
return m.globalTimeReel
}
func (m *MasterNode) GetCoreId() uint {
return m.coreId
}
func (m *MasterNode) GetPeerInfoManager() p2p.PeerInfoManager {
return m.peerInfoManager
}
func (m *MasterNode) GetWorkerManager() worker.WorkerManager {
return m.globalConsensus.GetWorkerManager()
}
func (m *MasterNode) GetProverRegistry() consensus.ProverRegistry {
return m.globalConsensus.GetProverRegistry()
}
func (
m *MasterNode,
) GetExecutionEngineManager() *manager.ExecutionEngineManager {
return m.globalConsensus.GetExecutionEngineManager()
}