ceremonyclient/node/consensus/global/factory.go
Cassandra Heart 12996487c3
v2.1.0.18 (#508)
* experiment: reject bad peer info messages

* v2.1.0.18 preview

* add tagged sync

* Add missing hypergraph changes

* small tweaks to sync

* allow local sync, use it for provers with workers

* missing file

* resolve build error

* resolve sync issue, remove raw sync

* resolve deletion promotion bug

* resolve sync abstraction leak from tree deletion changes

* rearrange prover sync

* remove pruning from sync

* restore removed sync flag

* fix: sync, event stream deadlock, heuristic scoring of better shards

* resolve hanging shutdown + pubsub proxy issue

* further bugfixes: sync (restore old leaf sync), pubsub shutdown, merge events

* fix: clean up rust ffi, background coverage events, and sync tweaks

* fix: linking issue for channel, connectivity test aggression, sync regression, join tests

* fix: disjoint sync, improper application of filter

* resolve sync/reel/validation deadlock

* adjust sync to handle no leaf edge cases, multi-path segment traversal

* use simpler sync

* faster, simpler sync with some debug extras

* migration to recalculate

* don't use batch

* square up the roots

* fix nil pointer

* fix: seniority calculation, sync race condition, migration

* make sync dumber

* fix: tree deletion issue

* fix: missing seniority merge request canonical serialization

* address issues from previous commit test

* stale workers should be cleared

* remove missing gap check

* rearrange collect, reduce sync logging noise

* fix: the disjoint leaf/branch sync case

* nuclear option on sync failures

* v2.1.0.18, finalized
2026-02-08 23:51:51 -06:00

184 lines
6.1 KiB
Go

package global
import (
"github.com/pkg/errors"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/config"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/events"
consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global/compat"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/channel"
"source.quilibrium.com/quilibrium/monorepo/types/compiler"
tconsensus "source.quilibrium.com/quilibrium/monorepo/types/consensus"
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/types/keys"
tp2p "source.quilibrium.com/quilibrium/monorepo/types/p2p"
"source.quilibrium.com/quilibrium/monorepo/types/store"
)
// ConsensusEngineFactory provides a factory method for creating properly wired
// GlobalConsensusEngine instances with time reels and event distributors
type ConsensusEngineFactory struct {
logger *zap.Logger
config *config.Config
pubsub tp2p.PubSub
hypergraph hypergraph.Hypergraph
keyManager keys.KeyManager
keyStore store.KeyStore
frameProver crypto.FrameProver
inclusionProver crypto.InclusionProver
signerRegistry tconsensus.SignerRegistry
proverRegistry tconsensus.ProverRegistry
dynamicFeeManager tconsensus.DynamicFeeManager
appFrameValidator tconsensus.AppFrameValidator
frameValidator tconsensus.GlobalFrameValidator
difficultyAdjuster tconsensus.DifficultyAdjuster
rewardIssuance tconsensus.RewardIssuance
clockStore store.ClockStore
inboxStore store.InboxStore
hypergraphStore store.HypergraphStore
shardsStore store.ShardsStore
workerStore store.WorkerStore
consensusStore consensus.ConsensusStore[*protobufs.ProposalVote]
encryptedChannel channel.EncryptedChannel
bulletproofProver crypto.BulletproofProver
verEnc crypto.VerifiableEncryptor
decafConstructor crypto.DecafConstructor
compiler compiler.CircuitCompiler
blsConstructor crypto.BlsConstructor
peerInfoManager tp2p.PeerInfoManager
}
// NewConsensusEngineFactory creates a new factory for consensus engines
func NewConsensusEngineFactory(
logger *zap.Logger,
config *config.Config,
pubsub tp2p.PubSub,
hypergraph hypergraph.Hypergraph,
keyManager keys.KeyManager,
keyStore store.KeyStore,
frameProver crypto.FrameProver,
inclusionProver crypto.InclusionProver,
signerRegistry tconsensus.SignerRegistry,
proverRegistry tconsensus.ProverRegistry,
dynamicFeeManager tconsensus.DynamicFeeManager,
appFrameValidator tconsensus.AppFrameValidator,
frameValidator tconsensus.GlobalFrameValidator,
difficultyAdjuster tconsensus.DifficultyAdjuster,
rewardIssuance tconsensus.RewardIssuance,
clockStore store.ClockStore,
inboxStore store.InboxStore,
hypergraphStore store.HypergraphStore,
shardsStore store.ShardsStore,
workerStore store.WorkerStore,
consensusStore consensus.ConsensusStore[*protobufs.ProposalVote],
encryptedChannel channel.EncryptedChannel,
bulletproofProver crypto.BulletproofProver,
verEnc crypto.VerifiableEncryptor,
decafConstructor crypto.DecafConstructor,
compiler compiler.CircuitCompiler,
blsConstructor crypto.BlsConstructor,
peerInfoManager tp2p.PeerInfoManager,
) *ConsensusEngineFactory {
// Initialize peer seniority data
if err := compat.RebuildPeerSeniority(uint(config.P2P.Network)); err != nil {
panic(errors.Wrap(err, "failed to load peer seniority data"))
}
return &ConsensusEngineFactory{
logger: logger,
config: config,
pubsub: pubsub,
hypergraph: hypergraph,
keyManager: keyManager,
keyStore: keyStore,
frameProver: frameProver,
inclusionProver: inclusionProver,
signerRegistry: signerRegistry,
proverRegistry: proverRegistry,
dynamicFeeManager: dynamicFeeManager,
appFrameValidator: appFrameValidator,
frameValidator: frameValidator,
difficultyAdjuster: difficultyAdjuster,
rewardIssuance: rewardIssuance,
clockStore: clockStore,
inboxStore: inboxStore,
hypergraphStore: hypergraphStore,
shardsStore: shardsStore,
workerStore: workerStore,
consensusStore: consensusStore,
encryptedChannel: encryptedChannel,
bulletproofProver: bulletproofProver,
verEnc: verEnc,
decafConstructor: decafConstructor,
compiler: compiler,
blsConstructor: blsConstructor,
peerInfoManager: peerInfoManager,
}
}
// CreateGlobalConsensusEngine creates a new GlobalConsensusEngine
func (f *ConsensusEngineFactory) CreateGlobalConsensusEngine(
frameTimeMillis int64,
) (*GlobalConsensusEngine, *consensustime.GlobalTimeReel, error) {
// Create the global time reel
globalTimeReel, err := consensustime.NewGlobalTimeReel(
f.logger,
f.proverRegistry,
f.clockStore,
f.config.P2P.Network,
f.config.Engine.ArchiveMode,
)
if err != nil {
return nil, nil, errors.Wrap(err, "create global time reel")
}
// Create the event distributor with channel from the global time reel
eventDistributor := events.NewGlobalEventDistributor(
globalTimeReel.GetEventCh(),
)
// Create the consensus engine with the wired event distributor and time reel
engine, err := NewGlobalConsensusEngine(
f.logger,
f.config,
frameTimeMillis,
f.pubsub,
f.hypergraph,
f.keyManager,
f.keyStore,
f.frameProver,
f.inclusionProver,
f.signerRegistry,
f.proverRegistry,
f.dynamicFeeManager,
f.appFrameValidator,
f.frameValidator,
f.difficultyAdjuster,
f.rewardIssuance,
eventDistributor,
globalTimeReel,
f.clockStore,
f.inboxStore,
f.hypergraphStore,
f.shardsStore,
f.consensusStore,
f.workerStore,
f.encryptedChannel,
f.bulletproofProver,
f.verEnc,
f.decafConstructor,
f.compiler,
f.blsConstructor,
f.peerInfoManager,
)
if err != nil {
return nil, nil, errors.Wrap(err, "create global consensus engine")
}
return engine, globalTimeReel, nil
}