From 9b2f59327561b778af47149b09a2a84f07aa4348 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Mon, 22 Dec 2025 11:59:24 -0600 Subject: [PATCH] allow local sync, use it for provers with workers --- node/consensus/app/app_consensus_engine.go | 17 ++++- .../global/global_consensus_engine.go | 7 ++ node/consensus/sync/sync_provider.go | 71 +++++++++++++++++++ types/hypergraph/sync.go | 24 +++++-- 4 files changed, 112 insertions(+), 7 deletions(-) diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index 493357b..dd05b3e 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -940,6 +940,13 @@ func NewAppConsensusEngine( ) } + // Set self peer ID on hypergraph to allow unlimited self-sync sessions + if hgWithSelfPeer, ok := engine.hyperSync.(interface { + SetSelfPeerID(string) + }); ok { + hgWithSelfPeer.SetSelfPeerID(ps.GetPeerID().String()) + } + return engine, nil } @@ -1050,8 +1057,8 @@ func (e *AppConsensusEngine) computeLocalGlobalProverRoot( } func (e *AppConsensusEngine) triggerGlobalHypersync(proposer []byte, expectedRoot []byte) { - if e.syncProvider == nil || len(proposer) == 0 { - e.logger.Debug("no sync provider or proposer for hypersync") + if e.syncProvider == nil { + e.logger.Debug("no sync provider for hypersync") return } if bytes.Equal(proposer, e.proverAddress) { @@ -1063,6 +1070,10 @@ func (e *AppConsensusEngine) triggerGlobalHypersync(proposer []byte, expectedRoo return } + // Sync from our own master node instead of the proposer to avoid + // overburdening the proposer with sync requests from all workers. + selfPeerID := e.pubsub.GetPeerID() + go func() { defer e.globalProverSyncInProgress.Store(false) @@ -1074,7 +1085,7 @@ func (e *AppConsensusEngine) triggerGlobalHypersync(proposer []byte, expectedRoo L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS, } - e.syncProvider.HyperSync(ctx, proposer, shardKey, nil, expectedRoot) + e.syncProvider.HyperSyncSelf(ctx, selfPeerID, shardKey, nil, expectedRoot) if err := e.proverRegistry.Refresh(); err != nil { e.logger.Warn( "failed to refresh prover registry after hypersync", diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index 93fa8a4..ace72b6 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -1028,6 +1028,13 @@ func NewGlobalConsensusEngine( ) } + // Set self peer ID on hypergraph to allow unlimited self-sync sessions + if hgWithSelfPeer, ok := engine.hyperSync.(interface { + SetSelfPeerID(string) + }); ok { + hgWithSelfPeer.SetSelfPeerID(ps.GetPeerID().String()) + } + return engine, nil } diff --git a/node/consensus/sync/sync_provider.go b/node/consensus/sync/sync_provider.go index a12fe26..a90885f 100644 --- a/node/consensus/sync/sync_provider.go +++ b/node/consensus/sync/sync_provider.go @@ -448,6 +448,77 @@ func (p *SyncProvider[StateT, ProposalT]) HyperSync( } } +// HyperSyncSelf syncs from our own master node using our peer ID. +// This is used by workers to sync global prover state from their master +// instead of burdening the proposer. 
+func (p *SyncProvider[StateT, ProposalT]) HyperSyncSelf(
+	ctx context.Context,
+	selfPeerID peer.ID,
+	shardKey tries.ShardKey,
+	filter []byte,
+	expectedRoot []byte,
+) {
+	info := p.peerInfoManager.GetPeerInfo([]byte(selfPeerID))
+	if info == nil {
+		p.logger.Debug(
+			"no peer info for self, skipping self-sync",
+			zap.String("peer", selfPeerID.String()),
+		)
+		return
+	}
+	if len(info.Reachability) == 0 {
+		p.logger.Debug(
+			"no reachability info for self, skipping self-sync",
+			zap.String("peer", selfPeerID.String()),
+		)
+		return
+	}
+
+	phaseSyncs := []func(
+		protobufs.HypergraphComparisonService_HyperStreamClient,
+		tries.ShardKey,
+		[]byte,
+	){
+		p.hyperSyncVertexAdds,
+		p.hyperSyncVertexRemoves,
+		p.hyperSyncHyperedgeAdds,
+		p.hyperSyncHyperedgeRemoves,
+	}
+
+	for _, reachability := range info.Reachability {
+		if !bytes.Equal(reachability.Filter, filter) {
+			continue
+		}
+		for _, s := range reachability.StreamMultiaddrs {
+			for _, syncPhase := range phaseSyncs {
+				ch, err := p.getDirectChannel([]byte(selfPeerID), s)
+				if err != nil {
+					p.logger.Debug(
+						"could not establish direct channel for self-sync, trying next multiaddr",
+						zap.String("peer", selfPeerID.String()),
+						zap.String("multiaddr", s),
+						zap.Error(err),
+					)
+					break
+				}
+
+				client := protobufs.NewHypergraphComparisonServiceClient(ch)
+				str, err := client.HyperStream(ctx)
+				if err != nil {
+					p.logger.Error("error from self-sync", zap.Error(err))
+					_ = ch.Close()
+					return
+				}
+				syncPhase(str, shardKey, expectedRoot)
+				if cerr := ch.Close(); cerr != nil {
+					p.logger.Error("error while closing connection", zap.Error(cerr))
+				}
+			}
+		}
+		break
+	}
+}
+
 func (p *SyncProvider[StateT, ProposalT]) hyperSyncVertexAdds(
 	str protobufs.HypergraphComparisonService_HyperStreamClient,
 	shardKey tries.ShardKey,
diff --git a/types/hypergraph/sync.go b/types/hypergraph/sync.go
index a58a8c8..eb39b9f 100644
--- a/types/hypergraph/sync.go
+++ b/types/hypergraph/sync.go
@@ -16,6 +16,7 @@ type SyncController struct {
 	syncStatus        map[string]*SyncInfo
 	maxActiveSessions int32
 	activeSessions    atomic.Int32
+	selfPeerID        string
 }
 
 func (s *SyncController) TryEstablishSyncSession(peerID string) bool {
@@ -25,10 +26,13 @@ func (s *SyncController) TryEstablishSyncSession(peerID string) bool {
 
 	info := s.getOrCreate(peerID)
 
-	// Try to increment peer's session count (up to maxSessionsPerPeer)
+	// Allow unlimited sessions from self (our own workers syncing to master)
+	isSelf := s.selfPeerID != "" && peerID == s.selfPeerID
+
+	// Try to increment peer's session count (up to maxSessionsPerPeer, unless self)
 	for {
 		current := info.activeSessions.Load()
-		if current >= maxSessionsPerPeer {
+		if !isSelf && current >= maxSessionsPerPeer {
 			return false
 		}
 		if info.activeSessions.CompareAndSwap(current, current+1) {
@@ -36,7 +40,8 @@ func (s *SyncController) TryEstablishSyncSession(peerID string) bool {
 		}
 	}
 
-	if !s.incrementActiveSessions() {
+	// Skip global session limit for self-sync
+	if !isSelf && !s.incrementActiveSessions() {
 		info.activeSessions.Add(-1)
 		return false
 	}
@@ -55,6 +60,8 @@ func (s *SyncController) EndSyncSession(peerID string) {
 		return
 	}
 
+	isSelf := s.selfPeerID != "" && peerID == s.selfPeerID
+
 	s.statusMu.RLock()
 	info := s.syncStatus[peerID]
 	s.statusMu.RUnlock()
@@ -66,7 +73,10 @@ func (s *SyncController) EndSyncSession(peerID string) {
 		return
 	}
 	if info.activeSessions.CompareAndSwap(current, current-1) {
-		s.decrementActiveSessions()
+		// Only decrement global counter for non-self sessions
+		if !isSelf {
+			s.decrementActiveSessions()
+		}
 		return
 	}
 }
@@ -122,6 +132,12 @@ func NewSyncController(maxActiveSessions int) *SyncController {
 	}
 }
 
+// SetSelfPeerID sets the self peer ID; sessions from that peer bypass the
+// session caps (workers syncing to master). Call during construction only.
+func (s *SyncController) SetSelfPeerID(peerID string) {
+	s.selfPeerID = peerID
+}
+
 func (s *SyncController) incrementActiveSessions() bool {
 	if s.maxActiveSessions <= 0 {
 		return true
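
Reviewer note (not part of the patch): a minimal sketch of the SyncController
semantics this change intends, using only the API shown above. The peer ID
strings are placeholders, and it assumes the package constant
maxSessionsPerPeer is at least 1:

	c := NewSyncController(1) // global cap: one concurrent session
	c.SetSelfPeerID("self")   // as both engines do at construction time

	ok := c.TryEstablishSyncSession("peerA") // true: takes the global slot
	ok = c.TryEstablishSyncSession("peerB")  // false: global cap reached

	// Self-sync sessions bypass both the per-peer and the global caps, so a
	// master's own workers are never starved out by foreign peers.
	ok = c.TryEstablishSyncSession("self") // true
	ok = c.TryEstablishSyncSession("self") // true
	_ = ok

	// EndSyncSession skips the global decrement for self sessions, matching
	// the increment that was skipped when they were established.
	c.EndSyncSession("self")
	c.EndSyncSession("peerA") // frees the global slot for other peers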