allow local sync, use it for provers with workers

Cassandra Heart, 2025-12-22 11:59:24 -06:00
commit 9b2f593275 (parent 8dbe94e8dc)
GPG Key ID: 371083BFA6C240AA (no known key found for this signature in database)
4 changed files with 112 additions and 7 deletions

View File

@@ -940,6 +940,13 @@ func NewAppConsensusEngine(
         )
     }
+    // Set self peer ID on hypergraph to allow unlimited self-sync sessions
+    if hgWithSelfPeer, ok := engine.hyperSync.(interface {
+        SetSelfPeerID(string)
+    }); ok {
+        hgWithSelfPeer.SetSelfPeerID(ps.GetPeerID().String())
+    }
     return engine, nil
 }
@@ -1050,8 +1057,8 @@ func (e *AppConsensusEngine) computeLocalGlobalProverRoot(
 }
 func (e *AppConsensusEngine) triggerGlobalHypersync(proposer []byte, expectedRoot []byte) {
-    if e.syncProvider == nil || len(proposer) == 0 {
-        e.logger.Debug("no sync provider or proposer for hypersync")
+    if e.syncProvider == nil {
+        e.logger.Debug("no sync provider for hypersync")
         return
     }
     if bytes.Equal(proposer, e.proverAddress) {
@@ -1063,6 +1070,10 @@ func (e *AppConsensusEngine) triggerGlobalHypersync(proposer []byte, expectedRoot []byte) {
         return
     }
+    // Sync from our own master node instead of the proposer to avoid
+    // overburdening the proposer with sync requests from all workers.
+    selfPeerID := e.pubsub.GetPeerID()
     go func() {
         defer e.globalProverSyncInProgress.Store(false)
@@ -1074,7 +1085,7 @@ func (e *AppConsensusEngine) triggerGlobalHypersync(proposer []byte, expectedRoot []byte) {
             L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS,
         }
-        e.syncProvider.HyperSync(ctx, proposer, shardKey, nil, expectedRoot)
+        e.syncProvider.HyperSyncSelf(ctx, selfPeerID, shardKey, nil, expectedRoot)
         if err := e.proverRegistry.Refresh(); err != nil {
             e.logger.Warn(
                 "failed to refresh prover registry after hypersync",

View File

@@ -1028,6 +1028,13 @@ func NewGlobalConsensusEngine(
         )
     }
+    // Set self peer ID on hypergraph to allow unlimited self-sync sessions
+    if hgWithSelfPeer, ok := engine.hyperSync.(interface {
+        SetSelfPeerID(string)
+    }); ok {
+        hgWithSelfPeer.SetSelfPeerID(ps.GetPeerID().String())
+    }
     return engine, nil
 }

View File

@@ -448,6 +448,77 @@ func (p *SyncProvider[StateT, ProposalT]) HyperSync(
     }
 }
+// HyperSyncSelf syncs from our own master node using our peer ID.
+// This is used by workers to sync global prover state from their master
+// instead of burdening the proposer.
+func (p *SyncProvider[StateT, ProposalT]) HyperSyncSelf(
+    ctx context.Context,
+    selfPeerID peer.ID,
+    shardKey tries.ShardKey,
+    filter []byte,
+    expectedRoot []byte,
+) {
+    info := p.peerInfoManager.GetPeerInfo([]byte(selfPeerID))
+    if info == nil {
+        p.logger.Debug(
+            "no peer info for self, skipping self-sync",
+            zap.String("peer", selfPeerID.String()),
+        )
+        return
+    }
+    if len(info.Reachability) == 0 {
+        p.logger.Debug(
+            "no reachability info for self, skipping self-sync",
+            zap.String("peer", selfPeerID.String()),
+        )
+        return
+    }
+    phaseSyncs := []func(
+        protobufs.HypergraphComparisonService_HyperStreamClient,
+        tries.ShardKey,
+        []byte,
+    ){
+        p.hyperSyncVertexAdds,
+        p.hyperSyncVertexRemoves,
+        p.hyperSyncHyperedgeAdds,
+        p.hyperSyncHyperedgeRemoves,
+    }
+    for _, reachability := range info.Reachability {
+        if !bytes.Equal(reachability.Filter, filter) {
+            continue
+        }
+        for _, s := range reachability.StreamMultiaddrs {
+            for _, syncPhase := range phaseSyncs {
+                ch, err := p.getDirectChannel([]byte(selfPeerID), s)
+                if err != nil {
+                    p.logger.Debug(
+                        "could not establish direct channel for self-sync, trying next multiaddr",
+                        zap.String("peer", selfPeerID.String()),
+                        zap.String("multiaddr", s),
+                        zap.Error(err),
+                    )
+                    continue
+                }
+                client := protobufs.NewHypergraphComparisonServiceClient(ch)
+                str, err := client.HyperStream(ctx)
+                if err != nil {
+                    p.logger.Error("error from self-sync", zap.Error(err))
+                    return
+                }
+                syncPhase(str, shardKey, expectedRoot)
+                if cerr := ch.Close(); cerr != nil {
+                    p.logger.Error("error while closing connection", zap.Error(cerr))
+                }
+            }
+        }
+        break
+    }
+}
 func (p *SyncProvider[StateT, ProposalT]) hyperSyncVertexAdds(
     str protobufs.HypergraphComparisonService_HyperStreamClient,
     shardKey tries.ShardKey,
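
Taken together with the engine hunks above, the call path on a worker is short. The following condensed sketch is illustrative only and is not part of the commit; it reuses identifiers from this diff, with ctx, shardKey, and expectedRoot standing in for the values built inside triggerGlobalHypersync.

// Illustrative sketch: wiring the self-sync path.
// At engine construction, exempt our own peer ID from sync session limits
// when the hypergraph sync implementation supports SetSelfPeerID.
if hgWithSelfPeer, ok := engine.hyperSync.(interface{ SetSelfPeerID(string) }); ok {
    hgWithSelfPeer.SetSelfPeerID(ps.GetPeerID().String())
}

// When global prover state must be refreshed, a worker syncs from its own
// master node instead of the proposer.
selfPeerID := e.pubsub.GetPeerID()
e.syncProvider.HyperSyncSelf(ctx, selfPeerID, shardKey, nil, expectedRoot)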

View File

@@ -16,6 +16,7 @@ type SyncController struct {
     syncStatus map[string]*SyncInfo
     maxActiveSessions int32
     activeSessions atomic.Int32
+    selfPeerID string
 }
 func (s *SyncController) TryEstablishSyncSession(peerID string) bool {
@@ -25,10 +26,13 @@ func (s *SyncController) TryEstablishSyncSession(peerID string) bool {
     info := s.getOrCreate(peerID)
-    // Try to increment peer's session count (up to maxSessionsPerPeer)
+    // Allow unlimited sessions from self (our own workers syncing to master)
+    isSelf := s.selfPeerID != "" && peerID == s.selfPeerID
+    // Try to increment peer's session count (up to maxSessionsPerPeer, unless self)
     for {
         current := info.activeSessions.Load()
-        if current >= maxSessionsPerPeer {
+        if !isSelf && current >= maxSessionsPerPeer {
             return false
         }
         if info.activeSessions.CompareAndSwap(current, current+1) {
@@ -36,7 +40,8 @@ func (s *SyncController) TryEstablishSyncSession(peerID string) bool {
         }
     }
-    if !s.incrementActiveSessions() {
+    // Skip global session limit for self-sync
+    if !isSelf && !s.incrementActiveSessions() {
         info.activeSessions.Add(-1)
         return false
     }
@@ -55,6 +60,8 @@ func (s *SyncController) EndSyncSession(peerID string) {
         return
     }
+    isSelf := s.selfPeerID != "" && peerID == s.selfPeerID
     s.statusMu.RLock()
     info := s.syncStatus[peerID]
     s.statusMu.RUnlock()
@@ -66,7 +73,10 @@ func (s *SyncController) EndSyncSession(peerID string) {
             return
         }
         if info.activeSessions.CompareAndSwap(current, current-1) {
-            s.decrementActiveSessions()
+            // Only decrement global counter for non-self sessions
+            if !isSelf {
+                s.decrementActiveSessions()
+            }
             return
         }
     }
@@ -122,6 +132,12 @@ func NewSyncController(maxActiveSessions int) *SyncController {
     }
 }
+// SetSelfPeerID sets the self peer ID for the controller. Sessions from this
+// peer ID are allowed unlimited concurrency (for workers syncing to master).
+func (s *SyncController) SetSelfPeerID(peerID string) {
+    s.selfPeerID = peerID
+}
 func (s *SyncController) incrementActiveSessions() bool {
     if s.maxActiveSessions <= 0 {
         return true
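
For reference, a minimal usage sketch of the self-exemption added to SyncController, assuming a global cap of one session and placeholder peer ID strings; only methods shown in this diff are used, and the snippet is not part of the commit.

// Illustrative sketch of the new session accounting.
sc := NewSyncController(1)         // global cap: one concurrent sync session
sc.SetSelfPeerID("<self-peer-id>") // placeholder for our own peer ID

_ = sc.TryEstablishSyncSession("<other-peer>") // true: takes the only global slot
_ = sc.TryEstablishSyncSession("<other-peer>") // false: per-peer/global caps reached

_ = sc.TryEstablishSyncSession("<self-peer-id>") // true: self bypasses both caps
_ = sc.TryEstablishSyncSession("<self-peer-id>") // true: no global slot consumed

sc.EndSyncSession("<self-peer-id>") // per-peer count drops; global counter untouched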