diff --git a/Dockerfile.source b/Dockerfile.source index 298e43e..9f5644c 100644 --- a/Dockerfile.source +++ b/Dockerfile.source @@ -91,8 +91,6 @@ RUN cd emp-tool && sed -i 's/add_library(${NAME} SHARED ${sources})/add_library( RUN cd emp-ot && mkdir build && cd build && cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local && cd .. && make && make install && cd .. -# RUN go mod download - ## Generate Rust bindings for channel WORKDIR /opt/ceremonyclient/channel diff --git a/Dockerfile.sourceavx512 b/Dockerfile.sourceavx512 index 6b61429..963cf62 100644 --- a/Dockerfile.sourceavx512 +++ b/Dockerfile.sourceavx512 @@ -94,8 +94,6 @@ RUN cd emp-tool && sed -i 's/add_library(${NAME} SHARED ${sources})/add_library( RUN cd emp-ot && mkdir build && cd build && cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local && cd .. && make && make install && cd .. -RUN go mod download - ## Generate Rust bindings for channel WORKDIR /opt/ceremonyclient/channel diff --git a/config/version.go b/config/version.go index a5dd651..d19b918 100644 --- a/config/version.go +++ b/config/version.go @@ -43,7 +43,7 @@ func FormatVersion(version []byte) string { } func GetPatchNumber() byte { - return 0x08 + return 0x09 } func GetRCNumber() byte { diff --git a/consensus/consensus_sync.go b/consensus/consensus_sync.go index 9a09180..64e12a4 100644 --- a/consensus/consensus_sync.go +++ b/consensus/consensus_sync.go @@ -4,6 +4,7 @@ import ( "context" "source.quilibrium.com/quilibrium/monorepo/consensus/models" + "source.quilibrium.com/quilibrium/monorepo/types/tries" ) // SyncProvider handles synchronization management @@ -17,4 +18,20 @@ type SyncProvider[StateT models.Unique] interface { ctx context.Context, existing *StateT, ) (<-chan *StateT, <-chan error) + + // Initiates hypersync with a prover. + HyperSync( + ctx context.Context, + prover []byte, + shardKey tries.ShardKey, + ) + + // Enqueues state information to begin synchronization with a given peer. If + // expectedIdentity is provided, may use this to determine if the initial + // frameNumber for which synchronization begins is the correct fork. 
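For orientation, a rough sketch of how a consumer of the extended SyncProvider interface might invoke the new HyperSync method declared above (the interface's remaining addition, AddState, continues immediately below). The shard key construction mirrors the pattern used elsewhere in this changeset; `provider`, `appAddress`, and `proverAddress` are placeholder names, and the generic instantiation is an assumption, not something this diff pins down.

```go
package example

import (
	"context"

	"source.quilibrium.com/quilibrium/monorepo/consensus"
	"source.quilibrium.com/quilibrium/monorepo/protobufs"
	"source.quilibrium.com/quilibrium/monorepo/types/tries"
	up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p"
)

// requestHyperSync sketches a HyperSync call. The L1/L2 shard key layout
// (bloom-filter indices plus the zero-padded app address) follows the same
// construction the app sync code uses further down in this diff.
func requestHyperSync(
	ctx context.Context,
	provider consensus.SyncProvider[*protobufs.AppShardFrame],
	appAddress []byte,
	proverAddress []byte,
) {
	bits := up2p.GetBloomFilterIndices(appAddress, 256, 3)
	l2 := make([]byte, 32)
	copy(l2, appAddress[:min(len(appAddress), 32)])

	provider.HyperSync(ctx, proverAddress, tries.ShardKey{
		L1: [3]byte(bits),
		L2: [32]byte(l2),
	})
}
```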
+ AddState( + sourcePeerID []byte, + frameNumber uint64, + expectedIdentity []byte, + ) } diff --git a/consensus/participant/participant.go b/consensus/participant/participant.go index 5371b7e..ee1dc2a 100644 --- a/consensus/participant/participant.go +++ b/consensus/participant/participant.go @@ -44,7 +44,7 @@ func NewParticipant[ pending []*models.SignedProposal[StateT, VoteT], ) (*eventloop.EventLoop[StateT, VoteT], error) { cfg, err := timeout.NewConfig( - 20*time.Second, + 24*time.Second, 3*time.Minute, 1.2, 6, diff --git a/node/app/wire.go b/node/app/wire.go index 286642a..3d6a84d 100644 --- a/node/app/wire.go +++ b/node/app/wire.go @@ -296,6 +296,7 @@ func provideDataWorkerIPC( proverRegistry consensus.ProverRegistry, appConsensusEngineFactory *app.AppConsensusEngineFactory, peerInfoManager tp2p.PeerInfoManager, + pubsub tp2p.PubSub, frameProver crypto.FrameProver, logger *zap.Logger, coreId uint, @@ -307,6 +308,7 @@ func provideDataWorkerIPC( signerRegistry, proverRegistry, peerInfoManager, + pubsub, frameProver, appConsensusEngineFactory, logger, diff --git a/node/app/wire_gen.go b/node/app/wire_gen.go index 825210a..2c3bca6 100644 --- a/node/app/wire_gen.go +++ b/node/app/wire_gen.go @@ -120,7 +120,7 @@ func NewDataWorkerNodeWithProxyPubsub(logger *zap.Logger, config2 *config.Config optimizedProofOfMeaningfulWorkRewardIssuance := reward.NewOptRewardIssuance() doubleRatchetEncryptedChannel := channel.NewDoubleRatchetEncryptedChannel() appConsensusEngineFactory := app.NewAppConsensusEngineFactory(logger, config2, proxyBlossomSub, hypergraph, fileKeyManager, pebbleKeyStore, pebbleClockStore, pebbleInboxStore, pebbleShardsStore, pebbleHypergraphStore, pebbleConsensusStore, frameProver, kzgInclusionProver, decaf448BulletproofProver, mpCitHVerifiableEncryptor, decaf448KeyConstructor, bedlamCompiler, cachedSignerRegistry, proverRegistry, inMemoryPeerInfoManager, dynamicFeeManager, blsAppFrameValidator, blsGlobalFrameValidator, asertDifficultyAdjuster, optimizedProofOfMeaningfulWorkRewardIssuance, bls48581KeyConstructor, doubleRatchetEncryptedChannel) - dataWorkerIPCServer := provideDataWorkerIPC(rpcMultiaddr, config2, cachedSignerRegistry, proverRegistry, appConsensusEngineFactory, inMemoryPeerInfoManager, frameProver, logger, coreId, parentProcess) + dataWorkerIPCServer := provideDataWorkerIPC(rpcMultiaddr, config2, cachedSignerRegistry, proverRegistry, appConsensusEngineFactory, inMemoryPeerInfoManager, proxyBlossomSub, frameProver, logger, coreId, parentProcess) globalTimeReel, err := provideGlobalTimeReel(appConsensusEngineFactory) if err != nil { return nil, err @@ -177,7 +177,7 @@ func NewDataWorkerNodeWithoutProxyPubsub(logger *zap.Logger, config2 *config.Con optimizedProofOfMeaningfulWorkRewardIssuance := reward.NewOptRewardIssuance() doubleRatchetEncryptedChannel := channel.NewDoubleRatchetEncryptedChannel() appConsensusEngineFactory := app.NewAppConsensusEngineFactory(logger, config2, blossomSub, hypergraph, fileKeyManager, pebbleKeyStore, pebbleClockStore, pebbleInboxStore, pebbleShardsStore, pebbleHypergraphStore, pebbleConsensusStore, frameProver, kzgInclusionProver, decaf448BulletproofProver, mpCitHVerifiableEncryptor, decaf448KeyConstructor, bedlamCompiler, cachedSignerRegistry, proverRegistry, inMemoryPeerInfoManager, dynamicFeeManager, blsAppFrameValidator, blsGlobalFrameValidator, asertDifficultyAdjuster, optimizedProofOfMeaningfulWorkRewardIssuance, bls48581KeyConstructor, doubleRatchetEncryptedChannel) - dataWorkerIPCServer := provideDataWorkerIPC(rpcMultiaddr, 
config2, cachedSignerRegistry, proverRegistry, appConsensusEngineFactory, inMemoryPeerInfoManager, frameProver, logger, coreId, parentProcess) + dataWorkerIPCServer := provideDataWorkerIPC(rpcMultiaddr, config2, cachedSignerRegistry, proverRegistry, appConsensusEngineFactory, inMemoryPeerInfoManager, blossomSub, frameProver, logger, coreId, parentProcess) globalTimeReel, err := provideGlobalTimeReel(appConsensusEngineFactory) if err != nil { return nil, err @@ -361,6 +361,7 @@ func provideDataWorkerIPC( proverRegistry consensus2.ProverRegistry, appConsensusEngineFactory *app.AppConsensusEngineFactory, peerInfoManager p2p2.PeerInfoManager, + pubsub p2p2.PubSub, frameProver crypto.FrameProver, logger *zap.Logger, coreId uint, @@ -370,6 +371,7 @@ func provideDataWorkerIPC( rpcMultiaddr, config2, signerRegistry, proverRegistry, peerInfoManager, + pubsub, frameProver, appConsensusEngineFactory, logger, diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index ccbac93..bc301de 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -34,6 +34,7 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/consensus/aggregator" "source.quilibrium.com/quilibrium/monorepo/node/consensus/global" "source.quilibrium.com/quilibrium/monorepo/node/consensus/reward" + qsync "source.quilibrium.com/quilibrium/monorepo/node/consensus/sync" consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time" "source.quilibrium.com/quilibrium/monorepo/node/consensus/tracing" "source.quilibrium.com/quilibrium/monorepo/node/consensus/voting" @@ -129,6 +130,9 @@ type AppConsensusEngine struct { blacklistMap map[string]bool currentRank uint64 alertPublicKey []byte + peerAuthCache map[string]time.Time + peerAuthCacheMu sync.RWMutex + proverAddress []byte // Message queues consensusMessageQueue chan *pb.Message @@ -158,7 +162,7 @@ type AppConsensusEngine struct { timeoutAggregator consensus.TimeoutAggregator[*protobufs.ProposalVote] // Provider implementations - syncProvider *AppSyncProvider + syncProvider *qsync.SyncProvider[*protobufs.AppShardFrame, *protobufs.AppShardProposal] votingProvider *AppVotingProvider leaderProvider *AppLeaderProvider livenessProvider *AppLivenessProvider @@ -266,9 +270,25 @@ func NewAppConsensusEngine( currentDifficulty: config.Engine.Difficulty, blacklistMap: make(map[string]bool), alertPublicKey: []byte{}, + peerAuthCache: make(map[string]time.Time), } - engine.syncProvider = NewAppSyncProvider(engine) + keyId := "q-prover-key" + + key, err := keyManager.GetSigningKey(keyId) + if err != nil { + logger.Error("failed to get key for prover address", zap.Error(err)) + panic(err) + } + + addressBI, err := poseidon.HashBytes(key.Public().([]byte)) + if err != nil { + logger.Error("failed to calculate prover address", zap.Error(err)) + panic(err) + } + + engine.proverAddress = addressBI.FillBytes(make([]byte, 32)) + engine.votingProvider = &AppVotingProvider{engine: engine} engine.leaderProvider = &AppLeaderProvider{engine: engine} engine.livenessProvider = &AppLivenessProvider{engine: engine} @@ -542,6 +562,33 @@ func NewAppConsensusEngine( } engine.forks = forks + engine.syncProvider = qsync.NewSyncProvider[ + *protobufs.AppShardFrame, + *protobufs.AppShardProposal, + ]( + logger, + forks, + proverRegistry, + signerRegistry, + peerInfoManager, + qsync.NewAppSyncClient( + frameProver, + proverRegistry, + blsConstructor, + engine, + config, + appAddress, + ), + hypergraph, + config, + 
appAddress, + engine.proverAddress, + ) + + // Add sync provider + componentBuilder.AddWorker(engine.syncProvider.Start) + + // Add consensus componentBuilder.AddWorker(func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, @@ -716,14 +763,8 @@ func (e *AppConsensusEngine) Stop(force bool) <-chan error { } func (e *AppConsensusEngine) GetFrame() *protobufs.AppShardFrame { - // Get the current state from the state machine - frame, _ := e.appTimeReel.GetHead() - - if frame == nil { - return nil - } - - return frame.Clone().(*protobufs.AppShardFrame) + frame, _, _ := e.clockStore.GetLatestShardClockFrame(e.appAddress) + return frame } func (e *AppConsensusEngine) GetDifficulty() uint32 { @@ -940,16 +981,7 @@ func (e *AppConsensusEngine) getPeerID() PeerID { } func (e *AppConsensusEngine) getProverAddress() []byte { - keyId := "q-prover-key" - - key, err := e.keyManager.GetSigningKey(keyId) - if err != nil { - e.logger.Error("failed to get key for prover address", zap.Error(err)) - return []byte{} - } - - addressBI, _ := poseidon.HashBytes(key.Public().([]byte)) - return addressBI.FillBytes(make([]byte, 32)) + return e.proverAddress } func (e *AppConsensusEngine) getAddressFromPublicKey(publicKey []byte) []byte { @@ -1280,7 +1312,7 @@ func (e *AppConsensusEngine) adjustFeeForTraffic(baseFee uint64) uint64 { } // Get the current frame - currentFrame, err := e.appTimeReel.GetHead() + currentFrame, _, err := e.clockStore.GetLatestShardClockFrame(e.appAddress) if err != nil || currentFrame == nil || currentFrame.Header == nil { e.logger.Debug("could not get latest frame for fee adjustment") return baseFee @@ -1293,19 +1325,19 @@ func (e *AppConsensusEngine) adjustFeeForTraffic(baseFee uint64) uint64 { } previousFrameNum := currentFrame.Header.FrameNumber - 1 - var previousFrame *protobufs.AppShardFrame // Try to get the previous frame - frames, err := e.appTimeReel.GetFramesByNumber(previousFrameNum) + previousFrame, _, err := e.clockStore.GetShardClockFrame( + e.appAddress, + previousFrameNum, + false, + ) if err != nil { e.logger.Debug( "could not get prior frame for fee adjustment", zap.Error(err), ) } - if len(frames) > 0 { - previousFrame = frames[0] - } if previousFrame == nil || previousFrame.Header == nil { e.logger.Debug("could not get prior frame for fee adjustment") @@ -1735,6 +1767,35 @@ func (e *AppConsensusEngine) OnOwnProposal( return } + txn, err := e.clockStore.NewTransaction(false) + if err != nil { + e.logger.Error("could not create transaction", zap.Error(err)) + return + } + + if err := e.clockStore.PutProposalVote(txn, *proposal.Vote); err != nil { + e.logger.Error("could not put vote", zap.Error(err)) + txn.Abort() + return + } + + err = e.clockStore.StageShardClockFrame( + []byte(proposal.State.Identifier), + *proposal.State.State, + txn, + ) + if err != nil { + e.logger.Error("could not put frame candidate", zap.Error(err)) + txn.Abort() + return + } + + if err := txn.Commit(); err != nil { + e.logger.Error("could not commit transaction", zap.Error(err)) + txn.Abort() + return + } + e.voteAggregator.AddState(proposal) e.consensusParticipant.SubmitProposal(proposal) @@ -1872,20 +1933,35 @@ func (e *AppConsensusEngine) OnQuorumCertificateTriggeredRankChange( frame, ok := e.frameStore[qc.Identity()] e.frameStoreMu.RUnlock() + if !ok { + frame, err = e.clockStore.GetStagedShardClockFrame( + e.appAddress, + qc.GetFrameNumber(), + []byte(qc.Identity()), + false, + ) + if err == nil { + ok = true + } + } + if !ok { e.logger.Error( "no frame for quorum certificate", 
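NewAppConsensusEngine now computes the prover address once during construction, and getProverAddress (shown above) simply returns the cached value. A minimal sketch of the derivation itself, assuming the signing key's public key is available as raw bytes; the constructor panics on error where this helper returns it:

```go
package example

import "github.com/iden3/go-iden3-crypto/poseidon"

// deriveProverAddress mirrors the derivation added to NewAppConsensusEngine:
// the Poseidon hash of the "q-prover-key" public key, zero-padded on the
// left to 32 bytes.
func deriveProverAddress(publicKey []byte) ([]byte, error) {
	addressBI, err := poseidon.HashBytes(publicKey)
	if err != nil {
		return nil, err
	}
	return addressBI.FillBytes(make([]byte, 32)), nil
}
```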
zap.Uint64("rank", newRank-1), zap.Uint64("frame_number", qc.GetFrameNumber()), ) - return - } - - frame.Header.PublicKeySignatureBls48581 = aggregateSig - - err = e.appTimeReel.Insert(frame) - if err != nil { - e.logger.Error("could not insert frame into time reel", zap.Error(err)) + current := (*e.forks.FinalizedState().State) + peer, err := e.getRandomProverPeerId() + if err != nil { + e.logger.Error("could not get random peer", zap.Error(err)) + return + } + e.syncProvider.AddState( + []byte(peer), + current.Header.FrameNumber, + []byte(current.Identity()), + ) return } @@ -1927,12 +2003,58 @@ func (e *AppConsensusEngine) OnQuorumCertificateTriggeredRankChange( return } + frame.Header.PublicKeySignatureBls48581 = aggregateSig + + latest, _, err := e.clockStore.GetLatestShardClockFrame(e.appAddress) + if err != nil { + e.logger.Error("could not obtain latest frame", zap.Error(err)) + return + } + if latest.Header.FrameNumber+1 != frame.Header.FrameNumber || + !bytes.Equal([]byte(latest.Identity()), frame.Header.ParentSelector) { + e.logger.Debug( + "not next frame, cannot advance", + zap.Uint64("latest_frame_number", latest.Header.FrameNumber), + zap.Uint64("new_frame_number", frame.Header.FrameNumber), + zap.String( + "latest_frame_selector", + hex.EncodeToString([]byte(latest.Identity())), + ), + zap.String( + "new_frame_number", + hex.EncodeToString(frame.Header.ParentSelector), + ), + ) + return + } + txn, err = e.clockStore.NewTransaction(false) if err != nil { e.logger.Error("could not create transaction", zap.Error(err)) return } + if err := e.materialize( + txn, + frame, + ); err != nil { + _ = txn.Abort() + e.logger.Error("could not materialize frame requests", zap.Error(err)) + return + } + if err := e.clockStore.CommitShardClockFrame( + e.appAddress, + frame.GetFrameNumber(), + []byte(frame.Identity()), + []*tries.RollingFrecencyCritbitTrie{}, + txn, + false, + ); err != nil { + _ = txn.Abort() + e.logger.Error("could not put global frame", zap.Error(err)) + return + } + if err := e.clockStore.PutCertifiedAppShardState( &protobufs.AppShardProposal{ State: frame, @@ -2149,7 +2271,10 @@ func (e *AppConsensusEngine) OnRankChange(oldRank uint64, newRank uint64) { } e.provingMessagesMu.Unlock() - commitments, err := e.livenessProvider.Collect(context.Background()) + commitments, err := e.livenessProvider.Collect( + context.Background(), + frame.Header.FrameNumber, + ) if err != nil { e.logger.Error("could not collect commitments", zap.Error(err)) return @@ -2495,52 +2620,55 @@ func (e *AppConsensusEngine) getPendingProposals( *protobufs.AppShardFrame, *protobufs.ProposalVote, ] { - pendingFrames, err := e.clockStore.RangeShardClockFrames( - e.appAddress, - frameNumber, - 0xfffffffffffffffe, - ) + root, _, err := e.clockStore.GetShardClockFrame(e.appAddress, frameNumber, false) if err != nil { panic(err) } - defer pendingFrames.Close() result := []*models.SignedProposal[ *protobufs.AppShardFrame, *protobufs.ProposalVote, ]{} - pendingFrames.First() - if !pendingFrames.Valid() { - return result + e.logger.Debug("getting pending proposals", zap.Uint64("start", frameNumber)) + + startRank := root.Header.Rank + latestQC, err := e.clockStore.GetLatestQuorumCertificate(e.appAddress) + if err != nil { + panic(err) } - value, err := pendingFrames.Value() - if err != nil || value == nil { - return result + endRank := latestQC.Rank + + parent, err := e.clockStore.GetQuorumCertificate(e.appAddress, startRank) + if err != nil { + panic(err) } - previous := value - for pendingFrames.First(); 
pendingFrames.Valid(); pendingFrames.Next() { - value, err := pendingFrames.Value() - if err != nil || value == nil { - break + for rank := startRank + 1; rank <= endRank; rank++ { + nextQC, err := e.clockStore.GetQuorumCertificate(e.appAddress, rank) + if err != nil { + e.logger.Debug("no qc for rank", zap.Error(err)) + continue } - parent, err := e.clockStore.GetQuorumCertificate( + value, err := e.clockStore.GetStagedShardClockFrame( e.appAddress, - previous.GetRank(), + nextQC.FrameNumber, + []byte(nextQC.Identity()), + false, ) if err != nil { - panic(err) + e.logger.Debug("no frame for qc", zap.Error(err)) + parent = nextQC + continue } - priorTC, _ := e.clockStore.GetTimeoutCertificate( - e.appAddress, - value.GetRank()-1, - ) var priorTCModel models.TimeoutCertificate = nil - if priorTC != nil { - priorTCModel = priorTC + if parent.Rank != rank-1 { + priorTC, _ := e.clockStore.GetTimeoutCertificate(e.appAddress, rank-1) + if priorTC != nil { + priorTCModel = priorTC + } } vote := &protobufs.ProposalVote{ @@ -2569,7 +2697,7 @@ func (e *AppConsensusEngine) getPendingProposals( }, Vote: &vote, }) - previous = value + parent = nextQC } return result } diff --git a/node/consensus/app/consensus_leader_provider.go b/node/consensus/app/consensus_leader_provider.go index 845319f..d1dc2b6 100644 --- a/node/consensus/app/consensus_leader_provider.go +++ b/node/consensus/app/consensus_leader_provider.go @@ -1,7 +1,6 @@ package app import ( - "bytes" "context" "encoding/hex" "time" @@ -66,11 +65,6 @@ func (p *AppLeaderProvider) ProveNextState( filter []byte, priorState models.Identity, ) (**protobufs.AppShardFrame, error) { - timer := prometheus.NewTimer(frameProvingDuration.WithLabelValues( - p.engine.appAddressHex, - )) - defer timer.ObserveDuration() - prior, _, err := p.engine.clockStore.GetLatestShardClockFrame( p.engine.appAddress, ) @@ -100,12 +94,16 @@ func (p *AppLeaderProvider) ProveNextState( if latestQC != nil && latestQC.Identity() == priorState { switch { case prior.Header.Rank < latestQC.GetRank(): + // We should never be in this scenario because the consensus + // implementation's safety rules should forbid it, it'll demand sync + // happen out of band. Nevertheless, we note it so we can find it in + // logs if it _did_ happen. 
return nil, models.NewNoVoteErrorf( "needs sync: prior rank %d behind latest qc rank %d", prior.Header.Rank, latestQC.GetRank(), ) - case prior.Header.Rank == latestQC.GetRank() && + case prior.Header.FrameNumber == latestQC.GetFrameNumber() && latestQC.Identity() != prior.Identity(): peerID, peerErr := p.engine.getRandomProverPeerId() if peerErr != nil { @@ -145,25 +143,10 @@ func (p *AppLeaderProvider) ProveNextState( ) } - // Get prover index - provers, err := p.engine.proverRegistry.GetActiveProvers(p.engine.appAddress) - if err != nil { - frameProvingTotal.WithLabelValues("error").Inc() - return nil, errors.Wrap(err, "prove next state") - } - - found := false - for _, prover := range provers { - if bytes.Equal(prover.Address, p.engine.getProverAddress()) { - found = true - break - } - } - - if !found { - frameProvingTotal.WithLabelValues("error").Inc() - return nil, models.NewNoVoteErrorf("not a prover") - } + timer := prometheus.NewTimer(frameProvingDuration.WithLabelValues( + p.engine.appAddressHex, + )) + defer timer.ObserveDuration() // Get collected messages to include in frame p.engine.provingMessagesMu.Lock() diff --git a/node/consensus/app/consensus_liveness_provider.go b/node/consensus/app/consensus_liveness_provider.go index db6def8..7adecfc 100644 --- a/node/consensus/app/consensus_liveness_provider.go +++ b/node/consensus/app/consensus_liveness_provider.go @@ -16,6 +16,7 @@ type AppLivenessProvider struct { func (p *AppLivenessProvider) Collect( ctx context.Context, + frameNumber uint64, ) (CollectedCommitments, error) { if p.engine.GetFrame() == nil { return CollectedCommitments{}, errors.Wrap( @@ -48,8 +49,6 @@ func (p *AppLivenessProvider) Collect( p.engine.pendingMessages = []*protobufs.Message{} p.engine.pendingMessagesMu.Unlock() - frameNumber := uint64(p.engine.GetFrame().Header.FrameNumber) + 1 - txMap := map[string][][]byte{} for i, message := range slices.Concat(mixnetMessages, pendingMessages) { lockedAddrs, err := p.validateAndLockMessage(frameNumber, i, message) diff --git a/node/consensus/app/consensus_sync_provider.go b/node/consensus/app/consensus_sync_provider.go deleted file mode 100644 index a138968..0000000 --- a/node/consensus/app/consensus_sync_provider.go +++ /dev/null @@ -1,873 +0,0 @@ -package app - -import ( - "bufio" - "bytes" - "context" - "crypto/sha256" - "encoding/hex" - "fmt" - "io" - "math/big" - "net/http" - "os" - "path" - "path/filepath" - "slices" - "strings" - "time" - - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" - mn "github.com/multiformats/go-multiaddr/net" - "github.com/pkg/errors" - "go.uber.org/zap" - "google.golang.org/grpc" - "source.quilibrium.com/quilibrium/monorepo/lifecycle" - "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token" - "source.quilibrium.com/quilibrium/monorepo/node/internal/frametime" - "source.quilibrium.com/quilibrium/monorepo/node/p2p" - "source.quilibrium.com/quilibrium/monorepo/protobufs" - "source.quilibrium.com/quilibrium/monorepo/types/channel" - "source.quilibrium.com/quilibrium/monorepo/types/tries" - up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p" -) - -const defaultStateQueueCapacity = 10 - -type syncRequest struct { - frameNumber uint64 - peerId []byte - identity []byte -} - -// AppSyncProvider implements SyncProvider -type AppSyncProvider struct { - // TODO(2.1.1+): Refactor out direct use of engine - engine *AppConsensusEngine - queuedStates chan syncRequest -} - -func 
NewAppSyncProvider( - engine *AppConsensusEngine, -) *AppSyncProvider { - return &AppSyncProvider{ - engine: engine, - queuedStates: make(chan syncRequest, defaultStateQueueCapacity), - } -} - -func (p *AppSyncProvider) Start( - ctx lifecycle.SignalerContext, - ready lifecycle.ReadyFunc, -) { - ready() - for { - select { - case <-ctx.Done(): - return - case request := <-p.queuedStates: - finalized := p.engine.forks.FinalizedState() - if request.frameNumber <= - (*p.engine.forks.FinalizedState().State).Header.FrameNumber { - continue - } - p.engine.logger.Info( - "synchronizing with peer", - zap.String("peer", peer.ID(request.peerId).String()), - zap.Uint64("finalized_rank", finalized.Rank), - zap.Uint64("peer_frame", request.frameNumber), - ) - p.processState( - ctx, - request.frameNumber, - request.peerId, - request.identity, - ) - case <-time.After(10 * time.Minute): - // If we got here, it means we hit a pretty strong halt. This is an act of - // last resort to get everyone re-aligned: - head, err := p.engine.appTimeReel.GetHead() - if err != nil { - p.engine.logger.Error( - "head frame not found for time reel", - zap.Error(err), - ) - continue - } - - if time.UnixMilli(head.Header.Timestamp).Before( - time.Now().Add(-10 * time.Minute), - ) { - peer, err := p.engine.getRandomProverPeerId() - if err != nil { - p.engine.logger.Error("could not get random prover", zap.Error(err)) - continue - } - - p.processState( - ctx, - head.Header.FrameNumber, - []byte(peer), - []byte(head.Identity()), - ) - } - } - } -} - -func (p *AppSyncProvider) processState( - ctx context.Context, - frameNumber uint64, - peerID []byte, - identity []byte, -) { - err := p.syncWithPeer( - ctx, - frameNumber, - peerID, - identity, - ) - if err != nil { - p.engine.logger.Error("could not sync with peer", zap.Error(err)) - } -} - -func (p *AppSyncProvider) Synchronize( - existing **protobufs.AppShardFrame, - ctx context.Context, -) (<-chan **protobufs.AppShardFrame, <-chan error) { - dataCh := make(chan **protobufs.AppShardFrame, 1) - errCh := make(chan error, 1) - - go func() { - defer close(dataCh) - defer close(errCh) - defer func() { - if r := recover(); r != nil { - errCh <- errors.Wrap( - errors.New(fmt.Sprintf("fatal error encountered: %+v", r)), - "synchronize", - ) - } - }() - - // Check if we have a current frame - p.engine.frameStoreMu.RLock() - hasFrame := len(p.engine.frameStore) > 0 - p.engine.frameStoreMu.RUnlock() - - if !hasFrame { - errCh <- errors.New("no frame") - return - } - - peerCount := p.engine.pubsub.GetPeerstoreCount() - requiredPeers := p.engine.config.Engine.MinimumPeersRequired - if peerCount < requiredPeers { - p.engine.logger.Info( - "waiting for minimum peers", - zap.Int("current", peerCount), - zap.Int("required", requiredPeers), - ) - - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - waitPeers: - for { - select { - case <-ctx.Done(): - errCh <- errors.Wrap( - ctx.Err(), - "synchronize cancelled while waiting for peers", - ) - return - case <-ticker.C: - peerCount = p.engine.pubsub.GetPeerstoreCount() - if peerCount >= requiredPeers { - p.engine.logger.Info( - "minimum peers reached", - zap.Int("peers", peerCount), - ) - break waitPeers - } - } - } - } - - if peerCount < int(p.engine.minimumProvers()) { - errCh <- errors.Wrap( - errors.New("minimum provers not reached"), - "synchronize", - ) - return - } - - // We have frames, return the latest one - p.engine.frameStoreMu.RLock() - var latestFrame *protobufs.AppShardFrame - var maxFrameNumber uint64 - for _, frame 
:= range p.engine.frameStore { - if frame.Header != nil && frame.Header.FrameNumber > maxFrameNumber { - maxFrameNumber = frame.Header.FrameNumber - latestFrame = frame - } - } - p.engine.frameStoreMu.RUnlock() - - if latestFrame != nil { - bits := up2p.GetBloomFilterIndices(p.engine.appAddress, 256, 3) - l2 := make([]byte, 32) - copy(l2, p.engine.appAddress[:min(len(p.engine.appAddress), 32)]) - - shardKey := tries.ShardKey{ - L1: [3]byte(bits), - L2: [32]byte(l2), - } - - shouldHypersync := false - comm, err := p.engine.hypergraph.GetShardCommits( - latestFrame.Header.FrameNumber, - p.engine.appAddress, - ) - if err != nil { - p.engine.logger.Error("could not get commits", zap.Error(err)) - } else { - for i, c := range comm { - if !bytes.Equal(c, latestFrame.Header.StateRoots[i]) { - shouldHypersync = true - break - } - } - - if shouldHypersync { - p.hyperSyncWithProver(latestFrame.Header.Prover, shardKey) - } - } - } - - // TODO(2.1.1): remove this - if p.engine.config.P2P.Network == 0 && - bytes.Equal(p.engine.appAddress[:32], token.QUIL_TOKEN_ADDRESS[:]) { - // Empty, candidate for snapshot reload - if p.engine.hypergraph.GetSize(nil, nil).Cmp(big.NewInt(0)) == 0 { - config := p.engine.config.DB - cfgPath := config.Path - coreId := p.engine.coreId - if coreId > 0 && len(config.WorkerPaths) > int(coreId-1) { - cfgPath = config.WorkerPaths[coreId-1] - } else if coreId > 0 { - cfgPath = fmt.Sprintf(config.WorkerPathPrefix, coreId) - } - err := p.downloadSnapshot( - cfgPath, - p.engine.config.P2P.Network, - p.engine.appAddress, - ) - if err != nil { - p.engine.logger.Warn( - "could not perform snapshot reload", - zap.Error(err), - ) - } - } - } - - err := p.syncWithMesh(ctx) - if err != nil { - if latestFrame != nil { - dataCh <- &latestFrame - } else if existing != nil { - dataCh <- existing - } - errCh <- err - return - } - - if latestFrame != nil { - p.engine.logger.Info("returning latest frame") - dataCh <- &latestFrame - } else if existing != nil { - p.engine.logger.Info("returning existing frame") - dataCh <- existing - } - - syncStatusCheck.WithLabelValues(p.engine.appAddressHex, "synced").Inc() - - errCh <- nil - }() - - return dataCh, errCh -} - -func (p *AppSyncProvider) syncWithMesh(ctx context.Context) error { - p.engine.logger.Info("synchronizing with peers") - - latest, err := p.engine.appTimeReel.GetHead() - if err != nil { - return errors.Wrap(err, "sync") - } - - peers, err := p.engine.proverRegistry.GetActiveProvers(p.engine.appAddress) - if len(peers) <= 1 || err != nil { - p.engine.logger.Info("no peers to sync from") - return nil - } - - for _, candidate := range peers { - if bytes.Equal(candidate.Address, p.engine.getProverAddress()) { - continue - } - - registry, err := p.engine.keyStore.GetKeyRegistryByProver( - candidate.Address, - ) - if err != nil { - continue - } - - if registry.IdentityKey == nil || registry.IdentityKey.KeyValue == nil { - continue - } - - pub, err := crypto.UnmarshalEd448PublicKey(registry.IdentityKey.KeyValue) - if err != nil { - p.engine.logger.Warn("error unmarshaling identity key", zap.Error(err)) - continue - } - - peerID, err := peer.IDFromPublicKey(pub) - if err != nil { - p.engine.logger.Warn("error deriving peer id", zap.Error(err)) - continue - } - - head, err := p.engine.appTimeReel.GetHead() - if err != nil { - return errors.Wrap(err, "sync") - } - - if latest.Header.FrameNumber < head.Header.FrameNumber { - latest = head - } - - err = p.syncWithPeer( - ctx, - latest.Header.FrameNumber, - []byte(peerID), - 
[]byte(latest.Identity()), - ) - if err != nil { - p.engine.logger.Debug("error syncing frame", zap.Error(err)) - } - } - - p.engine.logger.Info( - "returning leader frame", - zap.Uint64("frame_number", latest.Header.FrameNumber), - zap.Duration("frame_age", frametime.AppFrameSince(latest)), - ) - - return nil -} - -func (p *AppSyncProvider) syncWithPeer( - ctx context.Context, - frameNumber uint64, - peerId []byte, - expectedIdentity []byte, -) error { - p.engine.logger.Info( - "polling peer for new frames", - zap.String("peer_id", peer.ID(peerId).String()), - zap.Uint64("current_frame", frameNumber), - ) - - info := p.engine.peerInfoManager.GetPeerInfo(peerId) - if info == nil { - p.engine.logger.Info( - "no peer info known yet, skipping sync", - zap.String("peer", peer.ID(peerId).String()), - ) - return nil - } - if len(info.Reachability) == 0 { - p.engine.logger.Info( - "no reachability info known yet, skipping sync", - zap.String("peer", peer.ID(peerId).String()), - ) - return nil - } - syncTimeout := p.engine.config.Engine.SyncTimeout - for _, reachability := range info.Reachability { - if !bytes.Equal(reachability.Filter, p.engine.appAddress) { - continue - } - for _, s := range reachability.StreamMultiaddrs { - creds, err := p2p.NewPeerAuthenticator( - p.engine.logger, - p.engine.config.P2P, - nil, - nil, - nil, - nil, - [][]byte{[]byte(peerId)}, - map[string]channel.AllowedPeerPolicyType{}, - map[string]channel.AllowedPeerPolicyType{}, - ).CreateClientTLSCredentials([]byte(peerId)) - if err != nil { - return errors.Wrap(err, "sync") - } - - ma, err := multiaddr.StringCast(s) - if err != nil { - return errors.Wrap(err, "sync") - } - mga, err := mn.ToNetAddr(ma) - if err != nil { - return errors.Wrap(err, "sync") - } - cc, err := grpc.NewClient( - mga.String(), - grpc.WithTransportCredentials(creds), - ) - - if err != nil { - p.engine.logger.Debug( - "could not establish direct channel, trying next multiaddr", - zap.String("peer", peer.ID(peerId).String()), - zap.String("multiaddr", ma.String()), - - zap.Error(err), - ) - continue - } - - defer func() { - if err := cc.Close(); err != nil { - p.engine.logger.Error( - "error while closing connection", - zap.Error(err), - ) - } - }() - - client := protobufs.NewAppShardServiceClient(cc) - - inner: - for { - getCtx, cancelGet := context.WithTimeout(ctx, syncTimeout) - response, err := client.GetAppShardProposal( - getCtx, - &protobufs.GetAppShardProposalRequest{ - Filter: p.engine.appAddress, - FrameNumber: frameNumber, - }, - // The message size limits are swapped because the server is the one - // sending the data. 
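To make the swap explicit: the SyncMessageLimits values are named from the serving side, so a server built from the same config (hypothetical here, not shown in this diff) applies them directly, while the syncing client whose call options continue below mirrors them. A sketch, with plain int parameters standing in for the config fields:

```go
package example

import "google.golang.org/grpc"

// newSyncServer shows the server-side pairing: it accepts requests up to
// maxRecvMsgSize and sends frames up to maxSendMsgSize.
func newSyncServer(maxRecvMsgSize, maxSendMsgSize int) *grpc.Server {
	return grpc.NewServer(
		grpc.MaxRecvMsgSize(maxRecvMsgSize),
		grpc.MaxSendMsgSize(maxSendMsgSize),
	)
}

// syncClientCallOptions mirrors those limits from the client's perspective:
// the client's receive ceiling is the server's send ceiling, and vice versa.
func syncClientCallOptions(maxRecvMsgSize, maxSendMsgSize int) []grpc.CallOption {
	return []grpc.CallOption{
		grpc.MaxCallRecvMsgSize(maxSendMsgSize),
		grpc.MaxCallSendMsgSize(maxRecvMsgSize),
	}
}
```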
- grpc.MaxCallRecvMsgSize( - p.engine.config.Engine.SyncMessageLimits.MaxSendMsgSize, - ), - grpc.MaxCallSendMsgSize( - p.engine.config.Engine.SyncMessageLimits.MaxRecvMsgSize, - ), - ) - cancelGet() - if err != nil { - p.engine.logger.Debug( - "could not get frame, trying next multiaddr", - zap.String("peer", peer.ID(peerId).String()), - zap.String("multiaddr", ma.String()), - zap.Error(err), - ) - break inner - } - - if response == nil { - p.engine.logger.Debug( - "received no response from peer", - zap.String("peer", peer.ID(peerId).String()), - zap.String("multiaddr", ma.String()), - zap.Error(err), - ) - break inner - } - if response.Proposal == nil || response.Proposal.State == nil || - response.Proposal.State.Header == nil || - response.Proposal.State.Header.FrameNumber != frameNumber { - p.engine.logger.Debug("received empty response from peer") - return nil - } - if err := response.Proposal.Validate(); err != nil { - p.engine.logger.Debug("received invalid response from peer") - return nil - } - if len(expectedIdentity) != 0 { - if !bytes.Equal( - []byte(response.Proposal.State.Identity()), - expectedIdentity, - ) { - p.engine.logger.Warn( - "aborting sync due to unexpected frame identity", - zap.Uint64("frame_number", frameNumber), - zap.String( - "expected", - hex.EncodeToString(expectedIdentity), - ), - zap.String( - "received", - hex.EncodeToString( - []byte(response.Proposal.State.Identity()), - ), - ), - ) - return errors.New("sync frame identity mismatch") - } - expectedIdentity = nil - } - p.engine.logger.Info( - "received new leading frame", - zap.Uint64( - "frame_number", - response.Proposal.State.Header.FrameNumber, - ), - zap.Duration( - "frame_age", - frametime.AppFrameSince(response.Proposal.State), - ), - ) - provers, err := p.engine.proverRegistry.GetActiveProvers( - p.engine.appAddress, - ) - if err != nil { - p.engine.logger.Error( - "could not obtain active provers", - zap.Error(err), - ) - return err - } - - ids := [][]byte{} - for _, p := range provers { - ids = append(ids, p.Address) - } - if _, err := p.engine.frameProver.VerifyFrameHeader( - response.Proposal.State.Header, - p.engine.blsConstructor, - ids, - ); err != nil { - return errors.Wrap(err, "sync") - } - - p.engine.appShardProposalQueue <- response.Proposal - frameNumber = frameNumber + 1 - } - } - } - - p.engine.logger.Debug( - "failed to complete sync for all known multiaddrs", - zap.String("peer", peer.ID(peerId).String()), - ) - return nil -} - -func (p *AppSyncProvider) AddState( - sourcePeerID []byte, - frameNumber uint64, - expectedIdentity []byte, -) { - // Drop if we're within the threshold - if frameNumber <= - (*p.engine.forks.FinalizedState().State).Header.FrameNumber && - frameNumber != 0 { - p.engine.logger.Debug( - "dropping stale state for sync", - zap.Uint64("frame_requested", frameNumber), - zap.Uint64( - "finalized_frame", - (*p.engine.forks.FinalizedState().State).Header.FrameNumber, - ), - ) - return - } - - // Handle special case: we're at genesis frame on time reel - if frameNumber == 0 { - frameNumber = 1 - expectedIdentity = []byte{} - } - - // Enqueue if we can, otherwise drop it because we'll catch up - select { - case p.queuedStates <- syncRequest{ - frameNumber: frameNumber, - peerId: slices.Clone(sourcePeerID), - identity: slices.Clone(expectedIdentity), - }: - p.engine.logger.Debug( - "enqueued sync request", - zap.String("peer", peer.ID(sourcePeerID).String()), - zap.Uint64("enqueued_frame_number", frameNumber), - ) - default: - p.engine.logger.Debug("no queue 
capacity, dropping state for sync") - } -} - -func (p *AppSyncProvider) downloadSnapshot( - dbPath string, - network uint8, - lookupKey []byte, -) error { - base := "https://frame-snapshots.quilibrium.com" - keyHex := fmt.Sprintf("%x", lookupKey) - - manifestURL := fmt.Sprintf("%s/%d/%s/manifest", base, network, keyHex) - resp, err := http.Get(manifestURL) - if err != nil { - return errors.Wrap(err, "download snapshot") - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return errors.Wrap( - fmt.Errorf("manifest http status %d", resp.StatusCode), - "download snapshot", - ) - } - - type mfLine struct { - Name string - Hash string // lowercase hex - } - var lines []mfLine - - sc := bufio.NewScanner(resp.Body) - sc.Buffer(make([]byte, 0, 64*1024), 10*1024*1024) // handle large manifests - for sc.Scan() { - raw := strings.TrimSpace(sc.Text()) - if raw == "" || strings.HasPrefix(raw, "#") { - continue - } - fields := strings.Fields(raw) - if len(fields) != 2 { - return errors.Wrap( - fmt.Errorf("invalid manifest line: %q", raw), - "download snapshot", - ) - } - name := fields[0] - hash := strings.ToLower(fields[1]) - // quick sanity check hash looks hex - if _, err := hex.DecodeString(hash); err != nil || len(hash) != 64 { - return errors.Wrap( - fmt.Errorf("invalid sha256 hex in manifest for %s: %q", name, hash), - "download snapshot", - ) - } - lines = append(lines, mfLine{Name: name, Hash: hash}) - } - if err := sc.Err(); err != nil { - return errors.Wrap(err, "download snapshot") - } - if len(lines) == 0 { - return errors.Wrap(errors.New("manifest is empty"), "download snapshot") - } - - snapDir := path.Join(dbPath, "snapshot") - // Start fresh - _ = os.RemoveAll(snapDir) - if err := os.MkdirAll(snapDir, 0o755); err != nil { - return errors.Wrap(err, "download snapshot") - } - - // Download each file with retries + hash verification - for _, entry := range lines { - srcURL := fmt.Sprintf("%s/%d/%s/%s", base, network, keyHex, entry.Name) - dstPath := filepath.Join(snapDir, entry.Name) - - // ensure parent dir exists (manifest may list nested files like CURRENT, - // MANIFEST-xxxx, OPTIONS, *.sst) - if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil { - return errors.Wrap( - fmt.Errorf("mkdir for %s: %w", dstPath, err), - "download snapshot", - ) - } - - if err := downloadWithRetryAndHash( - srcURL, - dstPath, - entry.Hash, - 5, - ); err != nil { - return errors.Wrap( - fmt.Errorf("downloading %s failed: %w", entry.Name, err), - "download snapshot", - ) - } - } - - return nil -} - -// downloadWithRetryAndHash fetches url, stores in dstPath, verifies -// sha256 == expectedHex, and retries up to retries times. Writes atomically via -// a temporary file. 
-func downloadWithRetryAndHash( - url, dstPath, expectedHex string, - retries int, -) error { - var lastErr error - for attempt := 1; attempt <= retries; attempt++ { - if err := func() error { - resp, err := http.Get(url) - if err != nil { - return err - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("http status %d", resp.StatusCode) - } - - tmp, err := os.CreateTemp(filepath.Dir(dstPath), ".part-*") - if err != nil { - return err - } - defer func() { - tmp.Close() - _ = os.Remove(tmp.Name()) - }() - - h := sha256.New() - if _, err := io.Copy(io.MultiWriter(tmp, h), resp.Body); err != nil { - return err - } - - sumHex := hex.EncodeToString(h.Sum(nil)) - if !strings.EqualFold(sumHex, expectedHex) { - return fmt.Errorf( - "hash mismatch for %s: expected %s, got %s", - url, - expectedHex, - sumHex, - ) - } - - // fsync to be safe before rename - if err := tmp.Sync(); err != nil { - return err - } - - // atomic replace - if err := os.Rename(tmp.Name(), dstPath); err != nil { - return err - } - return nil - }(); err != nil { - lastErr = err - // simple backoff: 200ms * attempt - time.Sleep(time.Duration(200*attempt) * time.Millisecond) - continue - } - return nil - } - return lastErr -} - -func (p *AppSyncProvider) hyperSyncWithProver( - prover []byte, - shardKey tries.ShardKey, -) { - registry, err := p.engine.signerRegistry.GetKeyRegistryByProver(prover) - if err == nil && registry != nil && registry.IdentityKey != nil { - peerKey := registry.IdentityKey - pubKey, err := crypto.UnmarshalEd448PublicKey(peerKey.KeyValue) - if err == nil { - peerId, err := peer.IDFromPublicKey(pubKey) - if err == nil { - ch, err := p.engine.pubsub.GetDirectChannel( - context.Background(), - []byte(peerId), - "sync", - ) - - if err == nil { - defer ch.Close() - client := protobufs.NewHypergraphComparisonServiceClient(ch) - str, err := client.HyperStream(context.Background()) - if err != nil { - p.engine.logger.Error("error from sync", zap.Error(err)) - } else { - p.hyperSyncVertexAdds(str, shardKey) - p.hyperSyncVertexRemoves(str, shardKey) - p.hyperSyncHyperedgeAdds(str, shardKey) - p.hyperSyncHyperedgeRemoves(str, shardKey) - } - } - } - } - } -} - -func (p *AppSyncProvider) hyperSyncVertexAdds( - str protobufs.HypergraphComparisonService_HyperStreamClient, - shardKey tries.ShardKey, -) { - err := p.engine.hypergraph.Sync( - str, - shardKey, - protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, - ) - if err != nil { - p.engine.logger.Error("error from sync", zap.Error(err)) - } - str.CloseSend() -} - -func (p *AppSyncProvider) hyperSyncVertexRemoves( - str protobufs.HypergraphComparisonService_HyperStreamClient, - shardKey tries.ShardKey, -) { - err := p.engine.hypergraph.Sync( - str, - shardKey, - protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES, - ) - if err != nil { - p.engine.logger.Error("error from sync", zap.Error(err)) - } - str.CloseSend() -} - -func (p *AppSyncProvider) hyperSyncHyperedgeAdds( - str protobufs.HypergraphComparisonService_HyperStreamClient, - shardKey tries.ShardKey, -) { - err := p.engine.hypergraph.Sync( - str, - shardKey, - protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS, - ) - if err != nil { - p.engine.logger.Error("error from sync", zap.Error(err)) - } - str.CloseSend() -} - -func (p *AppSyncProvider) hyperSyncHyperedgeRemoves( - str protobufs.HypergraphComparisonService_HyperStreamClient, - shardKey tries.ShardKey, -) { - err := p.engine.hypergraph.Sync( - str, - shardKey, - 
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES, - ) - if err != nil { - p.engine.logger.Error("error from sync", zap.Error(err)) - } - str.CloseSend() -} diff --git a/node/consensus/app/message_processors.go b/node/consensus/app/message_processors.go index 6463c50..1a67fad 100644 --- a/node/consensus/app/message_processors.go +++ b/node/consensus/app/message_processors.go @@ -16,6 +16,7 @@ import ( "source.quilibrium.com/quilibrium/monorepo/lifecycle" "source.quilibrium.com/quilibrium/monorepo/protobufs" "source.quilibrium.com/quilibrium/monorepo/types/crypto" + "source.quilibrium.com/quilibrium/monorepo/types/tries" ) func (e *AppConsensusEngine) processConsensusMessageQueue( @@ -242,7 +243,7 @@ func (e *AppConsensusEngine) handleAppShardProposal( } } - head, err := e.appTimeReel.GetHead() + head, _, err := e.clockStore.GetLatestShardClockFrame(e.appAddress) if err != nil || head == nil || head.Header == nil { e.logger.Debug("could not get shard time reel head", zap.Error(err)) return @@ -256,7 +257,7 @@ func (e *AppConsensusEngine) handleAppShardProposal( return } } - expectedFrame, err := e.appTimeReel.GetHead() + expectedFrame, _, err := e.clockStore.GetLatestShardClockFrame(e.appAddress) if err != nil { e.logger.Error("could not obtain app time reel head", zap.Error(err)) return @@ -489,7 +490,7 @@ func (e *AppConsensusEngine) trySealParentWithChild( return } - head, err := e.appTimeReel.GetHead() + head, _, err := e.clockStore.GetLatestShardClockFrame(e.appAddress) if err != nil { e.logger.Error("error fetching app time reel head", zap.Error(err)) return @@ -563,28 +564,7 @@ func (e *AppConsensusEngine) addCertifiedState( return } - child.State.Header.PublicKeySignatureBls48581 = aggregateSig - - if err := e.appTimeReel.Insert(child.State); err != nil { - e.logger.Error("could not insert frame into app time reel", zap.Error(err)) - return - } - - head, err := e.appTimeReel.GetHead() - if err != nil { - e.logger.Error("could not get app time reel head", zap.Error(err)) - return - } - - if head == nil || head.Header == nil || - !bytes.Equal(child.State.Header.Output, head.Header.Output) { - e.logger.Error( - "app frames not aligned", - zap.String("address", e.appAddressHex), - zap.Uint64("new_frame_number", child.State.Header.FrameNumber), - ) - return - } + parent.State.Header.PublicKeySignatureBls48581 = aggregateSig txn, err = e.clockStore.NewTransaction(false) if err != nil { @@ -592,8 +572,29 @@ func (e *AppConsensusEngine) addCertifiedState( return } + if err := e.materialize( + txn, + parent.State, + ); err != nil { + _ = txn.Abort() + e.logger.Error("could not materialize frame requests", zap.Error(err)) + return + } + if err := e.clockStore.CommitShardClockFrame( + e.appAddress, + parent.State.GetFrameNumber(), + []byte(parent.State.Identity()), + []*tries.RollingFrecencyCritbitTrie{}, + txn, + false, + ); err != nil { + _ = txn.Abort() + e.logger.Error("could not put global frame", zap.Error(err)) + return + } + if err := e.clockStore.PutCertifiedAppShardState( - child, + parent, txn, ); err != nil { e.logger.Error("could not insert certified state", zap.Error(err)) @@ -947,6 +948,16 @@ func (e *AppConsensusEngine) handleProposal(message *pb.Message) { return } + if err := e.clockStore.StageShardClockFrame( + []byte(proposal.State.Identity()), + proposal.State, + txn, + ); err != nil { + e.logger.Error("could not stage clock frame", zap.Error(err)) + txn.Abort() + return + } + if err := txn.Commit(); err != nil { e.logger.Error("could not commit 
transaction", zap.Error(err)) txn.Abort() @@ -958,6 +969,10 @@ func (e *AppConsensusEngine) handleProposal(message *pb.Message) { proposalProcessedTotal.WithLabelValues(e.appAddressHex, "success").Inc() } +func (e *AppConsensusEngine) AddProposal(proposal *protobufs.AppShardProposal) { + e.appShardProposalQueue <- proposal +} + func (e *AppConsensusEngine) handleVote(message *pb.Message) { timer := prometheus.NewTimer( voteProcessingDuration.WithLabelValues(e.appAddressHex), diff --git a/node/consensus/app/metrics.go b/node/consensus/app/metrics.go index e19fcae..055299c 100644 --- a/node/consensus/app/metrics.go +++ b/node/consensus/app/metrics.go @@ -315,17 +315,6 @@ var ( []string{"app_address", "action"}, // action: "register", "unregister" ) - // Sync status metrics - syncStatusCheck = promauto.NewCounterVec( - prometheus.CounterOpts{ - Namespace: metricsNamespace, - Subsystem: subsystem, - Name: "sync_status_check_total", - Help: "Total number of sync status checks", - }, - []string{"app_address", "result"}, // result: "synced", "syncing" - ) - // Engine state metrics engineState = promauto.NewGaugeVec( prometheus.GaugeOpts{ diff --git a/node/consensus/app/services.go b/node/consensus/app/services.go index 0f93800..ee80ed2 100644 --- a/node/consensus/app/services.go +++ b/node/consensus/app/services.go @@ -3,6 +3,7 @@ package app import ( "bytes" "context" + "time" "github.com/iden3/go-iden3-crypto/poseidon" "github.com/libp2p/go-libp2p/core/peer" @@ -31,7 +32,7 @@ func (e *AppConsensusEngine) GetAppShardFrame( ) var frame *protobufs.AppShardFrame if request.FrameNumber == 0 { - frame, err = e.appTimeReel.GetHead() + frame = *e.forks.FinalizedState().State if frame.Header.FrameNumber == 0 { return nil, errors.Wrap( errors.New("not currently syncable"), @@ -176,6 +177,10 @@ func (e *AppConsensusEngine) authenticateProverFromContext( } if !bytes.Equal(e.pubsub.GetPeerID(), []byte(peerID)) { + if e.peerAuthCacheAllows(peerID) { + return peerID, nil + } + registry, err := e.keyStore.GetKeyRegistry( []byte(peerID), ) @@ -224,7 +229,39 @@ func (e *AppConsensusEngine) authenticateProverFromContext( "invalid peer", ) } + + e.markPeerAuthCache(peerID) } return peerID, nil } + +const appPeerAuthCacheTTL = 10 * time.Second + +func (e *AppConsensusEngine) peerAuthCacheAllows(id peer.ID) bool { + e.peerAuthCacheMu.RLock() + expiry, ok := e.peerAuthCache[string(id)] + e.peerAuthCacheMu.RUnlock() + + if !ok { + return false + } + + if time.Now().After(expiry) { + e.peerAuthCacheMu.Lock() + if current, exists := e.peerAuthCache[string(id)]; exists && + current == expiry { + delete(e.peerAuthCache, string(id)) + } + e.peerAuthCacheMu.Unlock() + return false + } + + return true +} + +func (e *AppConsensusEngine) markPeerAuthCache(id peer.ID) { + e.peerAuthCacheMu.Lock() + e.peerAuthCache[string(id)] = time.Now().Add(appPeerAuthCacheTTL) + e.peerAuthCacheMu.Unlock() +} diff --git a/node/consensus/global/consensus_leader_provider.go b/node/consensus/global/consensus_leader_provider.go index 8f72dcf..75d9afe 100644 --- a/node/consensus/global/consensus_leader_provider.go +++ b/node/consensus/global/consensus_leader_provider.go @@ -71,14 +71,6 @@ func (p *GlobalLeaderProvider) ProveNextState( filter []byte, priorState models.Identity, ) (**protobufs.GlobalFrame, error) { - _, err := p.engine.livenessProvider.Collect(ctx) - if err != nil { - return nil, models.NewNoVoteErrorf("could not collect: %+w", err) - } - - timer := prometheus.NewTimer(frameProvingDuration) - defer timer.ObserveDuration() - prior, 
err := p.engine.clockStore.GetLatestGlobalClockFrame() if err != nil { frameProvingTotal.WithLabelValues("error").Inc() @@ -104,12 +96,16 @@ func (p *GlobalLeaderProvider) ProveNextState( if latestQC != nil && latestQC.Identity() == priorState { switch { case prior.Header.Rank < latestQC.GetRank(): + // We should never be in this scenario because the consensus + // implementation's safety rules should forbid it, it'll demand sync + // happen out of band. Nevertheless, we note it so we can find it in + // logs if it _did_ happen. return nil, models.NewNoVoteErrorf( "needs sync: prior rank %d behind latest qc rank %d", prior.Header.Rank, latestQC.GetRank(), ) - case prior.Header.Rank == latestQC.GetRank() && + case prior.Header.FrameNumber == latestQC.GetFrameNumber() && latestQC.Identity() != prior.Identity(): peerID, peerErr := p.engine.getRandomProverPeerId() if peerErr != nil { @@ -149,6 +145,14 @@ func (p *GlobalLeaderProvider) ProveNextState( ) } + _, err = p.engine.livenessProvider.Collect(ctx, prior.Header.FrameNumber+1) + if err != nil { + return nil, models.NewNoVoteErrorf("could not collect: %+w", err) + } + + timer := prometheus.NewTimer(frameProvingDuration) + defer timer.ObserveDuration() + // Get prover index provers, err := p.engine.proverRegistry.GetActiveProvers(nil) if err != nil { diff --git a/node/consensus/global/consensus_liveness_provider.go b/node/consensus/global/consensus_liveness_provider.go index 7c6b1f4..1d00932 100644 --- a/node/consensus/global/consensus_liveness_provider.go +++ b/node/consensus/global/consensus_liveness_provider.go @@ -21,6 +21,7 @@ type GlobalLivenessProvider struct { func (p *GlobalLivenessProvider) Collect( ctx context.Context, + frameNumber uint64, ) (GlobalCollectedCommitments, error) { timer := prometheus.NewTimer(shardCommitmentCollectionDuration) defer timer.ObserveDuration() @@ -68,14 +69,6 @@ func (p *GlobalLivenessProvider) Collect( acceptedMessages := []*protobufs.Message{} - frameNumber := uint64(0) - currentFrame, _ := p.engine.globalTimeReel.GetHead() - if currentFrame != nil && currentFrame.Header != nil { - frameNumber = currentFrame.Header.FrameNumber - } - - frameNumber++ - p.engine.logger.Debug( "collected messages, validating", zap.Int("message_count", len(messages)), diff --git a/node/consensus/global/consensus_sync_provider.go b/node/consensus/global/consensus_sync_provider.go deleted file mode 100644 index 5abae3f..0000000 --- a/node/consensus/global/consensus_sync_provider.go +++ /dev/null @@ -1,576 +0,0 @@ -package global - -import ( - "bytes" - "context" - "encoding/hex" - "fmt" - "slices" - "time" - - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" - mn "github.com/multiformats/go-multiaddr/net" - "github.com/pkg/errors" - "go.uber.org/zap" - "google.golang.org/grpc" - "source.quilibrium.com/quilibrium/monorepo/lifecycle" - "source.quilibrium.com/quilibrium/monorepo/node/internal/frametime" - "source.quilibrium.com/quilibrium/monorepo/node/p2p" - "source.quilibrium.com/quilibrium/monorepo/protobufs" - "source.quilibrium.com/quilibrium/monorepo/types/channel" - "source.quilibrium.com/quilibrium/monorepo/types/tries" -) - -const defaultStateQueueCapacity = 10 - -type syncRequest struct { - frameNumber uint64 - peerId []byte - identity []byte -} - -// GlobalSyncProvider implements SyncProvider -type GlobalSyncProvider struct { - // TODO(2.1.1+): Refactor out direct use of engine - engine *GlobalConsensusEngine - queuedStates chan syncRequest 
-} - -func NewGlobalSyncProvider( - engine *GlobalConsensusEngine, -) *GlobalSyncProvider { - return &GlobalSyncProvider{ - engine: engine, - queuedStates: make(chan syncRequest, defaultStateQueueCapacity), - } -} - -func (p *GlobalSyncProvider) Start( - ctx lifecycle.SignalerContext, - ready lifecycle.ReadyFunc, -) { - ready() - for { - select { - case <-ctx.Done(): - return - case request := <-p.queuedStates: - finalized := p.engine.forks.FinalizedState() - if request.frameNumber <= - (*p.engine.forks.FinalizedState().State).Header.FrameNumber { - continue - } - p.engine.logger.Info( - "synchronizing with peer", - zap.String("peer", peer.ID(request.peerId).String()), - zap.Uint64("finalized_rank", finalized.Rank), - zap.Uint64("peer_frame", request.frameNumber), - ) - p.processState( - ctx, - request.frameNumber, - request.peerId, - request.identity, - ) - } - } -} - -func (p *GlobalSyncProvider) processState( - ctx context.Context, - frameNumber uint64, - peerID []byte, - identity []byte, -) { - err := p.syncWithPeer( - ctx, - frameNumber, - peerID, - identity, - ) - if err != nil { - p.engine.logger.Error("could not sync with peer", zap.Error(err)) - } -} - -func (p *GlobalSyncProvider) Synchronize( - existing **protobufs.GlobalFrame, - ctx context.Context, -) (<-chan **protobufs.GlobalFrame, <-chan error) { - dataCh := make(chan **protobufs.GlobalFrame, 1) - errCh := make(chan error, 1) - - go func() { - defer func() { - if r := recover(); r != nil { - if errCh != nil { - errCh <- errors.New(fmt.Sprintf("fatal error encountered: %+v", r)) - } - } - }() - defer close(dataCh) - defer close(errCh) - - head, err := p.engine.globalTimeReel.GetHead() - hasFrame := head != nil && err == nil - - peerCount := p.engine.pubsub.GetPeerstoreCount() - - if peerCount < p.engine.config.Engine.MinimumPeersRequired { - p.engine.logger.Info( - "waiting for minimum peers", - zap.Int("current", peerCount), - zap.Int("required", p.engine.config.Engine.MinimumPeersRequired), - ) - - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - loop: - for { - select { - case <-ctx.Done(): - errCh <- errors.Wrap( - ctx.Err(), - "synchronize cancelled while waiting for peers", - ) - return - case <-ticker.C: - peerCount = p.engine.pubsub.GetPeerstoreCount() - if peerCount >= p.engine.config.Engine.MinimumPeersRequired { - p.engine.logger.Info( - "minimum peers reached", - zap.Int("peers", peerCount), - ) - break loop - } - } - if peerCount >= p.engine.config.Engine.MinimumPeersRequired { - break - } - } - } - - if !hasFrame { - errCh <- errors.New("no frame") - return - } - - err = p.syncWithMesh(ctx) - if err != nil { - dataCh <- existing - errCh <- err - return - } - - if hasFrame { - // Retrieve full frame from store - frameID := p.engine.globalTimeReel.ComputeFrameID(head) - p.engine.frameStoreMu.RLock() - fullFrame, exists := p.engine.frameStore[frameID] - p.engine.frameStoreMu.RUnlock() - - if exists { - dataCh <- &fullFrame - } else if existing != nil { - dataCh <- existing - } - } - - syncStatusCheck.WithLabelValues("synced").Inc() - errCh <- nil - }() - - return dataCh, errCh -} - -func (p *GlobalSyncProvider) syncWithMesh(ctx context.Context) error { - p.engine.logger.Info("synchronizing with peers") - - latest, err := p.engine.globalTimeReel.GetHead() - if err != nil { - return errors.Wrap(err, "sync") - } - - peers, err := p.engine.proverRegistry.GetActiveProvers(nil) - if len(peers) <= 1 || err != nil { - return nil - } - - for _, candidate := range peers { - if 
bytes.Equal(candidate.Address, p.engine.getProverAddress()) { - continue - } - - registry, err := p.engine.keyStore.GetKeyRegistryByProver( - candidate.Address, - ) - if err != nil { - continue - } - - if registry.IdentityKey == nil || registry.IdentityKey.KeyValue == nil { - continue - } - - pub, err := crypto.UnmarshalEd448PublicKey(registry.IdentityKey.KeyValue) - if err != nil { - p.engine.logger.Warn("error unmarshaling identity key", zap.Error(err)) - continue - } - - peerID, err := peer.IDFromPublicKey(pub) - if err != nil { - p.engine.logger.Warn("error deriving peer id", zap.Error(err)) - continue - } - - head, err := p.engine.globalTimeReel.GetHead() - if err != nil { - return errors.Wrap(err, "sync") - } - - if latest.Header.FrameNumber < head.Header.FrameNumber { - latest = head - } - - err = p.syncWithPeer( - ctx, - latest.Header.FrameNumber, - []byte(peerID), - nil, - ) - if err != nil { - p.engine.logger.Debug("error syncing frame", zap.Error(err)) - } - } - - p.engine.logger.Info( - "returning leader frame", - zap.Uint64("frame_number", latest.Header.FrameNumber), - zap.Duration("frame_age", frametime.GlobalFrameSince(latest)), - ) - - return nil -} - -func (p *GlobalSyncProvider) syncWithPeer( - ctx context.Context, - frameNumber uint64, - peerId []byte, - expectedIdentity []byte, -) error { - p.engine.logger.Info( - "polling peer for new frames", - zap.String("peer_id", peer.ID(peerId).String()), - zap.Uint64("current_frame", frameNumber), - ) - - info := p.engine.peerInfoManager.GetPeerInfo(peerId) - if info == nil { - p.engine.logger.Info( - "no peer info known yet, skipping sync", - zap.String("peer", peer.ID(peerId).String()), - ) - return nil - } - - if len(info.Reachability) == 0 { - p.engine.logger.Info( - "no reachability info known yet, skipping sync", - zap.String("peer", peer.ID(peerId).String()), - ) - return nil - } - - syncTimeout := p.engine.config.Engine.SyncTimeout - for _, s := range info.Reachability[0].StreamMultiaddrs { - creds, err := p2p.NewPeerAuthenticator( - p.engine.logger, - p.engine.config.P2P, - nil, - nil, - nil, - nil, - [][]byte{[]byte(peerId)}, - map[string]channel.AllowedPeerPolicyType{}, - map[string]channel.AllowedPeerPolicyType{}, - ).CreateClientTLSCredentials([]byte(peerId)) - if err != nil { - return errors.Wrap(err, "sync") - } - - ma, err := multiaddr.StringCast(s) - if err != nil { - return errors.Wrap(err, "sync") - } - - mga, err := mn.ToNetAddr(ma) - if err != nil { - return errors.Wrap(err, "sync") - } - - cc, err := grpc.NewClient( - mga.String(), - grpc.WithTransportCredentials(creds), - ) - if err != nil { - p.engine.logger.Debug( - "could not establish direct channel, trying next multiaddr", - zap.String("peer", peer.ID(peerId).String()), - zap.String("multiaddr", ma.String()), - zap.Error(err), - ) - continue - } - defer func() { - if err := cc.Close(); err != nil { - p.engine.logger.Error("error while closing connection", zap.Error(err)) - } - }() - - client := protobufs.NewGlobalServiceClient(cc) - inner: - for { - getCtx, cancelGet := context.WithTimeout(ctx, syncTimeout) - response, err := client.GetGlobalProposal( - getCtx, - &protobufs.GetGlobalProposalRequest{ - FrameNumber: frameNumber, - }, - // The message size limits are swapped because the server is the one - // sending the data. 
- grpc.MaxCallRecvMsgSize( - p.engine.config.Engine.SyncMessageLimits.MaxSendMsgSize, - ), - grpc.MaxCallSendMsgSize( - p.engine.config.Engine.SyncMessageLimits.MaxRecvMsgSize, - ), - ) - cancelGet() - if err != nil { - p.engine.logger.Debug( - "could not get frame, trying next multiaddr", - zap.String("peer", peer.ID(peerId).String()), - zap.String("multiaddr", ma.String()), - zap.Error(err), - ) - break inner - } - - if response == nil { - p.engine.logger.Debug( - "received no response from peer", - zap.String("peer", peer.ID(peerId).String()), - zap.String("multiaddr", ma.String()), - zap.Error(err), - ) - break inner - } - - if response.Proposal == nil || response.Proposal.State == nil || - response.Proposal.State.Header == nil || - response.Proposal.State.Header.FrameNumber != frameNumber { - p.engine.logger.Debug("received empty response from peer") - return nil - } - if err := response.Proposal.Validate(); err != nil { - p.engine.logger.Debug( - "received invalid response from peer", - zap.Error(err), - ) - return nil - } - - if len(expectedIdentity) != 0 { - if !bytes.Equal( - []byte(response.Proposal.State.Identity()), - expectedIdentity, - ) { - p.engine.logger.Warn( - "aborting sync due to unexpected frame identity", - zap.Uint64("frame_number", frameNumber), - zap.String( - "expected", - hex.EncodeToString(expectedIdentity), - ), - zap.String( - "received", - hex.EncodeToString( - []byte(response.Proposal.State.Identity()), - ), - ), - ) - return errors.New("sync frame identity mismatch") - } - expectedIdentity = nil - } - - p.engine.logger.Info( - "received new leading frame", - zap.Uint64("frame_number", response.Proposal.State.Header.FrameNumber), - zap.Duration( - "frame_age", - frametime.GlobalFrameSince(response.Proposal.State), - ), - ) - - if _, err := p.engine.frameProver.VerifyGlobalFrameHeader( - response.Proposal.State.Header, - p.engine.blsConstructor, - ); err != nil { - return errors.Wrap(err, "sync") - } - - p.engine.globalProposalQueue <- response.Proposal - frameNumber = frameNumber + 1 - } - } - - p.engine.logger.Debug( - "failed to complete sync for all known multiaddrs", - zap.String("peer", peer.ID(peerId).String()), - ) - return nil -} - -func (p *GlobalSyncProvider) hyperSyncWithProver( - ctx context.Context, - prover []byte, - shardKey tries.ShardKey, -) { - registry, err := p.engine.signerRegistry.GetKeyRegistryByProver(prover) - if err == nil && registry != nil && registry.IdentityKey != nil { - peerKey := registry.IdentityKey - pubKey, err := crypto.UnmarshalEd448PublicKey(peerKey.KeyValue) - if err == nil { - peerId, err := peer.IDFromPublicKey(pubKey) - if err == nil { - ch, err := p.engine.pubsub.GetDirectChannel( - ctx, - []byte(peerId), - "sync", - ) - - if err == nil { - defer ch.Close() - client := protobufs.NewHypergraphComparisonServiceClient(ch) - str, err := client.HyperStream(ctx) - if err != nil { - p.engine.logger.Error("error from sync", zap.Error(err)) - } else { - p.hyperSyncVertexAdds(str, shardKey) - p.hyperSyncVertexRemoves(str, shardKey) - p.hyperSyncHyperedgeAdds(str, shardKey) - p.hyperSyncHyperedgeRemoves(str, shardKey) - } - } - } - } - } -} - -func (p *GlobalSyncProvider) hyperSyncVertexAdds( - str protobufs.HypergraphComparisonService_HyperStreamClient, - shardKey tries.ShardKey, -) { - err := p.engine.hypergraph.Sync( - str, - shardKey, - protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, - ) - if err != nil { - p.engine.logger.Error("error from sync", zap.Error(err)) - } - str.CloseSend() -} - -func (p 
*GlobalSyncProvider) hyperSyncVertexRemoves( - str protobufs.HypergraphComparisonService_HyperStreamClient, - shardKey tries.ShardKey, -) { - err := p.engine.hypergraph.Sync( - str, - shardKey, - protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES, - ) - if err != nil { - p.engine.logger.Error("error from sync", zap.Error(err)) - } - str.CloseSend() -} - -func (p *GlobalSyncProvider) hyperSyncHyperedgeAdds( - str protobufs.HypergraphComparisonService_HyperStreamClient, - shardKey tries.ShardKey, -) { - err := p.engine.hypergraph.Sync( - str, - shardKey, - protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS, - ) - if err != nil { - p.engine.logger.Error("error from sync", zap.Error(err)) - } - str.CloseSend() -} - -func (p *GlobalSyncProvider) hyperSyncHyperedgeRemoves( - str protobufs.HypergraphComparisonService_HyperStreamClient, - shardKey tries.ShardKey, -) { - err := p.engine.hypergraph.Sync( - str, - shardKey, - protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES, - ) - if err != nil { - p.engine.logger.Error("error from sync", zap.Error(err)) - } - str.CloseSend() -} - -func (p *GlobalSyncProvider) AddState( - sourcePeerID []byte, - frameNumber uint64, - expectedIdentity []byte, -) { - // Drop if we're within the threshold - if frameNumber <= - (*p.engine.forks.FinalizedState().State).Header.FrameNumber && - frameNumber != 0 { - p.engine.logger.Debug( - "dropping stale state for sync", - zap.Uint64("frame_requested", frameNumber), - zap.Uint64( - "finalized_frame", - (*p.engine.forks.FinalizedState().State).Header.FrameNumber, - ), - ) - return - } - - // Handle special case: we're at genesis frame on time reel - if frameNumber == 0 { - frameNumber = 1 - expectedIdentity = []byte{} - } - - // Enqueue if we can, otherwise drop it because we'll catch up - select { - case p.queuedStates <- syncRequest{ - frameNumber: frameNumber, - peerId: slices.Clone(sourcePeerID), - identity: slices.Clone(expectedIdentity), - }: - p.engine.logger.Debug( - "enqueued sync request", - zap.String("peer", peer.ID(sourcePeerID).String()), - zap.Uint64("enqueued_frame_number", frameNumber), - ) - default: - p.engine.logger.Debug("no queue capacity, dropping state for sync") - } -} diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index 5e6467f..f2d267f 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -39,6 +39,7 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/consensus/aggregator" "source.quilibrium.com/quilibrium/monorepo/node/consensus/provers" "source.quilibrium.com/quilibrium/monorepo/node/consensus/reward" + qsync "source.quilibrium.com/quilibrium/monorepo/node/consensus/sync" consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time" "source.quilibrium.com/quilibrium/monorepo/node/consensus/tracing" "source.quilibrium.com/quilibrium/monorepo/node/consensus/voting" @@ -147,6 +148,7 @@ type GlobalConsensusEngine struct { halt context.CancelFunc // Internal state + proverAddress []byte quit chan struct{} wg sync.WaitGroup minimumProvers func() uint64 @@ -167,6 +169,12 @@ type GlobalConsensusEngine struct { appFrameStore map[string]*protobufs.AppShardFrame appFrameStoreMu sync.RWMutex lowCoverageStreak map[string]*coverageStreak + peerInfoDigestCache map[string]struct{} + peerInfoDigestCacheMu sync.Mutex + keyRegistryDigestCache map[string]struct{} + keyRegistryDigestCacheMu sync.Mutex + peerAuthCache 
map[string]time.Time + peerAuthCacheMu sync.RWMutex // Transaction cross-shard lock tracking txLockMap map[uint64]map[string]map[string]*LockedTransaction @@ -186,7 +194,7 @@ type GlobalConsensusEngine struct { timeoutAggregator consensus.TimeoutAggregator[*protobufs.ProposalVote] // Provider implementations - syncProvider *GlobalSyncProvider + syncProvider *qsync.SyncProvider[*protobufs.GlobalFrame, *protobufs.GlobalProposal] votingProvider *GlobalVotingProvider leaderProvider *GlobalLeaderProvider livenessProvider *GlobalLivenessProvider @@ -286,6 +294,9 @@ func NewGlobalConsensusEngine( lastProvenFrameTime: time.Now(), blacklistMap: make(map[string]bool), pendingMessages: [][]byte{}, + peerInfoDigestCache: make(map[string]struct{}), + keyRegistryDigestCache: make(map[string]struct{}), + peerAuthCache: make(map[string]time.Time), alertPublicKey: []byte{}, txLockMap: make(map[uint64]map[string]map[string]*LockedTransaction), } @@ -329,8 +340,23 @@ func NewGlobalConsensusEngine( } } + keyId := "q-prover-key" + + key, err := keyManager.GetSigningKey(keyId) + if err != nil { + logger.Error("failed to get key for prover address", zap.Error(err)) + panic(err) + } + + addressBI, err := poseidon.HashBytes(key.Public().([]byte)) + if err != nil { + logger.Error("failed to calculate prover address", zap.Error(err)) + panic(err) + } + + engine.proverAddress = addressBI.FillBytes(make([]byte, 32)) + // Create provider implementations - engine.syncProvider = NewGlobalSyncProvider(engine) engine.votingProvider = &GlobalVotingProvider{engine: engine} engine.leaderProvider = &GlobalLeaderProvider{engine: engine} engine.livenessProvider = &GlobalLivenessProvider{engine: engine} @@ -489,9 +515,6 @@ func NewGlobalConsensusEngine( }) } - // Add sync provider - componentBuilder.AddWorker(engine.syncProvider.Start) - // Add execution engines componentBuilder.AddWorker(engine.executionManager.Start) componentBuilder.AddWorker(engine.globalTimeReel.Start) @@ -635,6 +658,31 @@ func NewGlobalConsensusEngine( engine.forks = forks + engine.syncProvider = qsync.NewSyncProvider[ + *protobufs.GlobalFrame, + *protobufs.GlobalProposal, + ]( + logger, + forks, + proverRegistry, + signerRegistry, + peerInfoManager, + qsync.NewGlobalSyncClient( + frameProver, + blsConstructor, + engine, + config, + ), + hypergraph, + config, + nil, + engine.proverAddress, + ) + + // Add sync provider + componentBuilder.AddWorker(engine.syncProvider.Start) + + // Add consensus componentBuilder.AddWorker(func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, @@ -966,14 +1014,8 @@ func (e *GlobalConsensusEngine) Stop(force bool) <-chan error { } func (e *GlobalConsensusEngine) GetFrame() *protobufs.GlobalFrame { - // Get the current frame from the time reel - frame, _ := e.globalTimeReel.GetHead() - - if frame == nil { - return nil - } - - return frame.Clone().(*protobufs.GlobalFrame) + frame, _ := e.clockStore.GetLatestGlobalClockFrame() + return frame } func (e *GlobalConsensusEngine) GetDifficulty() uint32 { @@ -1315,7 +1357,7 @@ func (e *GlobalConsensusEngine) materialize( zap.Int("message_index", i), zap.Error(err), ) - return errors.Wrap(err, "materialize") + continue } state = result.State @@ -1375,20 +1417,7 @@ func (e *GlobalConsensusEngine) getPeerID() GlobalPeerID { } func (e *GlobalConsensusEngine) getProverAddress() []byte { - keyId := "q-prover-key" - - key, err := e.keyManager.GetSigningKey(keyId) - if err != nil { - e.logger.Error("failed to get key for prover address", zap.Error(err)) - return []byte{} - } - - 
addressBI, err := poseidon.HashBytes(key.Public().([]byte)) - if err != nil { - e.logger.Error("failed to calculate prover address", zap.Error(err)) - return []byte{} - } - return addressBI.FillBytes(make([]byte, 32)) + return e.proverAddress } func (e *GlobalConsensusEngine) updateMetrics( @@ -2834,11 +2863,7 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange( zap.Uint64("rank", newRank-1), zap.Uint64("frame_number", qc.GetFrameNumber()), ) - current, err := e.globalTimeReel.GetHead() - if err != nil { - e.logger.Error("could not get time reel head", zap.Error(err)) - return - } + current := (*e.forks.FinalizedState().State) peer, err := e.getRandomProverPeerId() if err != nil { e.logger.Error("could not get random peer", zap.Error(err)) @@ -2852,47 +2877,6 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange( return } - frame.Header.PublicKeySignatureBls48581 = aggregateSig - - err = e.globalTimeReel.Insert(frame) - if err != nil { - e.logger.Error("could not insert frame into time reel", zap.Error(err)) - return - } - - current, err := e.globalTimeReel.GetHead() - if err != nil { - e.logger.Error("could not get time reel head", zap.Error(err)) - return - } - - if !bytes.Equal(frame.Header.Output, current.Header.Output) { - e.logger.Error( - "frames not aligned, might need sync", - zap.Uint64("new_frame_number", frame.Header.FrameNumber), - zap.Uint64("reel_frame_number", current.Header.FrameNumber), - zap.Uint64("new_frame_rank", frame.Header.Rank), - zap.Uint64("reel_frame_rank", current.Header.Rank), - zap.String("new_frame_id", hex.EncodeToString([]byte(frame.Identity()))), - zap.String( - "reel_frame_id", - hex.EncodeToString([]byte(current.Identity())), - ), - ) - - peerID, err := e.getPeerIDOfProver(frame.Header.Prover) - if err != nil { - return - } - - e.syncProvider.AddState( - []byte(peerID), - current.Header.FrameNumber, - []byte(current.Identity()), - ) - return - } - if !bytes.Equal(frame.Header.ParentSelector, parentQC.Selector) { e.logger.Error( "quorum certificate does not match frame parent", @@ -2928,12 +2912,53 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange( return } + frame.Header.PublicKeySignatureBls48581 = aggregateSig + + latest, err := e.clockStore.GetLatestGlobalClockFrame() + if err != nil { + e.logger.Error("could not obtain latest frame", zap.Error(err)) + return + } + + if latest.Header.FrameNumber+1 != frame.Header.FrameNumber || + !bytes.Equal([]byte(latest.Identity()), frame.Header.ParentSelector) { + e.logger.Debug( + "not next frame, cannot advance", + zap.Uint64("latest_frame_number", latest.Header.FrameNumber), + zap.Uint64("new_frame_number", frame.Header.FrameNumber), + zap.String( + "latest_frame_selector", + hex.EncodeToString([]byte(latest.Identity())), + ), + zap.String( + "new_frame_number", + hex.EncodeToString(frame.Header.ParentSelector), + ), + ) + return + } + txn, err = e.clockStore.NewTransaction(false) if err != nil { e.logger.Error("could not create transaction", zap.Error(err)) return } + if err := e.materialize( + txn, + frame.Header.FrameNumber, + frame.Requests, + ); err != nil { + _ = txn.Abort() + e.logger.Error("could not materialize frame requests", zap.Error(err)) + return + } + if err := e.clockStore.PutGlobalClockFrame(frame, txn); err != nil { + _ = txn.Abort() + e.logger.Error("could not put global frame", zap.Error(err)) + return + } + if err := e.clockStore.PutCertifiedGlobalState( &protobufs.GlobalProposal{ State: frame, @@ -2952,6 +2977,11 @@ func 
(e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange( e.logger.Error("could not commit transaction", zap.Error(err)) txn.Abort() } + + if err := e.checkShardCoverage(frame.GetFrameNumber()); err != nil { + e.logger.Error("could not check shard coverage", zap.Error(err)) + return + } } // OnRankChange implements consensus.Consumer. @@ -3243,14 +3273,10 @@ func (e *GlobalConsensusEngine) getPendingProposals( *protobufs.GlobalFrame, *protobufs.ProposalVote, ] { - pendingFrames, err := e.clockStore.RangeGlobalClockFrames( - frameNumber, - 0xfffffffffffffffe, - ) + root, err := e.clockStore.GetGlobalClockFrame(frameNumber) if err != nil { panic(err) } - defer pendingFrames.Close() result := []*models.SignedProposal[ *protobufs.GlobalFrame, @@ -3258,34 +3284,42 @@ func (e *GlobalConsensusEngine) getPendingProposals( ]{} e.logger.Debug("getting pending proposals", zap.Uint64("start", frameNumber)) - pendingFrames.First() - if !pendingFrames.Valid() { - e.logger.Debug("no valid frame") - return result + + startRank := root.Header.Rank + latestQC, err := e.clockStore.GetLatestQuorumCertificate(nil) + if err != nil { + panic(err) } - value, err := pendingFrames.Value() - if err != nil || value == nil { - e.logger.Debug("value was invalid", zap.Error(err)) - return result + endRank := latestQC.Rank + + parent, err := e.clockStore.GetQuorumCertificate(nil, startRank) + if err != nil { + panic(err) } - previous := value - for pendingFrames.Next(); pendingFrames.Valid(); pendingFrames.Next() { - value, err := pendingFrames.Value() - if err != nil || value == nil { - e.logger.Debug("iter value was invalid or empty", zap.Error(err)) - break - } - - parent, err := e.clockStore.GetQuorumCertificate(nil, previous.GetRank()) + for rank := startRank + 1; rank <= endRank; rank++ { + nextQC, err := e.clockStore.GetQuorumCertificate(nil, rank) if err != nil { - panic(err) + e.logger.Debug("no qc for rank", zap.Error(err)) + continue + } + + value, err := e.clockStore.GetGlobalClockFrameCandidate( + nextQC.FrameNumber, + []byte(nextQC.Identity()), + ) + if err != nil { + e.logger.Debug("no frame for qc", zap.Error(err)) + parent = nextQC + continue } - priorTC, _ := e.clockStore.GetTimeoutCertificate(nil, value.GetRank()-1) var priorTCModel models.TimeoutCertificate = nil - if priorTC != nil { - priorTCModel = priorTC + if parent.Rank != rank-1 { + priorTC, _ := e.clockStore.GetTimeoutCertificate(nil, rank-1) + if priorTC != nil { + priorTCModel = priorTC + } } vote := &protobufs.ProposalVote{ @@ -3313,7 +3347,7 @@ func (e *GlobalConsensusEngine) getPendingProposals( }, Vote: &vote, }) - previous = value + parent = nextQC } return result } diff --git a/node/consensus/global/message_processors.go b/node/consensus/global/message_processors.go index 93fcae0..95ef00f 100644 --- a/node/consensus/global/message_processors.go +++ b/node/consensus/global/message_processors.go @@ -3,6 +3,7 @@ package global import ( "bytes" "context" + "crypto/sha256" "encoding/binary" "encoding/hex" "fmt" @@ -13,6 +14,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "google.golang.org/protobuf/proto" "source.quilibrium.com/quilibrium/monorepo/consensus/models" "source.quilibrium.com/quilibrium/monorepo/consensus/verification" "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" @@ -385,6 +387,10 @@ func (e *GlobalConsensusEngine) handlePeerInfoMessage(message *pb.Message) { return } + if e.isDuplicatePeerInfo(peerInfo) { + return + } + // 
Validate signature if !e.validatePeerInfoSignature(peerInfo) { e.logger.Debug("invalid peer info signature", @@ -401,6 +407,10 @@ func (e *GlobalConsensusEngine) handlePeerInfoMessage(message *pb.Message) { return } + if e.isDuplicateKeyRegistry(keyRegistry) { + return + } + if err := keyRegistry.Validate(); err != nil { e.logger.Debug("invalid key registry", zap.Error(err)) return @@ -536,6 +546,72 @@ type keyRegistryValidationResult struct { proverAddress []byte } +func (e *GlobalConsensusEngine) isDuplicatePeerInfo( + peerInfo *protobufs.PeerInfo, +) bool { + digest, err := hashPeerInfo(peerInfo) + if err != nil { + e.logger.Warn("failed to hash peer info", zap.Error(err)) + return false + } + + e.peerInfoDigestCacheMu.Lock() + defer e.peerInfoDigestCacheMu.Unlock() + + if _, ok := e.peerInfoDigestCache[digest]; ok { + return true + } + + e.peerInfoDigestCache[digest] = struct{}{} + return false +} + +func (e *GlobalConsensusEngine) isDuplicateKeyRegistry( + keyRegistry *protobufs.KeyRegistry, +) bool { + digest, err := hashKeyRegistry(keyRegistry) + if err != nil { + e.logger.Warn("failed to hash key registry", zap.Error(err)) + return false + } + + e.keyRegistryDigestCacheMu.Lock() + defer e.keyRegistryDigestCacheMu.Unlock() + + if _, ok := e.keyRegistryDigestCache[digest]; ok { + return true + } + + e.keyRegistryDigestCache[digest] = struct{}{} + return false +} + +func hashPeerInfo(peerInfo *protobufs.PeerInfo) (string, error) { + cloned := proto.Clone(peerInfo).(*protobufs.PeerInfo) + cloned.Timestamp = 0 + + data, err := cloned.ToCanonicalBytes() + if err != nil { + return "", err + } + + sum := sha256.Sum256(data) + return hex.EncodeToString(sum[:]), nil +} + +func hashKeyRegistry(keyRegistry *protobufs.KeyRegistry) (string, error) { + cloned := proto.Clone(keyRegistry).(*protobufs.KeyRegistry) + cloned.LastUpdated = 0 + + data, err := cloned.ToCanonicalBytes() + if err != nil { + return "", err + } + + sum := sha256.Sum256(data) + return hex.EncodeToString(sum[:]), nil +} + func (e *GlobalConsensusEngine) validateKeyRegistry( keyRegistry *protobufs.KeyRegistry, ) (*keyRegistryValidationResult, error) { @@ -835,6 +911,8 @@ func (e *GlobalConsensusEngine) handleGlobalProposal( e.logger.Debug( "handling global proposal", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), zap.String("id", hex.EncodeToString([]byte(proposal.State.Identity()))), ) @@ -874,7 +952,13 @@ func (e *GlobalConsensusEngine) handleGlobalProposal( // drop proposals if we already processed them if frameNumber <= finalizedFrameNumber || proposal.State.Header.Rank <= finalizedRank { - e.logger.Debug("dropping stale proposal") + e.logger.Debug( + "dropping stale (lower than finalized) proposal", + zap.Uint64("finalized_rank", finalizedRank), + zap.Uint64("finalized_frame_number", finalizedFrameNumber), + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + ) return } @@ -887,7 +971,11 @@ func (e *GlobalConsensusEngine) handleGlobalProposal( if qcErr == nil && qc != nil && qc.GetFrameNumber() == frameNumber && qc.Identity() == proposal.State.Identity() { - e.logger.Debug("dropping stale proposal") + e.logger.Debug( + "dropping stale (already committed) proposal", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + ) return } } @@ -904,7 +992,9 @@ func (e *GlobalConsensusEngine) handleGlobalProposal( ) { e.logger.Debug( "parent frame not stored, requesting 
sync", - zap.Uint64("frame_number", proposal.State.Header.FrameNumber-1), + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + zap.Uint64("parent_frame_number", proposal.State.Header.FrameNumber-1), ) e.cacheProposal(proposal) @@ -916,23 +1006,20 @@ func (e *GlobalConsensusEngine) handleGlobalProposal( } } - head, err := e.globalTimeReel.GetHead() - if err != nil { - return - } + head := e.forks.FinalizedState() e.syncProvider.AddState( []byte(peerID), - head.Header.FrameNumber, - []byte(head.Identity()), + (*head.State).Header.FrameNumber, + []byte(head.Identifier), ) return } } - expectedFrame, err := e.globalTimeReel.GetHead() + expectedFrame, err := e.clockStore.GetLatestGlobalClockFrame() if err != nil { - e.logger.Error("could not obtain time reel head", zap.Error(err)) + e.logger.Error("could not obtain latest global frame", zap.Error(err)) return } @@ -968,38 +1055,65 @@ func (e *GlobalConsensusEngine) processProposal( ) bool { e.logger.Debug( "processing proposal", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), zap.String("id", hex.EncodeToString([]byte(proposal.State.Identity()))), ) err := e.VerifyQuorumCertificate(proposal.ParentQuorumCertificate) if err != nil { - e.logger.Debug("proposal has invalid qc", zap.Error(err)) + e.logger.Debug( + "proposal has invalid qc", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + zap.Error(err), + ) return false } if proposal.PriorRankTimeoutCertificate != nil { err := e.VerifyTimeoutCertificate(proposal.PriorRankTimeoutCertificate) if err != nil { - e.logger.Debug("proposal has invalid tc", zap.Error(err)) + e.logger.Debug( + "proposal has invalid tc", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + zap.Error(err), + ) return false } } err = e.VerifyVote(&proposal.Vote) if err != nil { - e.logger.Debug("proposal has invalid vote", zap.Error(err)) + e.logger.Debug( + "proposal has invalid vote", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + zap.Error(err), + ) return false } err = proposal.State.Validate() if err != nil { - e.logger.Debug("proposal is not valid", zap.Error(err)) + e.logger.Debug( + "proposal is not valid", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + zap.Error(err), + ) return false } valid, err := e.frameValidator.Validate(proposal.State) if !valid || err != nil { - e.logger.Debug("invalid frame in proposal", zap.Error(err)) + e.logger.Debug( + "invalid frame in proposal", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + zap.Error(err), + ) return false } @@ -1056,6 +1170,7 @@ func (e *GlobalConsensusEngine) cacheProposal( e.logger.Debug( "cached out-of-order proposal", + zap.Uint64("rank", proposal.GetRank()), zap.Uint64("frame_number", frameNumber), zap.String("id", hex.EncodeToString([]byte(proposal.State.Identity()))), ) @@ -1092,6 +1207,7 @@ func (e *GlobalConsensusEngine) drainProposalCache(startFrame uint64) { if !e.processProposal(prop) { e.logger.Debug( "cached proposal failed processing, retaining for retry", + zap.Uint64("rank", prop.GetRank()), zap.Uint64("frame_number", next), ) e.cacheProposal(prop) @@ -1157,7 +1273,7 @@ func (e *GlobalConsensusEngine) trySealParentWithChild( zap.Uint64("child_frame", 
header.FrameNumber), ) - head, err := e.globalTimeReel.GetHead() + head, err := e.clockStore.GetLatestGlobalClockFrame() if err != nil { e.logger.Error("error fetching time reel head", zap.Error(err)) return @@ -1224,44 +1340,27 @@ func (e *GlobalConsensusEngine) addCertifiedState( parent.State.Header.PublicKeySignatureBls48581 = aggregateSig - err = e.globalTimeReel.Insert(parent.State) - if err != nil { - e.logger.Error("could not insert frame into time reel", zap.Error(err)) - return - } - - current, err := e.globalTimeReel.GetHead() - if err != nil { - e.logger.Error("could not get time reel head", zap.Error(err)) - return - } - - if !bytes.Equal(parent.State.Header.Output, current.Header.Output) { - e.logger.Error( - "frames not aligned", - zap.Uint64("parent_frame_number", parent.State.Header.FrameNumber), - zap.Uint64("new_frame_number", child.State.Header.FrameNumber), - zap.Uint64("reel_frame_number", current.Header.FrameNumber), - zap.Uint64("new_frame_rank", child.State.Header.Rank), - zap.Uint64("reel_frame_rank", current.Header.Rank), - zap.String( - "new_frame_id", - hex.EncodeToString([]byte(child.State.Identity())), - ), - zap.String( - "reel_frame_id", - hex.EncodeToString([]byte(current.Identity())), - ), - ) - return - } - txn, err = e.clockStore.NewTransaction(false) if err != nil { e.logger.Error("could not create transaction", zap.Error(err)) return } + if err := e.materialize( + txn, + parent.State.Header.FrameNumber, + parent.State.Requests, + ); err != nil { + _ = txn.Abort() + e.logger.Error("could not materialize frame requests", zap.Error(err)) + return + } + if err := e.clockStore.PutGlobalClockFrame(parent.State, txn); err != nil { + _ = txn.Abort() + e.logger.Error("could not put global frame", zap.Error(err)) + return + } + if err := e.clockStore.PutCertifiedGlobalState( parent, txn, @@ -1276,6 +1375,11 @@ func (e *GlobalConsensusEngine) addCertifiedState( txn.Abort() return } + + if err := e.checkShardCoverage(parent.State.GetFrameNumber()); err != nil { + e.logger.Error("could not check shard coverage", zap.Error(err)) + return + } } func (e *GlobalConsensusEngine) handleProposal(message *pb.Message) { @@ -1333,6 +1437,12 @@ func (e *GlobalConsensusEngine) handleProposal(message *pb.Message) { proposalProcessedTotal.WithLabelValues("success").Inc() } +func (e *GlobalConsensusEngine) AddProposal( + proposal *protobufs.GlobalProposal, +) { + e.globalProposalQueue <- proposal +} + func (e *GlobalConsensusEngine) handleVote(message *pb.Message) { // Skip our own messages if bytes.Equal(message.From, e.pubsub.GetPeerID()) { diff --git a/node/consensus/global/metrics.go b/node/consensus/global/metrics.go index 0cf4f11..a5e9cb8 100644 --- a/node/consensus/global/metrics.go +++ b/node/consensus/global/metrics.go @@ -450,17 +450,6 @@ var ( []string{"action"}, // action: "register", "unregister" ) - // Sync status metrics - syncStatusCheck = promauto.NewCounterVec( - prometheus.CounterOpts{ - Namespace: metricsNamespace, - Subsystem: subsystem, - Name: "sync_status_check_total", - Help: "Total number of sync status checks", - }, - []string{"result"}, // result: "synced", "syncing" - ) - // Engine state metrics engineState = promauto.NewGauge( prometheus.GaugeOpts{ diff --git a/node/consensus/global/services.go b/node/consensus/global/services.go index 82adbaf..43e8db0 100644 --- a/node/consensus/global/services.go +++ b/node/consensus/global/services.go @@ -5,6 +5,7 @@ import ( "context" "math/big" "slices" + "time" "github.com/iden3/go-iden3-crypto/poseidon" 
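The addCertifiedState and OnQuorumCertificateTriggeredRankChange changes above share one abort-on-error shape: open a transaction, materialize the frame's requests, persist the frame and its certified state, then commit, aborting at the first failure. A minimal sketch of that pattern against a hypothetical store interface (names are illustrative, not the clock store's actual API):

package sketch

import "fmt"

// txn is a hypothetical stand-in for the store transaction used above.
type txn interface {
	Commit() error
	Abort() error
}

// frameStore is a hypothetical reduction of the persistence calls made when a
// frame becomes certified.
type frameStore interface {
	NewTransaction() (txn, error)
	Materialize(t txn, frameNumber uint64, requests [][]byte) error
	PutFrame(t txn, frameNumber uint64) error
	PutCertifiedState(t txn, frameNumber uint64) error
}

// commitCertified applies the abort-on-error sequence: any failure rolls the
// transaction back and surfaces the error; only a fully applied sequence is
// committed.
func commitCertified(s frameStore, frameNumber uint64, requests [][]byte) error {
	t, err := s.NewTransaction()
	if err != nil {
		return fmt.Errorf("new transaction: %w", err)
	}
	if err := s.Materialize(t, frameNumber, requests); err != nil {
		_ = t.Abort()
		return fmt.Errorf("materialize: %w", err)
	}
	if err := s.PutFrame(t, frameNumber); err != nil {
		_ = t.Abort()
		return fmt.Errorf("put frame: %w", err)
	}
	if err := s.PutCertifiedState(t, frameNumber); err != nil {
		_ = t.Abort()
		return fmt.Errorf("put certified state: %w", err)
	}
	if err := t.Commit(); err != nil {
		_ = t.Abort()
		return fmt.Errorf("commit: %w", err)
	}
	return nil
}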
"github.com/libp2p/go-libp2p/core/peer" @@ -36,7 +37,7 @@ func (e *GlobalConsensusEngine) GetGlobalFrame( ) var frame *protobufs.GlobalFrame if request.FrameNumber == 0 { - frame, err = e.globalTimeReel.GetHead() + frame = (*e.forks.FinalizedState().State) if frame.Header.FrameNumber == 0 { return nil, errors.Wrap( errors.New("not currently syncable"), @@ -430,6 +431,10 @@ func (e *GlobalConsensusEngine) authenticateProverFromContext( } if !bytes.Equal(e.pubsub.GetPeerID(), []byte(peerID)) { + if e.peerAuthCacheAllows(peerID) { + return peerID, nil + } + registry, err := e.keyStore.GetKeyRegistry( []byte(peerID), ) @@ -478,7 +483,39 @@ func (e *GlobalConsensusEngine) authenticateProverFromContext( "invalid peer", ) } + + e.markPeerAuthCache(peerID) } return peerID, nil } + +const peerAuthCacheTTL = 10 * time.Second + +func (e *GlobalConsensusEngine) peerAuthCacheAllows(id peer.ID) bool { + e.peerAuthCacheMu.RLock() + expiry, ok := e.peerAuthCache[string(id)] + e.peerAuthCacheMu.RUnlock() + + if !ok { + return false + } + + if time.Now().After(expiry) { + e.peerAuthCacheMu.Lock() + if current, exists := e.peerAuthCache[string(id)]; exists && + current == expiry { + delete(e.peerAuthCache, string(id)) + } + e.peerAuthCacheMu.Unlock() + return false + } + + return true +} + +func (e *GlobalConsensusEngine) markPeerAuthCache(id peer.ID) { + e.peerAuthCacheMu.Lock() + e.peerAuthCache[string(id)] = time.Now().Add(peerAuthCacheTTL) + e.peerAuthCacheMu.Unlock() +} diff --git a/node/consensus/sync/metrics.go b/node/consensus/sync/metrics.go new file mode 100644 index 0000000..6cf0b2a --- /dev/null +++ b/node/consensus/sync/metrics.go @@ -0,0 +1,24 @@ +package sync + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + metricsNamespace = "quilibrium" + subsystem = "sync" +) + +var ( + // Sync status metrics + syncStatusCheck = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: subsystem, + Name: "sync_status_check_total", + Help: "Total number of sync status checks", + }, + []string{"filter", "result"}, // result: "synced", "syncing" + ) +) diff --git a/node/consensus/sync/sync_client.go b/node/consensus/sync/sync_client.go new file mode 100644 index 0000000..5047a60 --- /dev/null +++ b/node/consensus/sync/sync_client.go @@ -0,0 +1,303 @@ +package sync + +import ( + "bytes" + "context" + "encoding/hex" + + "github.com/pkg/errors" + "go.uber.org/zap" + "google.golang.org/grpc" + "source.quilibrium.com/quilibrium/monorepo/config" + "source.quilibrium.com/quilibrium/monorepo/node/internal/frametime" + "source.quilibrium.com/quilibrium/monorepo/protobufs" + tconsensus "source.quilibrium.com/quilibrium/monorepo/types/consensus" + "source.quilibrium.com/quilibrium/monorepo/types/crypto" +) + +var ErrConnectivityFailed = errors.New("connectivity to peer failed") +var ErrInvalidResponse = errors.New("peer returned invalid response") + +type SyncClient[StateT UniqueFrame, ProposalT any] interface { + Sync( + ctx context.Context, + logger *zap.Logger, + cc *grpc.ClientConn, + frameNumber uint64, + expectedIdentity []byte, + ) error +} + +type GlobalSyncClient struct { + frameProver crypto.FrameProver + blsConstructor crypto.BlsConstructor + proposalProcessor ProposalProcessor[*protobufs.GlobalProposal] + config *config.Config +} + +func NewGlobalSyncClient( + frameProver crypto.FrameProver, + blsConstructor crypto.BlsConstructor, + proposalProcessor 
ProposalProcessor[*protobufs.GlobalProposal], + config *config.Config, +) *GlobalSyncClient { + return &GlobalSyncClient{ + frameProver: frameProver, + config: config, + blsConstructor: blsConstructor, + proposalProcessor: proposalProcessor, + } +} + +func (g *GlobalSyncClient) Sync( + ctx context.Context, + logger *zap.Logger, + cc *grpc.ClientConn, + frameNumber uint64, + expectedIdentity []byte, +) error { + client := protobufs.NewGlobalServiceClient(cc) + + for { + getCtx, cancelGet := context.WithTimeout(ctx, g.config.Engine.SyncTimeout) + response, err := client.GetGlobalProposal( + getCtx, + &protobufs.GetGlobalProposalRequest{ + FrameNumber: frameNumber, + }, + // The message size limits are swapped because the server is the one + // sending the data. + grpc.MaxCallRecvMsgSize( + g.config.Engine.SyncMessageLimits.MaxSendMsgSize, + ), + grpc.MaxCallSendMsgSize( + g.config.Engine.SyncMessageLimits.MaxRecvMsgSize, + ), + ) + cancelGet() + if err != nil { + logger.Debug( + "could not get frame, trying next multiaddr", + zap.Error(err), + ) + return ErrConnectivityFailed + } + + if response == nil { + logger.Debug( + "received no response from peer", + zap.Error(err), + ) + return ErrInvalidResponse + } + + if response.Proposal == nil || response.Proposal.State == nil || + response.Proposal.State.Header == nil || + response.Proposal.State.Header.FrameNumber != frameNumber { + logger.Debug("received empty response from peer") + return ErrInvalidResponse + } + if err := response.Proposal.Validate(); err != nil { + logger.Debug( + "received invalid response from peer", + zap.Error(err), + ) + return ErrInvalidResponse + } + + if len(expectedIdentity) != 0 { + if !bytes.Equal( + []byte(response.Proposal.State.Identity()), + expectedIdentity, + ) { + logger.Warn( + "aborting sync due to unexpected frame identity", + zap.Uint64("frame_number", frameNumber), + zap.String( + "expected", + hex.EncodeToString(expectedIdentity), + ), + zap.String( + "received", + hex.EncodeToString( + []byte(response.Proposal.State.Identity()), + ), + ), + ) + return errors.New("sync frame identity mismatch") + } + expectedIdentity = nil + } + + logger.Info( + "received new leading frame", + zap.Uint64("frame_number", response.Proposal.State.Header.FrameNumber), + zap.Duration( + "frame_age", + frametime.GlobalFrameSince(response.Proposal.State), + ), + ) + + if _, err := g.frameProver.VerifyGlobalFrameHeader( + response.Proposal.State.Header, + g.blsConstructor, + ); err != nil { + logger.Debug( + "received invalid frame from peer", + zap.Error(err), + ) + return ErrInvalidResponse + } + + g.proposalProcessor.AddProposal(response.Proposal) + frameNumber = frameNumber + 1 + } +} + +type AppSyncClient struct { + frameProver crypto.FrameProver + proverRegistry tconsensus.ProverRegistry + blsConstructor crypto.BlsConstructor + proposalProcessor ProposalProcessor[*protobufs.AppShardProposal] + config *config.Config + filter []byte +} + +func NewAppSyncClient( + frameProver crypto.FrameProver, + proverRegistry tconsensus.ProverRegistry, + blsConstructor crypto.BlsConstructor, + proposalProcessor ProposalProcessor[*protobufs.AppShardProposal], + config *config.Config, + filter []byte, +) *AppSyncClient { + return &AppSyncClient{ + frameProver: frameProver, + proverRegistry: proverRegistry, + config: config, + blsConstructor: blsConstructor, + proposalProcessor: proposalProcessor, + filter: filter, + } +} + +func (a *AppSyncClient) Sync( + ctx context.Context, + logger *zap.Logger, + cc *grpc.ClientConn, + frameNumber 
uint64, + expectedIdentity []byte, +) error { + client := protobufs.NewAppShardServiceClient(cc) + + for { + getCtx, cancelGet := context.WithTimeout(ctx, a.config.Engine.SyncTimeout) + response, err := client.GetAppShardProposal( + getCtx, + &protobufs.GetAppShardProposalRequest{ + Filter: a.filter, + FrameNumber: frameNumber, + }, + // The message size limits are swapped because the server is the one + // sending the data. + grpc.MaxCallRecvMsgSize( + a.config.Engine.SyncMessageLimits.MaxSendMsgSize, + ), + grpc.MaxCallSendMsgSize( + a.config.Engine.SyncMessageLimits.MaxRecvMsgSize, + ), + ) + cancelGet() + if err != nil { + logger.Debug( + "could not get frame, trying next multiaddr", + zap.Error(err), + ) + return ErrConnectivityFailed + } + + if response == nil { + logger.Debug( + "received no response from peer", + zap.Error(err), + ) + return ErrInvalidResponse + } + + if response.Proposal == nil || response.Proposal.State == nil || + response.Proposal.State.Header == nil || + response.Proposal.State.Header.FrameNumber != frameNumber { + logger.Debug("received empty response from peer") + return ErrInvalidResponse + } + if err := response.Proposal.Validate(); err != nil { + logger.Debug( + "received invalid response from peer", + zap.Error(err), + ) + return ErrInvalidResponse + } + + if len(expectedIdentity) != 0 { + if !bytes.Equal( + []byte(response.Proposal.State.Identity()), + expectedIdentity, + ) { + logger.Warn( + "aborting sync due to unexpected frame identity", + zap.Uint64("frame_number", frameNumber), + zap.String( + "expected", + hex.EncodeToString(expectedIdentity), + ), + zap.String( + "received", + hex.EncodeToString( + []byte(response.Proposal.State.Identity()), + ), + ), + ) + return errors.New("sync frame identity mismatch") + } + expectedIdentity = nil + } + + logger.Info( + "received new leading frame", + zap.Uint64("frame_number", response.Proposal.State.Header.FrameNumber), + zap.Duration( + "frame_age", + frametime.AppFrameSince(response.Proposal.State), + ), + ) + + provers, err := a.proverRegistry.GetActiveProvers(a.filter) + if err != nil { + logger.Debug( + "could not obtain active provers", + zap.Error(err), + ) + return ErrInvalidResponse + } + + ids := [][]byte{} + for _, p := range provers { + ids = append(ids, p.Address) + } + + if _, err := a.frameProver.VerifyFrameHeader( + response.Proposal.State.Header, + a.blsConstructor, + ids, + ); err != nil { + logger.Debug( + "received invalid frame from peer", + zap.Error(err), + ) + return ErrInvalidResponse + } + + a.proposalProcessor.AddProposal(response.Proposal) + frameNumber = frameNumber + 1 + } +} diff --git a/node/consensus/sync/sync_provider.go b/node/consensus/sync/sync_provider.go new file mode 100644 index 0000000..c2da4b1 --- /dev/null +++ b/node/consensus/sync/sync_provider.go @@ -0,0 +1,622 @@ +package sync + +import ( + "bytes" + "context" + "encoding/hex" + "fmt" + "math/rand" + "slices" + "time" + + pcrypto "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + mn "github.com/multiformats/go-multiaddr/net" + "github.com/pkg/errors" + "go.uber.org/zap" + "google.golang.org/grpc" + "source.quilibrium.com/quilibrium/monorepo/config" + "source.quilibrium.com/quilibrium/monorepo/consensus" + "source.quilibrium.com/quilibrium/monorepo/consensus/models" + "source.quilibrium.com/quilibrium/monorepo/lifecycle" + "source.quilibrium.com/quilibrium/monorepo/node/p2p" + "source.quilibrium.com/quilibrium/monorepo/protobufs" + 
"source.quilibrium.com/quilibrium/monorepo/types/channel" + tconsensus "source.quilibrium.com/quilibrium/monorepo/types/consensus" + "source.quilibrium.com/quilibrium/monorepo/types/hypergraph" + tp2p "source.quilibrium.com/quilibrium/monorepo/types/p2p" + "source.quilibrium.com/quilibrium/monorepo/types/tries" +) + +const defaultStateQueueCapacity = 1 + +type syncRequest struct { + frameNumber uint64 + peerId []byte + identity []byte +} + +type UniqueFrame interface { + models.Unique + GetFrameNumber() uint64 +} + +type ProposalProcessor[ProposalT any] interface { + AddProposal(proposal ProposalT) +} + +// SyncProvider implements consensus.SyncProvider +type SyncProvider[StateT UniqueFrame, ProposalT any] struct { + logger *zap.Logger + queuedStates chan syncRequest + forks consensus.Forks[StateT] + proverRegistry tconsensus.ProverRegistry + signerRegistry tconsensus.SignerRegistry + peerInfoManager tp2p.PeerInfoManager + proposalSynchronizer SyncClient[StateT, ProposalT] + hypergraph hypergraph.Hypergraph + config *config.Config + + filter []byte + proverAddress []byte +} + +var _ consensus.SyncProvider[*protobufs.GlobalFrame] = (*SyncProvider[*protobufs.GlobalFrame, *protobufs.GlobalProposal])(nil) + +func NewSyncProvider[StateT UniqueFrame, ProposalT any]( + logger *zap.Logger, + forks consensus.Forks[StateT], + proverRegistry tconsensus.ProverRegistry, + signerRegistry tconsensus.SignerRegistry, + peerInfoManager tp2p.PeerInfoManager, + proposalSynchronizer SyncClient[StateT, ProposalT], + hypergraph hypergraph.Hypergraph, + config *config.Config, + filter []byte, + proverAddress []byte, +) *SyncProvider[StateT, ProposalT] { + return &SyncProvider[StateT, ProposalT]{ + logger: logger, + filter: filter, + forks: forks, + proverRegistry: proverRegistry, + signerRegistry: signerRegistry, + peerInfoManager: peerInfoManager, + proposalSynchronizer: proposalSynchronizer, + hypergraph: hypergraph, + proverAddress: proverAddress, + config: config, + queuedStates: make(chan syncRequest, defaultStateQueueCapacity), + } +} + +func (p *SyncProvider[StateT, ProposalT]) Start( + ctx lifecycle.SignalerContext, + ready lifecycle.ReadyFunc, +) { + ready() + for { + select { + case <-ctx.Done(): + return + case request := <-p.queuedStates: + finalized := p.forks.FinalizedState() + if request.frameNumber <= + (*p.forks.FinalizedState().State).GetFrameNumber() { + continue + } + p.logger.Info( + "synchronizing with peer", + zap.String("peer", peer.ID(request.peerId).String()), + zap.Uint64("finalized_rank", finalized.Rank), + zap.Uint64("peer_frame", request.frameNumber), + ) + p.processState( + ctx, + request.frameNumber, + request.peerId, + request.identity, + ) + case <-time.After(10 * time.Minute): + peerId, err := p.getRandomProverPeerId() + if err != nil { + p.logger.Debug("could not get random prover peer id", zap.Error(err)) + continue + } + + select { + case p.queuedStates <- syncRequest{ + frameNumber: (*p.forks.FinalizedState().State).GetFrameNumber() + 1, + peerId: []byte(peerId), + }: + default: + } + } + } +} + +func (p *SyncProvider[StateT, ProposalT]) processState( + ctx context.Context, + frameNumber uint64, + peerID []byte, + identity []byte, +) { + err := p.syncWithPeer( + ctx, + frameNumber, + peerID, + identity, + ) + if err != nil { + p.logger.Error("could not sync with peer", zap.Error(err)) + } +} + +func (p *SyncProvider[StateT, ProposalT]) Synchronize( + ctx context.Context, + existing *StateT, +) (<-chan *StateT, <-chan error) { + dataCh := make(chan *StateT, 1) + errCh := 
make(chan error, 1) + + go func() { + defer func() { + if r := recover(); r != nil { + if errCh != nil { + errCh <- errors.New(fmt.Sprintf("fatal error encountered: %+v", r)) + } + } + }() + defer close(dataCh) + defer close(errCh) + + head := p.forks.FinalizedState() + hasFrame := head != nil + + if !hasFrame { + errCh <- errors.New("no frame") + return + } + + err := p.syncWithMesh(ctx) + if err != nil { + dataCh <- existing + errCh <- err + return + } + + if hasFrame { + dataCh <- head.State + } + + syncStatusCheck.WithLabelValues("synced").Inc() + errCh <- nil + }() + + return dataCh, errCh +} + +func (p *SyncProvider[StateT, ProposalT]) syncWithMesh( + ctx context.Context, +) error { + p.logger.Info("synchronizing with peers") + + head := p.forks.FinalizedState() + + peers, err := p.proverRegistry.GetActiveProvers(p.filter) + if len(peers) <= 1 || err != nil { + return nil + } + + for _, candidate := range peers { + if bytes.Equal(candidate.Address, p.proverAddress) { + continue + } + + registry, err := p.signerRegistry.GetKeyRegistryByProver( + candidate.Address, + ) + if err != nil { + continue + } + + if registry.IdentityKey == nil || registry.IdentityKey.KeyValue == nil { + continue + } + + pub, err := pcrypto.UnmarshalEd448PublicKey(registry.IdentityKey.KeyValue) + if err != nil { + p.logger.Warn("error unmarshaling identity key", zap.Error(err)) + continue + } + + peerID, err := peer.IDFromPublicKey(pub) + if err != nil { + p.logger.Warn("error deriving peer id", zap.Error(err)) + continue + } + + err = p.syncWithPeer( + ctx, + (*head.State).GetFrameNumber(), + []byte(peerID), + nil, + ) + if err != nil { + p.logger.Debug("error syncing frame", zap.Error(err)) + } + } + + head = p.forks.FinalizedState() + + p.logger.Info( + "returning leader frame", + zap.Uint64("frame_number", (*head.State).GetFrameNumber()), + zap.Duration( + "frame_age", + time.Since(time.UnixMilli(int64((*head.State).GetTimestamp()))), + ), + ) + + return nil +} + +func (p *SyncProvider[StateT, ProposalT]) syncWithPeer( + ctx context.Context, + frameNumber uint64, + peerId []byte, + expectedIdentity []byte, +) error { + p.logger.Info( + "polling peer for new frames", + zap.String("peer_id", peer.ID(peerId).String()), + zap.Uint64("current_frame", frameNumber), + ) + + info := p.peerInfoManager.GetPeerInfo(peerId) + if info == nil { + p.logger.Info( + "no peer info known yet, skipping sync", + zap.String("peer", peer.ID(peerId).String()), + ) + return nil + } + if len(info.Reachability) == 0 { + p.logger.Info( + "no reachability info known yet, skipping sync", + zap.String("peer", peer.ID(peerId).String()), + ) + return nil + } + + for _, reachability := range info.Reachability { + if !bytes.Equal(reachability.Filter, p.filter) { + continue + } + for _, s := range reachability.StreamMultiaddrs { + cc, err := p.getDirectChannel(peerId, s) + if err != nil { + p.logger.Debug( + "could not establish direct channel, trying next multiaddr", + zap.String("peer", peer.ID(peerId).String()), + zap.String("multiaddr", s), + zap.Error(err), + ) + continue + } + defer func() { + if err := cc.Close(); err != nil { + p.logger.Error("error while closing connection", zap.Error(err)) + } + }() + + err = p.proposalSynchronizer.Sync( + ctx, + p.logger.With( + zap.String("peer", peer.ID(peerId).String()), + zap.String("multiaddr", s), + ), + cc, + frameNumber, + expectedIdentity, + ) + if err != nil { + if errors.Is(err, ErrConnectivityFailed) { + continue + } + + return errors.Wrap(err, "sync") + } + } + break + } + + 
p.logger.Debug( + "failed to complete sync for all known multiaddrs", + zap.String("peer", peer.ID(peerId).String()), + ) + return nil +} + +func (p *SyncProvider[StateT, ProposalT]) HyperSync( + ctx context.Context, + prover []byte, + shardKey tries.ShardKey, +) { + registry, err := p.signerRegistry.GetKeyRegistryByProver(prover) + if err != nil || registry == nil || registry.IdentityKey == nil { + p.logger.Debug( + "failed to find key registry info for prover", + zap.String("prover", hex.EncodeToString(prover)), + ) + return + } + + peerKey := registry.IdentityKey + pubKey, err := pcrypto.UnmarshalEd448PublicKey(peerKey.KeyValue) + if err != nil { + p.logger.Error( + "could not unmarshal key info", + zap.String("prover", hex.EncodeToString(prover)), + zap.String("prover", hex.EncodeToString(peerKey.KeyValue)), + ) + return + } + + peerId, err := peer.IDFromPublicKey(pubKey) + info := p.peerInfoManager.GetPeerInfo([]byte(peerId)) + if info == nil { + p.logger.Info( + "no peer info known yet, skipping hypersync", + zap.String("peer", peer.ID(peerId).String()), + ) + return + } + if len(info.Reachability) == 0 { + p.logger.Info( + "no reachability info known yet, skipping sync", + zap.String("peer", peer.ID(peerId).String()), + ) + return + } + + for _, reachability := range info.Reachability { + if !bytes.Equal(reachability.Filter, p.filter) { + continue + } + for _, s := range reachability.StreamMultiaddrs { + if err == nil { + ch, err := p.getDirectChannel( + []byte(peerId), + s, + ) + if err != nil { + p.logger.Debug( + "could not establish direct channel, trying next multiaddr", + zap.String("peer", peer.ID(peerId).String()), + zap.String("multiaddr", s), + zap.Error(err), + ) + continue + } + + defer ch.Close() + client := protobufs.NewHypergraphComparisonServiceClient(ch) + str, err := client.HyperStream(ctx) + if err != nil { + p.logger.Error("error from sync", zap.Error(err)) + } else { + p.hyperSyncVertexAdds(str, shardKey) + p.hyperSyncVertexRemoves(str, shardKey) + p.hyperSyncHyperedgeAdds(str, shardKey) + p.hyperSyncHyperedgeRemoves(str, shardKey) + } + + } + } + break + } +} + +func (p *SyncProvider[StateT, ProposalT]) hyperSyncVertexAdds( + str protobufs.HypergraphComparisonService_HyperStreamClient, + shardKey tries.ShardKey, +) { + err := p.hypergraph.Sync( + str, + shardKey, + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + ) + if err != nil { + p.logger.Error("error from sync", zap.Error(err)) + } + str.CloseSend() +} + +func (p *SyncProvider[StateT, ProposalT]) hyperSyncVertexRemoves( + str protobufs.HypergraphComparisonService_HyperStreamClient, + shardKey tries.ShardKey, +) { + err := p.hypergraph.Sync( + str, + shardKey, + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES, + ) + if err != nil { + p.logger.Error("error from sync", zap.Error(err)) + } + str.CloseSend() +} + +func (p *SyncProvider[StateT, ProposalT]) hyperSyncHyperedgeAdds( + str protobufs.HypergraphComparisonService_HyperStreamClient, + shardKey tries.ShardKey, +) { + err := p.hypergraph.Sync( + str, + shardKey, + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS, + ) + if err != nil { + p.logger.Error("error from sync", zap.Error(err)) + } + str.CloseSend() +} + +func (p *SyncProvider[StateT, ProposalT]) hyperSyncHyperedgeRemoves( + str protobufs.HypergraphComparisonService_HyperStreamClient, + shardKey tries.ShardKey, +) { + err := p.hypergraph.Sync( + str, + shardKey, + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES, + ) + if err != 
nil { + p.logger.Error("error from sync", zap.Error(err)) + } + str.CloseSend() +} + +func (p *SyncProvider[StateT, ProposalT]) AddState( + sourcePeerID []byte, + frameNumber uint64, + expectedIdentity []byte, +) { + // Adjust if we're within the threshold + if frameNumber <= + (*p.forks.FinalizedState().State).GetFrameNumber() && + frameNumber != 0 { + frameNumber = frameNumber + 1 + expectedIdentity = nil + } + + // Handle special case: we're at genesis frame on time reel + if frameNumber == 0 { + frameNumber = 1 + expectedIdentity = []byte{} + } + + // Enqueue if we can, otherwise drop it because we'll catch up + select { + case p.queuedStates <- syncRequest{ + frameNumber: frameNumber, + peerId: slices.Clone(sourcePeerID), + identity: slices.Clone(expectedIdentity), + }: + p.logger.Debug( + "enqueued sync request", + zap.String("peer", peer.ID(sourcePeerID).String()), + zap.Uint64("enqueued_frame_number", frameNumber), + ) + default: + p.logger.Debug("no queue capacity, dropping state for sync") + } +} + +func (p *SyncProvider[StateT, ProposalT]) getDirectChannel( + peerId []byte, + multiaddrString string, +) ( + *grpc.ClientConn, + error, +) { + creds, err := p2p.NewPeerAuthenticator( + p.logger, + p.config.P2P, + nil, + nil, + nil, + nil, + [][]byte{peerId}, + map[string]channel.AllowedPeerPolicyType{}, + map[string]channel.AllowedPeerPolicyType{}, + ).CreateClientTLSCredentials(peerId) + if err != nil { + return nil, err + } + + ma, err := multiaddr.StringCast(multiaddrString) + if err != nil { + return nil, err + } + + mga, err := mn.ToNetAddr(ma) + if err != nil { + return nil, err + } + + cc, err := grpc.NewClient( + mga.String(), + grpc.WithTransportCredentials(creds), + ) + return cc, err +} + +func (e *SyncProvider[StateT, ProposalT]) getPeerIDOfProver( + prover []byte, +) (peer.ID, error) { + registry, err := e.signerRegistry.GetKeyRegistryByProver( + prover, + ) + if err != nil { + e.logger.Debug( + "could not get registry for prover", + zap.Error(err), + ) + return "", err + } + + if registry == nil || registry.IdentityKey == nil { + e.logger.Debug("registry for prover not found") + return "", err + } + + pk, err := pcrypto.UnmarshalEd448PublicKey(registry.IdentityKey.KeyValue) + if err != nil { + e.logger.Debug( + "could not parse pub key", + zap.Error(err), + ) + return "", err + } + + id, err := peer.IDFromPublicKey(pk) + if err != nil { + e.logger.Debug( + "could not derive peer id", + zap.Error(err), + ) + return "", err + } + + return id, nil +} + +func (e *SyncProvider[StateT, ProposalT]) getRandomProverPeerId() ( + peer.ID, + error, +) { + provers, err := e.proverRegistry.GetActiveProvers(nil) + if err != nil { + e.logger.Error( + "could not get active provers for sync", + zap.Error(err), + ) + } + if len(provers) == 0 { + return "", err + } + + otherProvers := []*tconsensus.ProverInfo{} + for _, p := range provers { + if bytes.Equal(p.Address, e.proverAddress) { + continue + } + otherProvers = append(otherProvers, p) + } + + index := rand.Intn(len(otherProvers)) + return e.getPeerIDOfProver(otherProvers[index].Address) +} diff --git a/node/datarpc/data_worker_ipc_server.go b/node/datarpc/data_worker_ipc_server.go index ab1d22f..a5c1afa 100644 --- a/node/datarpc/data_worker_ipc_server.go +++ b/node/datarpc/data_worker_ipc_server.go @@ -38,6 +38,7 @@ type DataWorkerIPCServer struct { signerRegistry consensus.SignerRegistry proverRegistry consensus.ProverRegistry peerInfoManager tp2p.PeerInfoManager + pubsub tp2p.PubSub authProvider channel.AuthenticationProvider 
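AddState above enqueues sync requests with a non-blocking send so the message-handling path never stalls on a full queue; surplus requests are dropped and recovered by the periodic re-sync in Start. A minimal sketch of that select/default pattern, with hypothetical types:

package sketch

import "log"

// request is a hypothetical stand-in for the queued sync request.
type request struct {
	frameNumber uint64
	peerID      []byte
}

// enqueue attempts a non-blocking send: if the buffered channel has capacity
// the request is queued, otherwise it is dropped, on the assumption that a
// later request (or the periodic timer) will trigger the catch-up.
func enqueue(queue chan<- request, r request) bool {
	select {
	case queue <- r:
		return true
	default:
		log.Printf("queue full, dropping sync request for frame %d", r.frameNumber)
		return false
	}
}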
appConsensusEngineFactory *app.AppConsensusEngineFactory appConsensusEngine *app.AppConsensusEngine @@ -52,6 +53,7 @@ func NewDataWorkerIPCServer( signerRegistry consensus.SignerRegistry, proverRegistry consensus.ProverRegistry, peerInfoManager tp2p.PeerInfoManager, + pubsub tp2p.PubSub, frameProver crypto.FrameProver, appConsensusEngineFactory *app.AppConsensusEngineFactory, logger *zap.Logger, @@ -89,6 +91,7 @@ func NewDataWorkerIPCServer( logger: logger, coreId: coreId, parentProcessId: parentProcessId, + pubsub: pubsub, signer: signer, appConsensusEngineFactory: appConsensusEngineFactory, signerRegistry: signerRegistry, @@ -108,6 +111,7 @@ func (r *DataWorkerIPCServer) Start() error { func (r *DataWorkerIPCServer) Stop() error { r.logger.Info("stopping server gracefully") + r.pubsub.Close() if r.server != nil { r.server.GracefulStop() } diff --git a/node/main.go b/node/main.go index 2e51d4f..5918d65 100644 --- a/node/main.go +++ b/node/main.go @@ -431,6 +431,10 @@ func main() { nodeConfig.Engine.DataWorkerCount = qruntime.WorkerCount( nodeConfig.Engine.DataWorkerCount, true, true, ) + } else { + nodeConfig.Engine.DataWorkerCount = len( + nodeConfig.Engine.DataWorkerP2PMultiaddrs, + ) } if len(nodeConfig.Engine.DataWorkerP2PMultiaddrs) != diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index d6f282b..385b5c1 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -1606,3 +1606,8 @@ func getNetworkNamespace(network uint8) string { return ANNOUNCE_PREFIX + network_name } + +// Close implements p2p.PubSub. +func (b *BlossomSub) Close() error { + return nil +} diff --git a/node/p2p/peer_authenticator.go b/node/p2p/peer_authenticator.go index 98dced4..87cdf09 100644 --- a/node/p2p/peer_authenticator.go +++ b/node/p2p/peer_authenticator.go @@ -14,6 +14,7 @@ import ( "net" "slices" "strings" + "sync" "time" "github.com/cloudflare/circl/sign/ed448" @@ -61,8 +62,15 @@ type PeerAuthenticator struct { // method: "/package.Service/Method" servicePolicies map[string]channel.AllowedPeerPolicyType methodPolicies map[string]channel.AllowedPeerPolicyType + + cacheMu sync.RWMutex + anyProverCache map[string]time.Time + globalProverCache map[string]time.Time + shardProverCache map[string]time.Time } +const authCacheTTL = 10 * time.Second + func NewPeerAuthenticator( logger *zap.Logger, config *config.P2PConfig, @@ -119,6 +127,10 @@ func NewPeerAuthenticator( selfPeerId, servicePolicies, methodPolicies, + sync.RWMutex{}, + make(map[string]time.Time), + make(map[string]time.Time), + make(map[string]time.Time), } } @@ -531,6 +543,11 @@ func (p *PeerAuthenticator) isSelf(id []byte) bool { } func (p *PeerAuthenticator) isAnyProver(id []byte) bool { + key := string(id) + if p.cacheAllows(key, p.anyProverCache) { + return true + } + if p.proverRegistry == nil { p.logger.Error( "request authentication for any prover failed", @@ -574,10 +591,16 @@ func (p *PeerAuthenticator) isAnyProver(id []byte) bool { return false } + p.markCache(key, p.anyProverCache) return true } func (p *PeerAuthenticator) isGlobalProver(id []byte) bool { + key := string(id) + if p.cacheAllows(key, p.globalProverCache) { + return true + } + if p.proverRegistry == nil { p.logger.Error( "request authenticated for global prover failed", @@ -626,6 +649,7 @@ func (p *PeerAuthenticator) isGlobalProver(id []byte) bool { for _, alloc := range info.Allocations { if len(alloc.ConfirmationFilter) == 0 { + p.markCache(key, p.globalProverCache) return true } } @@ -638,6 +662,11 @@ func (p *PeerAuthenticator) isGlobalProver(id 
[]byte) bool { } func (p *PeerAuthenticator) isShardProver(id []byte) bool { + key := string(id) + if p.cacheAllows(key, p.shardProverCache) { + return true + } + if p.proverRegistry == nil { p.logger.Error( "request authentication for shard prover failed", @@ -686,6 +715,7 @@ func (p *PeerAuthenticator) isShardProver(id []byte) bool { for _, alloc := range info.Allocations { if bytes.Equal(alloc.ConfirmationFilter, p.filter) { + p.markCache(key, p.shardProverCache) return true } } @@ -697,6 +727,40 @@ func (p *PeerAuthenticator) isShardProver(id []byte) bool { return false } +func (p *PeerAuthenticator) cacheAllows( + key string, + cache map[string]time.Time, +) bool { + p.cacheMu.RLock() + expiry, ok := cache[key] + p.cacheMu.RUnlock() + + if !ok { + return false + } + + if time.Now().After(expiry) { + p.cacheMu.Lock() + // verify entry still matches before deleting + if current, exists := cache[key]; exists && current == expiry { + delete(cache, key) + } + p.cacheMu.Unlock() + return false + } + + return true +} + +func (p *PeerAuthenticator) markCache( + key string, + cache map[string]time.Time, +) { + p.cacheMu.Lock() + cache[key] = time.Now().Add(authCacheTTL) + p.cacheMu.Unlock() +} + func (p *PeerAuthenticator) policyFor( fullMethod string, ) channel.AllowedPeerPolicyType { diff --git a/node/p2p/peer_info_manager.go b/node/p2p/peer_info_manager.go index 2da0a11..ba9e1ec 100644 --- a/node/p2p/peer_info_manager.go +++ b/node/p2p/peer_info_manager.go @@ -41,7 +41,6 @@ func (m *InMemoryPeerInfoManager) Start( for { select { case info := <-m.peerInfoCh: - m.peerInfoMx.Lock() reachability := []p2p.Reachability{} for _, r := range info.Reachability { reachability = append(reachability, p2p.Reachability{ @@ -58,6 +57,7 @@ func (m *InMemoryPeerInfoManager) Start( }) } seen := time.Now().UnixMilli() + m.peerInfoMx.Lock() m.peerMap[string(info.PeerId)] = &p2p.PeerInfo{ PeerId: info.PeerId, Bandwidth: 100, diff --git a/node/rpc/proxy_blossomsub.go b/node/rpc/proxy_blossomsub.go index 138f640..0ca28e0 100644 --- a/node/rpc/proxy_blossomsub.go +++ b/node/rpc/proxy_blossomsub.go @@ -26,6 +26,7 @@ type ProxyBlossomSub struct { client *PubSubProxyClient conn *grpc.ClientConn logger *zap.Logger + cancel context.CancelFunc coreId uint } @@ -144,10 +145,13 @@ func NewProxyBlossomSub( return nil, errors.Wrap(err, "new proxy blossom sub") } + ctx, cancel := context.WithCancel(context.Background()) + // Create the proxy client - client := NewPubSubProxyClient(conn, logger) + client := NewPubSubProxyClient(ctx, conn, logger) return &ProxyBlossomSub{ + cancel: cancel, client: client, conn: conn, logger: logger, @@ -157,6 +161,7 @@ func NewProxyBlossomSub( // Close closes the proxy connection func (p *ProxyBlossomSub) Close() error { + p.cancel() if p.conn != nil { return p.conn.Close() } diff --git a/node/rpc/pubsub_proxy.go b/node/rpc/pubsub_proxy.go index 70573b1..48bba53 100644 --- a/node/rpc/pubsub_proxy.go +++ b/node/rpc/pubsub_proxy.go @@ -520,6 +520,7 @@ type PubSubProxyClient struct { client protobufs.PubSubProxyClient conn *grpc.ClientConn logger *zap.Logger + ctx context.Context // Track active subscriptions and validators subscriptions map[string]context.CancelFunc @@ -530,8 +531,14 @@ type PubSubProxyClient struct { mu sync.RWMutex } +// Close implements p2p.PubSub. 
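The cacheAllows/markCache pair above (and the peerAuthCache in services.go) implement a small positive-result cache: a successful authentication is remembered for a short TTL, and an expired entry is deleted only after re-checking it under the write lock so a concurrent refresh is not discarded. A minimal sketch of the same pattern with hypothetical names:

package sketch

import (
	"sync"
	"time"
)

// ttlCache remembers successful checks for a fixed window; it only caches
// positive results, so a stale denial never suppresses a retry.
type ttlCache struct {
	mu      sync.RWMutex
	ttl     time.Duration
	entries map[string]time.Time
}

func newTTLCache(ttl time.Duration) *ttlCache {
	return &ttlCache{ttl: ttl, entries: make(map[string]time.Time)}
}

// Allows reports whether key was marked within the TTL. Expired entries are
// removed, but only if the stored expiry still matches the one read under the
// read lock, so a concurrent Mark that refreshed the entry is preserved.
func (c *ttlCache) Allows(key string) bool {
	c.mu.RLock()
	expiry, ok := c.entries[key]
	c.mu.RUnlock()
	if !ok {
		return false
	}
	if time.Now().After(expiry) {
		c.mu.Lock()
		if current, exists := c.entries[key]; exists && current.Equal(expiry) {
			delete(c.entries, key)
		}
		c.mu.Unlock()
		return false
	}
	return true
}

// Mark records a successful check, extending the allow window by the TTL.
func (c *ttlCache) Mark(key string) {
	c.mu.Lock()
	c.entries[key] = time.Now().Add(c.ttl)
	c.mu.Unlock()
}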
diff --git a/node/p2p/peer_info_manager.go b/node/p2p/peer_info_manager.go
index 2da0a11..ba9e1ec 100644
--- a/node/p2p/peer_info_manager.go
+++ b/node/p2p/peer_info_manager.go
@@ -41,7 +41,6 @@ func (m *InMemoryPeerInfoManager) Start(
     for {
       select {
       case info := <-m.peerInfoCh:
-        m.peerInfoMx.Lock()
         reachability := []p2p.Reachability{}
         for _, r := range info.Reachability {
           reachability = append(reachability, p2p.Reachability{
@@ -58,6 +57,7 @@ func (m *InMemoryPeerInfoManager) Start(
           })
         }
         seen := time.Now().UnixMilli()
+        m.peerInfoMx.Lock()
         m.peerMap[string(info.PeerId)] = &p2p.PeerInfo{
           PeerId: info.PeerId,
           Bandwidth: 100,
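The peer_info_manager change is a small lock-scoping fix: the reachability slice is now built before peerInfoMx is taken, so the mutex only guards the map write. A sketch of the same idea with hypothetical types, not code from this patch:

package main

import (
	"fmt"
	"sync"
)

// peerRecord is a stand-in for the PeerInfo value kept in the manager's map.
type peerRecord struct {
	ID    string
	Addrs []string
}

type registry struct {
	mu    sync.Mutex
	peers map[string]*peerRecord
}

// record converts the incoming update outside the critical section and only
// holds the lock for the map insert, mirroring the reordering in Start().
func (r *registry) record(id string, rawAddrs []string) {
	addrs := make([]string, 0, len(rawAddrs))
	for _, a := range rawAddrs {
		addrs = append(addrs, a) // conversion work happens before locking
	}

	r.mu.Lock()
	r.peers[id] = &peerRecord{ID: id, Addrs: addrs}
	r.mu.Unlock()
}

func main() {
	r := &registry{peers: make(map[string]*peerRecord)}
	r.record("peer-a", []string{"/ip4/127.0.0.1/tcp/8336"})
	fmt.Println(len(r.peers["peer-a"].Addrs))
}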
diff --git a/node/rpc/proxy_blossomsub.go b/node/rpc/proxy_blossomsub.go
index 138f640..0ca28e0 100644
--- a/node/rpc/proxy_blossomsub.go
+++ b/node/rpc/proxy_blossomsub.go
@@ -26,6 +26,7 @@ type ProxyBlossomSub struct {
   client *PubSubProxyClient
   conn *grpc.ClientConn
   logger *zap.Logger
+  cancel context.CancelFunc
   coreId uint
 }
 
@@ -144,10 +145,13 @@ func NewProxyBlossomSub(
     return nil, errors.Wrap(err, "new proxy blossom sub")
   }
 
+  ctx, cancel := context.WithCancel(context.Background())
+
   // Create the proxy client
-  client := NewPubSubProxyClient(conn, logger)
+  client := NewPubSubProxyClient(ctx, conn, logger)
 
   return &ProxyBlossomSub{
+    cancel: cancel,
     client: client,
     conn: conn,
     logger: logger,
@@ -157,6 +161,7 @@
 
 // Close closes the proxy connection
 func (p *ProxyBlossomSub) Close() error {
+  p.cancel()
   if p.conn != nil {
     return p.conn.Close()
   }
diff --git a/node/rpc/pubsub_proxy.go b/node/rpc/pubsub_proxy.go
index 70573b1..48bba53 100644
--- a/node/rpc/pubsub_proxy.go
+++ b/node/rpc/pubsub_proxy.go
@@ -520,6 +520,7 @@ type PubSubProxyClient struct {
   client protobufs.PubSubProxyClient
   conn *grpc.ClientConn
   logger *zap.Logger
+  ctx context.Context
 
   // Track active subscriptions and validators
   subscriptions map[string]context.CancelFunc
@@ -530,8 +531,14 @@ type PubSubProxyClient struct {
   mu sync.RWMutex
 }
 
+// Close implements p2p.PubSub.
+func (c *PubSubProxyClient) Close() error {
+  return nil
+}
+
 // NewPubSubProxyClient creates a new proxy client
 func NewPubSubProxyClient(
+  ctx context.Context,
   conn *grpc.ClientConn,
   logger *zap.Logger,
 ) *PubSubProxyClient {
@@ -539,6 +546,7 @@ func NewPubSubProxyClient(
     client: protobufs.NewPubSubProxyClient(conn),
     conn: conn,
     logger: logger,
+    ctx: ctx,
     subscriptions: make(map[string]context.CancelFunc),
     validators: make(map[string]func(
       peer.ID,
@@ -547,99 +555,139 @@ func NewPubSubProxyClient(
     bitmaskValidators: make(map[string]string),
   }
 
+  // HACK: Kludgy, but the master process spawns the workers almost certainly
+  // in proxy mode (because manually clustering shouldn't use the proxy feature)
+  // so we should give it time to start listening before proceeding. Adding
+  // this buffer gives us a far better chance of not erroring out on the first
+  // attempt.
+  time.Sleep(10 * time.Second)
+
   // Initialize validator stream
-  if err := client.initValidatorStream(); err != nil {
+  if err := client.initValidatorStream(ctx); err != nil {
     logger.Error("failed to initialize validator stream", zap.Error(err))
   }
 
   return client
 }
 
-func (c *PubSubProxyClient) initValidatorStream() error {
-  c.validatorStreamMu.Lock()
-  defer c.validatorStreamMu.Unlock()
+func (c *PubSubProxyClient) initValidatorStream(ctx context.Context) error {
+  backoff := time.Second
 
-  stream, err := c.client.ValidatorStream(context.Background())
-  if err != nil {
-    return err
-  }
-
-  c.validatorStream = stream
-
-  // Start goroutine to handle incoming validation requests
-  go c.handleValidationRequests()
-
-  return nil
-}
-
-func (c *PubSubProxyClient) handleValidationRequests() {
   for {
-    msg, err := c.validatorStream.Recv()
-    if err != nil {
-      c.logger.Error("validator stream recv error", zap.Error(err))
-      // Try to reconnect
-      time.Sleep(1 * time.Second)
-      if err := c.initValidatorStream(); err != nil {
+    select {
+    case <-ctx.Done():
+      return nil
+    default:
+      stream, err := c.client.ValidatorStream(ctx)
+      if err != nil {
         c.logger.Error(
-          "failed to reinitialize validator stream",
+          "validator stream connect failed, retrying",
          zap.Error(err),
+          zap.Duration("retry_in", backoff),
        )
-      }
-      return
-    }
+        select {
+        case <-ctx.Done():
+          return nil
+        case <-time.After(backoff):
+          if backoff < 30*time.Second {
+            backoff *= 2
+            if backoff > 30*time.Second {
+              backoff = 30 * time.Second
+            }
+          }
+        }
 
-    switch m := msg.Message.(type) {
-    case *protobufs.ValidationStreamMessage_ValidationRequest:
-      req := m.ValidationRequest
-
-      // Look up the validator function
-      c.mu.RLock()
-      validator, exists := c.validators[req.ValidatorId]
-      c.mu.RUnlock()
-
-      if !exists {
-        c.logger.Warn("received validation request for unknown validator", zap.String("validator_id", req.ValidatorId))
        continue
      }
 
-      // Convert message and call validator
-      pbMsg := &pb.Message{
-        Data: req.Message.Data,
-        From: req.Message.From,
-        Seqno: req.Message.Seqno,
-        Bitmask: req.Message.Bitmask,
-        Signature: req.Message.Signature,
-        Key: req.Message.Key,
-      }
-
-      result := validator(peer.ID(req.PeerId), pbMsg)
-
-      // Send response
-      var protoResult protobufs.ValidationResponse_ValidationResult
-      switch result {
-      case p2p.ValidationResultAccept:
-        protoResult = protobufs.ValidationResponse_ACCEPT
-      case p2p.ValidationResultReject:
-        protoResult = protobufs.ValidationResponse_REJECT
-      default:
-        protoResult = protobufs.ValidationResponse_IGNORE
-      }
-
-      resp := &protobufs.ValidationStreamMessage{
-        Message: &protobufs.ValidationStreamMessage_ValidationResponse{
-          ValidationResponse: &protobufs.ValidationResponse{
-            ValidatorId: req.ValidatorId,
-            Result: protoResult,
-          },
-        },
-      }
-
-      c.validatorStreamMu.Lock()
-      if err := c.validatorStream.Send(resp); err != nil {
-        c.logger.Error("failed to send validation response", zap.Error(err))
-      }
+      c.validatorStream = stream
      c.validatorStreamMu.Unlock()
+
+      // Start goroutine to handle incoming validation requests
+      go c.handleValidationRequests(ctx)
+
+      return nil
+    }
+  }
+}
+
+func (c *PubSubProxyClient) handleValidationRequests(ctx context.Context) {
+  for {
+    select {
+    case <-ctx.Done():
+      return
+    default:
+      msg, err := c.validatorStream.Recv()
+      if err != nil {
+        c.logger.Error("validator stream recv error", zap.Error(err))
+        c.validatorStreamMu.Lock()
+        c.validatorStream = nil
+        c.validatorStreamMu.Unlock()
+        // Try to reconnect
+        time.Sleep(1 * time.Second)
+        if err := c.initValidatorStream(ctx); err != nil {
+          c.logger.Error(
+            "failed to reinitialize validator stream",
+            zap.Error(err),
+          )
+        }
+        return
+      }
+
+      switch m := msg.Message.(type) {
+      case *protobufs.ValidationStreamMessage_ValidationRequest:
+        req := m.ValidationRequest
+
+        // Look up the validator function
+        c.mu.RLock()
+        validator, exists := c.validators[req.ValidatorId]
+        c.mu.RUnlock()
+
+        if !exists {
+          c.logger.Warn("received validation request for unknown validator",
+            zap.String("validator_id", req.ValidatorId))
+          continue
+        }
+
+        // Convert message and call validator
+        pbMsg := &pb.Message{
+          Data: req.Message.Data,
+          From: req.Message.From,
+          Seqno: req.Message.Seqno,
+          Bitmask: req.Message.Bitmask,
+          Signature: req.Message.Signature,
+          Key: req.Message.Key,
+        }
+
+        result := validator(peer.ID(req.PeerId), pbMsg)
+
+        // Send response
+        var protoResult protobufs.ValidationResponse_ValidationResult
+        switch result {
+        case p2p.ValidationResultAccept:
+          protoResult = protobufs.ValidationResponse_ACCEPT
+        case p2p.ValidationResultReject:
+          protoResult = protobufs.ValidationResponse_REJECT
+        default:
+          protoResult = protobufs.ValidationResponse_IGNORE
+        }
+
+        resp := &protobufs.ValidationStreamMessage{
+          Message: &protobufs.ValidationStreamMessage_ValidationResponse{
+            ValidationResponse: &protobufs.ValidationResponse{
+              ValidatorId: req.ValidatorId,
+              Result: protoResult,
+            },
+          },
+        }
+
+        c.validatorStreamMu.Lock()
+        if err := c.validatorStream.Send(resp); err != nil {
+          c.logger.Error("failed to send validation response", zap.Error(err))
+        }
+        c.validatorStreamMu.Unlock()
+      }
    }
  }
 }
@@ -791,18 +839,19 @@ func (c *PubSubProxyClient) RegisterValidator(
   }
 
   c.validatorStreamMu.Lock()
-  defer c.validatorStreamMu.Unlock()
-
   if c.validatorStream == nil {
+    c.validatorStreamMu.Unlock()
     // Try to initialize stream if not already done
-    if err := c.initValidatorStream(); err != nil {
+    if err := c.initValidatorStream(c.ctx); err != nil {
       c.mu.Lock()
       delete(c.validators, validatorID)
       delete(c.bitmaskValidators, bitmaskKey)
       c.mu.Unlock()
       return err
     }
+    c.validatorStreamMu.Lock()
   }
+  defer c.validatorStreamMu.Unlock()
 
   if err := c.validatorStream.Send(req); err != nil {
     c.mu.Lock()
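The rewritten initValidatorStream replaces the old single-attempt dial with a context-aware retry loop: exponential backoff starting at one second, capped at 30 seconds, and a clean return when the context is cancelled (the constructor also sleeps ten seconds up front, per the HACK comment, to give the master process time to start listening). The shape of that loop, reduced to a standalone sketch in which dial is a placeholder for the real stream setup:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// connectWithBackoff retries dial until it succeeds or ctx is cancelled,
// doubling the wait between attempts up to a 30-second ceiling, mirroring the
// retry structure in the patched initValidatorStream.
func connectWithBackoff(ctx context.Context, dial func(context.Context) error) error {
	backoff := time.Second

	for {
		select {
		case <-ctx.Done():
			return nil
		default:
			if err := dial(ctx); err != nil {
				fmt.Printf("connect failed, retrying in %s: %v\n", backoff, err)
				select {
				case <-ctx.Done():
					return nil
				case <-time.After(backoff):
					if backoff < 30*time.Second {
						backoff *= 2
						if backoff > 30*time.Second {
							backoff = 30 * time.Second
						}
					}
				}
				continue
			}
			return nil
		}
	}
}

func main() {
	attempts := 0
	dial := func(context.Context) error {
		attempts++
		if attempts < 3 {
			return errors.New("listener not ready yet")
		}
		return nil
	}
	_ = connectWithBackoff(context.Background(), dial)
	fmt.Println("connected after", attempts, "attempts")
}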
diff --git a/node/store/pebble.go b/node/store/pebble.go
index 9f9dca4..a67e510 100644
--- a/node/store/pebble.go
+++ b/node/store/pebble.go
@@ -24,6 +24,8 @@ type PebbleDB struct {
 var pebbleMigrations = []func(*pebble.Batch) error{
   migration_2_1_0_4,
   migration_2_1_0_5,
+  migration_2_1_0_8,
+  migration_2_1_0_81,
 }
 
 func NewPebbleDB(
@@ -443,3 +445,15 @@ func migration_2_1_0_5(b *pebble.Batch) error {
   // We just re-run it again
   return migration_2_1_0_4(b)
 }
+
+func migration_2_1_0_8(b *pebble.Batch) error {
+  // these migration entries exist solely to advance migration number so all
+  // nodes are consistent
+  return nil
+}
+
+func migration_2_1_0_81(b *pebble.Batch) error {
+  // these migration entries exist solely to advance migration number so all
+  // nodes are consistent
+  return nil
+}
diff --git a/node/worker/manager.go b/node/worker/manager.go
index 239e777..1ed0bf0 100644
--- a/node/worker/manager.go
+++ b/node/worker/manager.go
@@ -477,7 +477,34 @@ func (w *WorkerManager) GetFilterByWorkerId(coreId uint) ([]byte, error) {
 }
 
 func (w *WorkerManager) RangeWorkers() ([]*typesStore.WorkerInfo, error) {
-  return w.store.RangeWorkers()
+  w.mu.Lock()
+  defer w.mu.Unlock()
+
+  workers, err := w.store.RangeWorkers()
+  if err != nil {
+    return nil, err
+  }
+
+  if len(workers) != int(w.config.Engine.DataWorkerCount) {
+    for i := uint(1); i <= uint(w.config.Engine.DataWorkerCount); i++ {
+      if _, ok := w.serviceClients[i]; ok {
+        continue
+      }
+      if _, err := w.getIPCOfWorker(i); err != nil {
+        w.logger.Error(
+          "could not initialize worker for range",
+          zap.Uint("core_id", i),
+          zap.Error(err),
+        )
+      }
+    }
+    workers, err = w.store.RangeWorkers()
+    if err != nil {
+      return nil, err
+    }
+  }
+
+  return workers, nil
 }
 
 // ProposeAllocations invokes a proposal function set by the parent of the
@@ -512,14 +539,70 @@ func (w *WorkerManager) loadWorkersFromStore() error {
     return errors.Wrap(err, "load workers from store")
   }
 
-  if len(workers) != w.config.Engine.DataWorkerCount {
-    for i := range w.config.Engine.DataWorkerCount {
-      _, err := w.getIPCOfWorker(uint(i + 1))
-      if err != nil {
-        w.logger.Error("could not obtain IPC for worker", zap.Error(err))
+  if len(workers) != int(w.config.Engine.DataWorkerCount) {
+    existingWorkers := make(map[uint]*typesStore.WorkerInfo, len(workers))
+    for _, worker := range workers {
+      existingWorkers[worker.CoreId] = worker
+    }
+
+    // Ensure all configured workers exist
+    for i := uint(1); i <= uint(w.config.Engine.DataWorkerCount); i++ {
+      if _, ok := existingWorkers[i]; ok {
        continue
      }
+      if _, err := w.getIPCOfWorker(i); err != nil {
+        w.logger.Error(
+          "could not obtain IPC for worker",
+          zap.Uint("core_id", i),
+          zap.Error(err),
+        )
+      }
    }
+
+    // Remove workers beyond configured count
+    for _, worker := range workers {
+      if worker.CoreId <= uint(w.config.Engine.DataWorkerCount) {
+        continue
+      }
+
+      txn, err := w.store.NewTransaction(false)
+      if err != nil {
+        w.logger.Error(
+          "could not create txn to delete worker",
+          zap.Uint("core_id", worker.CoreId),
+          zap.Error(err),
+        )
+        continue
+      }
+
+      if err := w.store.DeleteWorker(txn, worker.CoreId); err != nil {
+        _ = txn.Abort()
+        w.logger.Error(
+          "could not delete worker",
+          zap.Uint("core_id", worker.CoreId),
+          zap.Error(err),
+        )
+      }
+      if err := txn.Commit(); err != nil {
+        _ = txn.Abort()
+        w.logger.Error(
+          "could not commit worker delete",
+          zap.Uint("core_id", worker.CoreId),
+          zap.Error(err),
+        )
+      }
+
+      if client, ok := w.serviceClients[worker.CoreId]; ok {
+        _ = client.Close()
+        delete(w.serviceClients, worker.CoreId)
+      }
+      delete(w.filtersByWorker, worker.CoreId)
+      delete(w.allocatedWorkers, worker.CoreId)
+      if len(worker.Filter) > 0 {
+        delete(w.workersByFilter, string(worker.Filter))
+      }
+    }
+
    workers, err = w.store.RangeWorkers()
    if err != nil {
      return errors.Wrap(err, "load workers from store")
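loadWorkersFromStore and RangeWorkers now reconcile the stored worker set against Engine.DataWorkerCount: core IDs missing from the store get an IPC client created, and stored entries above the configured count are deleted along with their cached clients and filters. A compressed sketch of that reconciliation, using an in-memory map with hypothetical names instead of the pebble-backed store:

package main

import "fmt"

// workerInfo is a stand-in for the stored WorkerInfo record.
type workerInfo struct {
	CoreID uint
	Filter []byte
}

// reconcile ensures exactly cores workers remain tracked: it reports the core
// IDs that still need to be initialized and drops entries beyond the
// configured count, mirroring the two passes added to loadWorkersFromStore.
func reconcile(stored map[uint]*workerInfo, cores uint) (toCreate []uint) {
	// Ensure all configured workers exist.
	for i := uint(1); i <= cores; i++ {
		if _, ok := stored[i]; !ok {
			toCreate = append(toCreate, i)
		}
	}

	// Remove workers beyond the configured count.
	for id := range stored {
		if id > cores {
			delete(stored, id)
		}
	}
	return toCreate
}

func main() {
	stored := map[uint]*workerInfo{
		1: {CoreID: 1},
		5: {CoreID: 5}, // beyond a 4-core configuration, will be dropped
	}
	missing := reconcile(stored, 4)
	fmt.Println("create:", missing, "remaining stored:", len(stored))
}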
diff --git a/protobufs/global.go b/protobufs/global.go
index c65831c..2610235 100644
--- a/protobufs/global.go
+++ b/protobufs/global.go
@@ -145,6 +145,10 @@ func (g *GlobalFrame) Source() models.Identity {
   return models.Identity(g.Header.Prover)
 }
 
+func (g *GlobalFrame) GetFrameNumber() uint64 {
+  return g.Header.FrameNumber
+}
+
 func (a *AppShardFrame) Clone() models.Unique {
   return proto.Clone(a).(*AppShardFrame)
 }
@@ -179,6 +183,10 @@ func (a *AppShardFrame) Source() models.Identity {
   return models.Identity(a.Header.Prover)
 }
 
+func (a *AppShardFrame) GetFrameNumber() uint64 {
+  return a.Header.FrameNumber
+}
+
 func (s *AppShardProposal) GetRank() uint64 {
   rank := uint64(0)
   if s.State != nil && s.State.GetRank() > rank {
diff --git a/types/mocks/pubsub.go b/types/mocks/pubsub.go
index e3b9d7f..c5ee5fe 100644
--- a/types/mocks/pubsub.go
+++ b/types/mocks/pubsub.go
@@ -18,6 +18,11 @@ type MockPubSub struct {
   mock.Mock
 }
 
+// Close implements p2p.PubSub.
+func (m *MockPubSub) Close() error {
+  return nil
+}
+
 // GetOwnMultiaddrs implements p2p.PubSub.
 func (m *MockPubSub) GetOwnMultiaddrs() []multiaddr.Multiaddr {
   args := m.Called()
diff --git a/types/p2p/pubsub.go b/types/p2p/pubsub.go
index 50768b4..e17ee70 100644
--- a/types/p2p/pubsub.go
+++ b/types/p2p/pubsub.go
@@ -20,6 +20,7 @@ const (
 )
 
 type PubSub interface {
+  Close() error
   PublishToBitmask(bitmask []byte, data []byte) error
   Publish(address []byte, data []byte) error
   Subscribe(bitmask []byte, handler func(message *pb.Message) error) error