small tweaks to sync

Cassandra Heart 2025-12-22 09:05:56 -06:00
parent 6c6480e84a
commit 8dbe94e8dc
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
4 changed files with 266 additions and 34 deletions

View File

@@ -28,6 +28,14 @@ func (hg *HypergraphCRDT) HyperStream(
ctx, shutdownCancel := hg.contextWithShutdown(requestCtx)
defer shutdownCancel()
// Apply session-level timeout only if we have a shutdown context
// (i.e., in production; tests that don't set a shutdown context skip it)
if hg.shutdownCtx != nil {
var timeoutCancel context.CancelFunc
ctx, timeoutCancel = context.WithTimeout(ctx, maxSyncSessionDuration)
defer timeoutCancel()
}
sessionLogger := hg.logger
sessionStart := time.Now()
defer func() {
@@ -60,6 +68,13 @@ func (hg *HypergraphCRDT) HyperStream(
hg.syncController.EndSyncSession(peerKey)
}()
// Start idle timeout monitor (only if shutdownCtx is available, i.e., in production)
if hg.shutdownCtx != nil {
idleCtx, idleCancel := context.WithCancel(ctx)
defer idleCancel()
go hg.monitorSyncSessionIdle(idleCtx, peerKey, sessionLogger, shutdownCancel)
}
syncStart := time.Now()
err = hg.syncTreeServer(ctx, stream, sessionLogger)
sessionLogger.Info(
@@ -76,6 +91,35 @@ func (hg *HypergraphCRDT) HyperStream(
return err
}
// monitorSyncSessionIdle periodically checks if the sync session has become idle
// and cancels the context if so.
func (hg *HypergraphCRDT) monitorSyncSessionIdle(
ctx context.Context,
peerKey string,
logger *zap.Logger,
cancelFunc context.CancelFunc,
) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if hg.syncController.IsSessionStale(peerKey, maxSyncSessionDuration, syncIdleTimeout) {
logger.Warn(
"sync session idle timeout - forcing termination",
zap.String("peer_id", peerKey),
zap.Duration("session_duration", hg.syncController.SessionDuration(peerKey)),
)
cancelFunc()
return
}
}
}
}
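
The monitor above is a ticker-plus-context loop: poll a staleness check every 30 seconds, cancel the session the first time it fires, and exit as soon as the session context is done. A minimal, self-contained sketch of the same pattern, with an illustrative isStale predicate and shortened durations (none of these names or intervals are from this commit):

package main

import (
    "context"
    "fmt"
    "time"
)

// watchIdle mirrors the shape of monitorSyncSessionIdle: tick, check, cancel, return.
func watchIdle(ctx context.Context, cancel context.CancelFunc, interval time.Duration, isStale func() bool) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if isStale() {
                cancel()
                return
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    start := time.Now()
    go watchIdle(ctx, cancel, 100*time.Millisecond, func() bool {
        // Pretend the session has gone idle once 300ms have passed.
        return time.Since(start) > 300*time.Millisecond
    })
    <-ctx.Done()
    fmt.Println("session cancelled after", time.Since(start).Round(time.Millisecond))
}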
// Sync performs the tree diff and synchronization from the client side.
// The caller (e.g. the client) must initiate the diff from its root.
// After that, both sides exchange queries, branch info, and leaf updates until
@@ -514,6 +558,11 @@ type streamManager struct {
snapshot *snapshotHandle
}
// updateActivity updates the last activity timestamp for the stream.
func (s *streamManager) updateActivity() {
s.lastSent = time.Now()
}
type rawVertexSaver interface {
SaveVertexTreeRaw(
txn tries.TreeBackingStoreTransaction,
@@ -534,6 +583,10 @@ const (
leafAckMaxTimeout = 10 * time.Minute
leafAckPerLeafBudget = 20 * time.Millisecond // Generous budget for tree building overhead
pruneTxnChunk = 100
// Session-level timeouts
maxSyncSessionDuration = 15 * time.Minute // Maximum total time for a sync session
syncIdleTimeout = 5 * time.Minute // Maximum time without activity before session is killed
)
func leafAckTimeout(count uint64) time.Duration {
@@ -693,6 +746,10 @@ func (s *streamManager) rawShardSync(
}
sent++
// Update activity periodically to prevent idle timeout
if sent%100 == 0 {
s.updateActivity()
}
if sent%1000 == 0 {
s.logger.Debug(
"SERVER: raw sync progress",
@@ -1078,7 +1135,7 @@ func (s *streamManager) sendLeafData(
return errors.Wrap(err, "send leaf data")
}
s.lastSent = time.Now()
s.updateActivity()
return nil
}
@@ -1545,6 +1602,8 @@ func (s *streamManager) queryNext(
return nil, err
}
s.updateActivity()
select {
case <-s.ctx.Done():
return nil, errors.Wrap(
@@ -1558,6 +1617,7 @@ func (s *streamManager) queryNext(
"handle query",
)
}
s.updateActivity()
resp = r
return resp, nil
case <-time.After(30 * time.Second):
@@ -1935,6 +1995,7 @@ func (s *streamManager) handleQueryNext(
return nil, errors.Wrap(err, "handle query next")
}
s.updateActivity()
branch = branchInfo
return branch, nil
case <-time.After(30 * time.Second):
@@ -1979,6 +2040,8 @@ func (s *streamManager) descendIndex(
return nil, nil, errors.Wrap(err, "descend index")
}
s.updateActivity()
select {
case <-s.ctx.Done():
return nil, nil, errors.Wrap(
@@ -2004,6 +2067,7 @@ func (s *streamManager) descendIndex(
)
}
s.updateActivity()
local = branchInfo
remote = r
return local, remote, nil
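
Every data-path method in the stream manager now calls updateActivity, but as written it only refreshes the local lastSent timestamp, while the server-side idle monitor consults the SyncController. A sketch of how the two could be bridged, assuming the stream manager carried a controller reference and the peer key (both fields are hypothetical and not shown in this commit):

// Hypothetical wiring: the syncController and peerKey fields are assumed
// for illustration; the commit itself only updates lastSent here.
func (s *streamManager) updateActivity() {
    s.lastSent = time.Now()
    if s.syncController != nil && s.peerKey != "" {
        // Refreshing the controller keeps IsSessionStale from reporting an
        // idle timeout while leaves and branch info are still flowing.
        s.syncController.UpdateActivity(s.peerKey)
    }
}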

View File

@@ -679,10 +679,10 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) {
}
start := time.Now()
dataTrees := make([]*tries.VectorCommitmentTree, 10000)
dataTrees := make([]*tries.VectorCommitmentTree, 1000)
eg := errgroup.Group{}
eg.SetLimit(10000)
for i := 0; i < 10000; i++ {
eg.SetLimit(1000)
for i := 0; i < 1000; i++ {
eg.Go(func() error {
dataTrees[i] = buildDataTree(t, inclusionProver)
return nil
@@ -922,6 +922,8 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
serverRoot := serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
// Publish the server's snapshot so clients can sync against this exact state
serverHG.PublishSnapshot(serverRoot)
for i := 0; i < 1; i++ {
go func(idx int) {
defer wg.Done()
@@ -941,7 +943,7 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) {
stream,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
nil,
serverRoot,
)
require.NoError(t, err)
require.NoError(t, stream.CloseSend())

View File

@@ -6,10 +6,14 @@ import (
"time"
)
// maxSessionsPerPeer is the maximum number of concurrent sync sessions
// allowed from a single peer.
const maxSessionsPerPeer = 10
type SyncController struct {
globalSync atomic.Bool
statusMu sync.RWMutex
syncStatus map[string]*SyncInfo
globalSync atomic.Bool
statusMu sync.RWMutex
syncStatus map[string]*SyncInfo
maxActiveSessions int32
activeSessions atomic.Int32
}
@@ -20,15 +24,28 @@ func (s *SyncController) TryEstablishSyncSession(peerID string) bool {
}
info := s.getOrCreate(peerID)
if info.inProgress.Swap(true) {
return false
// Try to increment peer's session count (up to maxSessionsPerPeer)
for {
current := info.activeSessions.Load()
if current >= maxSessionsPerPeer {
return false
}
if info.activeSessions.CompareAndSwap(current, current+1) {
break
}
}
if !s.incrementActiveSessions() {
info.inProgress.Store(false)
info.activeSessions.Add(-1)
return false
}
// Record session start time for staleness detection
now := time.Now().UnixNano()
info.lastStartedAt.Store(now)
info.lastActivity.Store(now)
return true
}
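
The per-peer limit is enforced with a compare-and-swap loop so the counter can never overshoot maxSessionsPerPeer, even when many streams from the same peer race to start at once. A standalone sketch of that bounded-increment idiom (tryAcquire and the limit of 10 are illustrative names and values):

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// tryAcquire increments n only if the result stays at or below limit.
func tryAcquire(n *atomic.Int32, limit int32) bool {
    for {
        current := n.Load()
        if current >= limit {
            return false
        }
        if n.CompareAndSwap(current, current+1) {
            return true
        }
        // Another goroutine won the race; re-read and retry.
    }
}

func main() {
    var sessions atomic.Int32
    var granted atomic.Int32
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if tryAcquire(&sessions, 10) {
                granted.Add(1)
            }
        }()
    }
    wg.Wait()
    fmt.Println("granted:", granted.Load()) // never more than 10
}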
@@ -42,8 +59,16 @@ func (s *SyncController) EndSyncSession(peerID string) {
info := s.syncStatus[peerID]
s.statusMu.RUnlock()
if info != nil {
if info.inProgress.Swap(false) {
s.decrementActiveSessions()
// Decrement peer's session count
for {
current := info.activeSessions.Load()
if current <= 0 {
return
}
if info.activeSessions.CompareAndSwap(current, current-1) {
s.decrementActiveSessions()
return
}
}
}
}
@@ -79,9 +104,11 @@ func (s *SyncController) getOrCreate(peerID string) *SyncInfo {
}
type SyncInfo struct {
Unreachable bool
LastSynced time.Time
inProgress atomic.Bool
Unreachable bool
LastSynced time.Time
activeSessions atomic.Int32 // Number of active sessions for this peer
lastStartedAt atomic.Int64 // Unix nano timestamp when most recent session started
lastActivity atomic.Int64 // Unix nano timestamp of last activity
}
func NewSyncController(maxActiveSessions int) *SyncController {
@@ -126,3 +153,156 @@ func (s *SyncController) decrementActiveSessions() {
}
}
}
// UpdateActivity updates the last activity timestamp for a peer's sync session.
// This should be called periodically during sync to prevent idle timeout.
func (s *SyncController) UpdateActivity(peerID string) {
if peerID == "" {
return
}
s.statusMu.RLock()
info := s.syncStatus[peerID]
s.statusMu.RUnlock()
if info != nil && info.activeSessions.Load() > 0 {
info.lastActivity.Store(time.Now().UnixNano())
}
}
// IsSessionStale checks if a peer's sessions have exceeded the maximum duration or idle timeout.
// maxDuration is the maximum total duration for a sync session.
// idleTimeout is the maximum time without activity before sessions are considered stale.
func (s *SyncController) IsSessionStale(peerID string, maxDuration, idleTimeout time.Duration) bool {
if peerID == "" {
return false
}
s.statusMu.RLock()
info := s.syncStatus[peerID]
s.statusMu.RUnlock()
if info == nil || info.activeSessions.Load() <= 0 {
return false
}
now := time.Now().UnixNano()
startedAt := info.lastStartedAt.Load()
lastActivity := info.lastActivity.Load()
// Check if session has exceeded maximum duration
if startedAt > 0 && time.Duration(now-startedAt) > maxDuration {
return true
}
// Check if session has been idle too long
if lastActivity > 0 && time.Duration(now-lastActivity) > idleTimeout {
return true
}
return false
}
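
Combined with the constants introduced earlier (maxSyncSessionDuration = 15 minutes, syncIdleTimeout = 5 minutes), a session is reaped either when its total runtime passes 15 minutes or when no activity has been recorded for 5 minutes, whichever comes first. A test-style sketch in the same package, assuming "fmt" and "time" are imported in a _test.go file and using a hypothetical peer ID:

func ExampleSyncController_staleness() {
    sc := NewSyncController(100)
    if !sc.TryEstablishSyncSession("peer-a") {
        return
    }
    defer sc.EndSyncSession("peer-a")

    // A freshly started session is stale on neither count.
    fmt.Println(sc.IsSessionStale("peer-a", 15*time.Minute, 5*time.Minute))

    // Without further UpdateActivity calls the idle check would trip after
    // 5 minutes; with regular activity the session is still cut off once
    // 15 minutes have elapsed since lastStartedAt.
    // Output: false
}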
// ForceEndSession forcibly ends all sync sessions for a peer, used for cleaning up stale sessions.
// Returns true if any sessions were ended.
func (s *SyncController) ForceEndSession(peerID string) bool {
if peerID == "" {
return false
}
s.statusMu.RLock()
info := s.syncStatus[peerID]
s.statusMu.RUnlock()
if info == nil {
return false
}
// End all sessions for this peer
for {
current := info.activeSessions.Load()
if current <= 0 {
return false
}
if info.activeSessions.CompareAndSwap(current, 0) {
// Decrement global counter by the number of sessions we ended
for i := int32(0); i < current; i++ {
s.decrementActiveSessions()
}
return true
}
}
}
// CleanupStaleSessions finds and forcibly ends all stale sync sessions.
// Returns the list of peer IDs that were cleaned up.
func (s *SyncController) CleanupStaleSessions(maxDuration, idleTimeout time.Duration) []string {
var stale []string
s.statusMu.RLock()
for peerID, info := range s.syncStatus {
if info == nil || info.activeSessions.Load() <= 0 {
continue
}
now := time.Now().UnixNano()
startedAt := info.lastStartedAt.Load()
lastActivity := info.lastActivity.Load()
if startedAt > 0 && time.Duration(now-startedAt) > maxDuration {
stale = append(stale, peerID)
continue
}
if lastActivity > 0 && time.Duration(now-lastActivity) > idleTimeout {
stale = append(stale, peerID)
}
}
s.statusMu.RUnlock()
for _, peerID := range stale {
s.ForceEndSession(peerID)
}
return stale
}
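
CleanupStaleSessions only sweeps when something calls it; this commit does not show a caller. A sketch of a background janitor that could drive it, reusing the 15-minute and 5-minute limits (the function itself and its one-minute sweep interval are assumptions, not part of this commit):

// Hypothetical janitor goroutine; the sweep interval is illustrative.
func runStaleSessionJanitor(ctx context.Context, sc *SyncController, logger *zap.Logger) {
    ticker := time.NewTicker(time.Minute)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if cleaned := sc.CleanupStaleSessions(15*time.Minute, 5*time.Minute); len(cleaned) > 0 {
                logger.Warn("cleaned up stale sync sessions", zap.Strings("peer_ids", cleaned))
            }
        }
    }
}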
// SessionDuration returns the time elapsed since the most recent session started.
// Returns 0 if there are no active sessions.
func (s *SyncController) SessionDuration(peerID string) time.Duration {
if peerID == "" {
return 0
}
s.statusMu.RLock()
info := s.syncStatus[peerID]
s.statusMu.RUnlock()
if info == nil || info.activeSessions.Load() <= 0 {
return 0
}
startedAt := info.lastStartedAt.Load()
if startedAt == 0 {
return 0
}
return time.Duration(time.Now().UnixNano() - startedAt)
}
// ActiveSessionCount returns the number of active sync sessions for a peer.
func (s *SyncController) ActiveSessionCount(peerID string) int32 {
if peerID == "" {
return 0
}
s.statusMu.RLock()
info := s.syncStatus[peerID]
s.statusMu.RUnlock()
if info == nil {
return 0
}
return info.activeSessions.Load()
}
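
Taken together, a caller-side session now has a fixed lifecycle against the controller: establish (bounded per peer and globally), refresh activity while work is in flight, then end. A condensed sketch of that lifecycle (syncOnce, doWork, and the error message are placeholders, not part of this commit):

// Placeholder lifecycle sketch; syncOnce and doWork are illustrative.
func syncOnce(sc *SyncController, peerID string, doWork func() error) error {
    if !sc.TryEstablishSyncSession(peerID) {
        return errors.New("peer at session limit or node at global capacity")
    }
    defer sc.EndSyncSession(peerID)

    // Long-running work should refresh activity so IsSessionStale (and the
    // idle monitor built on top of it) does not reap a busy session.
    sc.UpdateActivity(peerID)
    return doWork()
}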

View File

@@ -2045,25 +2045,11 @@ func (t *LazyVectorCommitmentTree) Delete(
mergedPrefix = append(mergedPrefix, lastChildIndex)
mergedPrefix = append(mergedPrefix, childBranch.Prefix...)
// Delete the child node from its old location before updating
err := t.Store.DeleteNode(
txn,
t.SetType,
t.PhaseType,
t.ShardKey,
generateKeyFromPath(childBranch.FullPrefix),
childBranch.FullPrefix,
)
if err != nil {
log.Panic("failed to delete old child path", zap.Error(err))
}
childBranch.Prefix = mergedPrefix
childBranch.FullPrefix = n.FullPrefix // Update to parent's position
childBranch.Commitment = nil
// Delete this node from storage
err = t.Store.DeleteNode(
err := t.Store.DeleteNode(
txn,
t.SetType,
t.PhaseType,
@@ -2075,14 +2061,14 @@ func (t *LazyVectorCommitmentTree) Delete(
log.Panic("failed to delete path", zap.Error(err))
}
// Insert the merged child at the parent's path
// Insert the merged child at this path
err = t.Store.InsertNode(
txn,
t.SetType,
t.PhaseType,
t.ShardKey,
generateKeyFromPath(childBranch.FullPrefix),
childBranch.FullPrefix,
generateKeyFromPath(n.FullPrefix),
n.FullPrefix,
childBranch,
)
if err != nil {