diff --git a/hypergraph/sync.go b/hypergraph/sync.go
index 6d16d4f..265b3c0 100644
--- a/hypergraph/sync.go
+++ b/hypergraph/sync.go
@@ -28,6 +28,14 @@ func (hg *HypergraphCRDT) HyperStream(
 	ctx, shutdownCancel := hg.contextWithShutdown(requestCtx)
 	defer shutdownCancel()
 
+	// Apply the session-level timeout only if we have a shutdown context
+	// (i.e., in production; tests run without a shutdown context).
+	if hg.shutdownCtx != nil {
+		var timeoutCancel context.CancelFunc
+		ctx, timeoutCancel = context.WithTimeout(ctx, maxSyncSessionDuration)
+		defer timeoutCancel()
+	}
+
 	sessionLogger := hg.logger
 	sessionStart := time.Now()
 	defer func() {
@@ -60,6 +68,13 @@
 		hg.syncController.EndSyncSession(peerKey)
 	}()
 
+	// Start the idle timeout monitor (only when shutdownCtx is available, i.e., in production).
+	if hg.shutdownCtx != nil {
+		idleCtx, idleCancel := context.WithCancel(ctx)
+		defer idleCancel()
+		go hg.monitorSyncSessionIdle(idleCtx, peerKey, sessionLogger, shutdownCancel)
+	}
+
 	syncStart := time.Now()
 	err = hg.syncTreeServer(ctx, stream, sessionLogger)
 	sessionLogger.Info(
@@ -76,6 +91,35 @@
 	return err
 }
 
+// monitorSyncSessionIdle periodically checks whether the sync session has become
+// idle or exceeded its maximum duration, and cancels the session if so.
+func (hg *HypergraphCRDT) monitorSyncSessionIdle(
+	ctx context.Context,
+	peerKey string,
+	logger *zap.Logger,
+	cancelFunc context.CancelFunc,
+) {
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			if hg.syncController.IsSessionStale(peerKey, maxSyncSessionDuration, syncIdleTimeout) {
+				logger.Warn(
+					"sync session idle timeout - forcing termination",
+					zap.String("peer_id", peerKey),
+					zap.Duration("session_duration", hg.syncController.SessionDuration(peerKey)),
+				)
+				cancelFunc()
+				return
+			}
+		}
+	}
+}
+
 // Sync performs the tree diff and synchronization from the client side.
 // The caller (e.g. the client) must initiate the diff from its root.
 // After that, both sides exchange queries, branch info, and leaf updates until
@@ -514,6 +558,11 @@ type streamManager struct {
 	snapshot *snapshotHandle
 }
 
+// updateActivity updates the last activity timestamp for the stream.
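+// It is called from the send and query paths (sendLeafData, queryNext,
+// handleQueryNext, descendIndex) and every 100 entries during rawShardSync.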
+func (s *streamManager) updateActivity() {
+	s.lastSent = time.Now()
+}
+
 type rawVertexSaver interface {
 	SaveVertexTreeRaw(
 		txn tries.TreeBackingStoreTransaction,
@@ -534,6 +583,10 @@ const (
 	leafAckMaxTimeout    = 10 * time.Minute
 	leafAckPerLeafBudget = 20 * time.Millisecond // Generous budget for tree building overhead
 	pruneTxnChunk        = 100
+
+	// Session-level timeouts
+	maxSyncSessionDuration = 15 * time.Minute // Maximum total time for a sync session
+	syncIdleTimeout        = 5 * time.Minute  // Maximum time without activity before the session is killed
 )
 
 func leafAckTimeout(count uint64) time.Duration {
@@ -693,6 +746,10 @@ func (s *streamManager) rawShardSync(
 		}
 		sent++
+		// Update activity periodically to prevent idle timeout
+		if sent%100 == 0 {
+			s.updateActivity()
+		}
 		if sent%1000 == 0 {
 			s.logger.Debug(
 				"SERVER: raw sync progress",
@@ -1078,7 +1135,7 @@ func (s *streamManager) sendLeafData(
 		return errors.Wrap(err, "send leaf data")
 	}
 
-	s.lastSent = time.Now()
+	s.updateActivity()
 	return nil
 }
 
@@ -1545,6 +1602,8 @@ func (s *streamManager) queryNext(
 		return nil, err
 	}
 
+	s.updateActivity()
+
 	select {
 	case <-s.ctx.Done():
 		return nil, errors.Wrap(
@@ -1558,6 +1617,7 @@
 				"handle query",
 			)
 		}
+		s.updateActivity()
 		resp = r
 		return resp, nil
 	case <-time.After(30 * time.Second):
@@ -1935,6 +1995,7 @@ func (s *streamManager) handleQueryNext(
 			return nil, errors.Wrap(err, "handle query next")
 		}
+		s.updateActivity()
 		branch = branchInfo
 		return branch, nil
 	case <-time.After(30 * time.Second):
@@ -1979,6 +2040,8 @@ func (s *streamManager) descendIndex(
 		return nil, nil, errors.Wrap(err, "descend index")
 	}
 
+	s.updateActivity()
+
 	select {
 	case <-s.ctx.Done():
 		return nil, nil, errors.Wrap(
@@ -2004,6 +2067,7 @@
 			)
 		}
 
+		s.updateActivity()
 		local = branchInfo
 		remote = r
 		return local, remote, nil
diff --git a/node/rpc/hypergraph_sync_rpc_server_test.go b/node/rpc/hypergraph_sync_rpc_server_test.go
index 38ccf86..e5cd4b0 100644
--- a/node/rpc/hypergraph_sync_rpc_server_test.go
+++ b/node/rpc/hypergraph_sync_rpc_server_test.go
@@ -679,10 +679,10 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) {
 	}
 
 	start := time.Now()
-	dataTrees := make([]*tries.VectorCommitmentTree, 10000)
+	dataTrees := make([]*tries.VectorCommitmentTree, 1000)
 	eg := errgroup.Group{}
-	eg.SetLimit(10000)
-	for i := 0; i < 10000; i++ {
+	eg.SetLimit(1000)
+	for i := 0; i < 1000; i++ {
 		eg.Go(func() error {
 			dataTrees[i] = buildDataTree(t, inclusionProver)
 			return nil
@@ -922,6 +922,8 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) {
 	wg := sync.WaitGroup{}
 	wg.Add(1)
 	serverRoot := serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
+	// Publish the server's snapshot so clients can sync against this exact state
+	serverHG.PublishSnapshot(serverRoot)
 	for i := 0; i < 1; i++ {
 		go func(idx int) {
 			defer wg.Done()
@@ -941,7 +943,7 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) {
 				stream,
 				shardKey,
 				protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
-				nil,
+				serverRoot,
 			)
 			require.NoError(t, err)
 			require.NoError(t, stream.CloseSend())
diff --git a/types/hypergraph/sync.go b/types/hypergraph/sync.go
index d754a9e..a58a8c8 100644
--- a/types/hypergraph/sync.go
+++ b/types/hypergraph/sync.go
@@ -6,10 +6,14 @@ import (
 	"time"
 )
 
+// maxSessionsPerPeer is the maximum number of concurrent sync sessions
+// allowed from a single peer.
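+// TryEstablishSyncSession rejects attempts beyond this limit.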
+const maxSessionsPerPeer = 10
+
 type SyncController struct {
-	globalSync atomic.Bool
-	statusMu   sync.RWMutex
-	syncStatus map[string]*SyncInfo
+	globalSync        atomic.Bool
+	statusMu          sync.RWMutex
+	syncStatus        map[string]*SyncInfo
 	maxActiveSessions int32
 	activeSessions    atomic.Int32
 }
@@ -20,15 +24,28 @@ func (s *SyncController) TryEstablishSyncSession(peerID string) bool {
 	}
 
 	info := s.getOrCreate(peerID)
-	if info.inProgress.Swap(true) {
-		return false
+
+	// Try to increment the peer's session count (up to maxSessionsPerPeer)
+	for {
+		current := info.activeSessions.Load()
+		if current >= maxSessionsPerPeer {
+			return false
+		}
+		if info.activeSessions.CompareAndSwap(current, current+1) {
+			break
+		}
 	}
 
 	if !s.incrementActiveSessions() {
-		info.inProgress.Store(false)
+		info.activeSessions.Add(-1)
 		return false
 	}
 
+	// Record the session start time for staleness detection
+	now := time.Now().UnixNano()
+	info.lastStartedAt.Store(now)
+	info.lastActivity.Store(now)
+
 	return true
 }
 
@@ -42,8 +59,16 @@ func (s *SyncController) EndSyncSession(peerID string) {
 	info := s.syncStatus[peerID]
 	s.statusMu.RUnlock()
 	if info != nil {
-		if info.inProgress.Swap(false) {
-			s.decrementActiveSessions()
+		// Decrement the peer's session count
+		for {
+			current := info.activeSessions.Load()
+			if current <= 0 {
+				return
+			}
+			if info.activeSessions.CompareAndSwap(current, current-1) {
+				s.decrementActiveSessions()
+				return
+			}
 		}
 	}
 }
@@ -79,9 +104,11 @@ func (s *SyncController) getOrCreate(peerID string) *SyncInfo {
 }
 
 type SyncInfo struct {
-	Unreachable bool
-	LastSynced  time.Time
-	inProgress  atomic.Bool
+	Unreachable    bool
+	LastSynced     time.Time
+	activeSessions atomic.Int32 // Number of active sessions for this peer
+	lastStartedAt  atomic.Int64 // Unix nano timestamp when the most recent session started
+	lastActivity   atomic.Int64 // Unix nano timestamp of the last activity
 }
 
 func NewSyncController(maxActiveSessions int) *SyncController {
@@ -126,3 +153,156 @@ func (s *SyncController) decrementActiveSessions() {
 		}
 	}
 }
+
+// UpdateActivity updates the last activity timestamp for a peer's sync session.
+// It should be called periodically during sync to prevent the idle timeout.
+func (s *SyncController) UpdateActivity(peerID string) {
+	if peerID == "" {
+		return
+	}
+
+	s.statusMu.RLock()
+	info := s.syncStatus[peerID]
+	s.statusMu.RUnlock()
+
+	if info != nil && info.activeSessions.Load() > 0 {
+		info.lastActivity.Store(time.Now().UnixNano())
+	}
+}
+
+// IsSessionStale reports whether a peer's sessions have exceeded the maximum duration or idle timeout.
+// maxDuration is the maximum total duration for a sync session.
+// idleTimeout is the maximum time without activity before sessions are considered stale.
+func (s *SyncController) IsSessionStale(peerID string, maxDuration, idleTimeout time.Duration) bool {
+	if peerID == "" {
+		return false
+	}
+
+	s.statusMu.RLock()
+	info := s.syncStatus[peerID]
+	s.statusMu.RUnlock()
+
+	if info == nil || info.activeSessions.Load() <= 0 {
+		return false
+	}
+
+	now := time.Now().UnixNano()
+	startedAt := info.lastStartedAt.Load()
+	lastActivity := info.lastActivity.Load()
+
+	// Check if the session has exceeded the maximum duration
+	if startedAt > 0 && time.Duration(now-startedAt) > maxDuration {
+		return true
+	}
+
+	// Check if the session has been idle too long
+	if lastActivity > 0 && time.Duration(now-lastActivity) > idleTimeout {
+		return true
+	}
+
+	return false
+}
+
+// ForceEndSession forcibly ends all sync sessions for a peer; it is used to
+// clean up stale sessions.
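+// The global active-session counter is decremented once per ended session.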
+// Returns true if any sessions were ended.
+func (s *SyncController) ForceEndSession(peerID string) bool {
+	if peerID == "" {
+		return false
+	}
+
+	s.statusMu.RLock()
+	info := s.syncStatus[peerID]
+	s.statusMu.RUnlock()
+
+	if info == nil {
+		return false
+	}
+
+	// End all sessions for this peer
+	for {
+		current := info.activeSessions.Load()
+		if current <= 0 {
+			return false
+		}
+		if info.activeSessions.CompareAndSwap(current, 0) {
+			// Decrement the global counter by the number of sessions we ended
+			for i := int32(0); i < current; i++ {
+				s.decrementActiveSessions()
+			}
+			return true
+		}
+	}
+}
+
+// CleanupStaleSessions finds and forcibly ends all stale sync sessions.
+// It returns the list of peer IDs that were cleaned up.
+func (s *SyncController) CleanupStaleSessions(maxDuration, idleTimeout time.Duration) []string {
+	var stale []string
+
+	s.statusMu.RLock()
+	for peerID, info := range s.syncStatus {
+		if info == nil || info.activeSessions.Load() <= 0 {
+			continue
+		}
+
+		now := time.Now().UnixNano()
+		startedAt := info.lastStartedAt.Load()
+		lastActivity := info.lastActivity.Load()
+
+		if startedAt > 0 && time.Duration(now-startedAt) > maxDuration {
+			stale = append(stale, peerID)
+			continue
+		}
+
+		if lastActivity > 0 && time.Duration(now-lastActivity) > idleTimeout {
+			stale = append(stale, peerID)
+		}
+	}
+	s.statusMu.RUnlock()
+
+	for _, peerID := range stale {
+		s.ForceEndSession(peerID)
+	}
+
+	return stale
+}
+
+// SessionDuration returns the time elapsed since the peer's most recent
+// session started, or 0 if there are no active sessions.
+func (s *SyncController) SessionDuration(peerID string) time.Duration {
+	if peerID == "" {
+		return 0
+	}
+
+	s.statusMu.RLock()
+	info := s.syncStatus[peerID]
+	s.statusMu.RUnlock()
+
+	if info == nil || info.activeSessions.Load() <= 0 {
+		return 0
+	}
+
+	startedAt := info.lastStartedAt.Load()
+	if startedAt == 0 {
+		return 0
+	}
+
+	return time.Duration(time.Now().UnixNano() - startedAt)
+}
+
+// ActiveSessionCount returns the number of active sync sessions for a peer.
+func (s *SyncController) ActiveSessionCount(peerID string) int32 {
+	if peerID == "" {
+		return 0
+	}
+
+	s.statusMu.RLock()
+	info := s.syncStatus[peerID]
+	s.statusMu.RUnlock()
+
+	if info == nil {
+		return 0
+	}
+
+	return info.activeSessions.Load()
+}
diff --git a/types/tries/lazy_proof_tree.go b/types/tries/lazy_proof_tree.go
index d7fc85a..b2de5a8 100644
--- a/types/tries/lazy_proof_tree.go
+++ b/types/tries/lazy_proof_tree.go
@@ -2045,25 +2045,11 @@ func (t *LazyVectorCommitmentTree) Delete(
 			mergedPrefix = append(mergedPrefix, lastChildIndex)
 			mergedPrefix = append(mergedPrefix, childBranch.Prefix...)
-			// Delete the child node from its old location before updating
-			err := t.Store.DeleteNode(
-				txn,
-				t.SetType,
-				t.PhaseType,
-				t.ShardKey,
-				generateKeyFromPath(childBranch.FullPrefix),
-				childBranch.FullPrefix,
-			)
-			if err != nil {
-				log.Panic("failed to delete old child path", zap.Error(err))
-			}
-
 			childBranch.Prefix = mergedPrefix
-			childBranch.FullPrefix = n.FullPrefix // Update to parent's position
 			childBranch.Commitment = nil
 
 			// Delete this node from storage
-			err = t.Store.DeleteNode(
+			err := t.Store.DeleteNode(
 				txn,
 				t.SetType,
 				t.PhaseType,
 				t.ShardKey,
@@ -2075,14 +2061,14 @@
 				log.Panic("failed to delete path", zap.Error(err))
 			}
 
-			// Insert the merged child at the parent's path
+			// Insert the merged child at this path
 			err = t.Store.InsertNode(
 				txn,
 				t.SetType,
 				t.PhaseType,
 				t.ShardKey,
-				generateKeyFromPath(childBranch.FullPrefix),
-				childBranch.FullPrefix,
+				generateKeyFromPath(n.FullPrefix),
+				n.FullPrefix,
 				childBranch,
 			)
 			if err != nil {
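
Taken together, the SyncController changes define a session lifecycle: TryEstablishSyncSession reserves one of up to maxSessionsPerPeer slots, UpdateActivity keeps the session from being flagged by IsSessionStale, and EndSyncSession (or, for abandoned sessions, CleanupStaleSessions) releases the slot. Below is a minimal Go sketch of a caller wiring this together; the module import path, the janitor's 30-second interval, and the stub work loop are illustrative assumptions, not part of this diff.

package main

import (
	"context"
	"errors"
	"log"
	"time"

	// Assumption: adjust to this repo's actual module path.
	hypergraph "example.com/types/hypergraph"
)

// serveSync reserves a per-peer session slot, reports activity while work
// flows, and always releases the slot when done.
func serveSync(sc *hypergraph.SyncController, peerID string) error {
	if !sc.TryEstablishSyncSession(peerID) {
		return errors.New("session limit reached for peer")
	}
	defer sc.EndSyncSession(peerID)

	for i := 0; i < 10; i++ { // stub for the real sync loop
		time.Sleep(100 * time.Millisecond) // stub for one unit of sync work
		sc.UpdateActivity(peerID)          // keeps IsSessionStale from firing
	}
	return nil
}

// janitor reclaims sessions whose owners exited without calling
// EndSyncSession, complementing the per-session monitor in hypergraph/sync.go.
func janitor(ctx context.Context, sc *hypergraph.SyncController) {
	ticker := time.NewTicker(30 * time.Second) // assumed interval
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// 15m/5m mirror maxSyncSessionDuration and syncIdleTimeout above.
			for _, peerID := range sc.CleanupStaleSessions(15*time.Minute, 5*time.Minute) {
				log.Printf("force-ended stale sync sessions for peer %s", peerID)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sc := hypergraph.NewSyncController(64) // global cap on concurrent sessions
	go janitor(ctx, sc)

	if err := serveSync(sc, "peer-a"); err != nil {
		log.Fatal(err)
	}
}

Note that EndSyncSession releases at most one slot per call, so each successful TryEstablishSyncSession must be paired with exactly one EndSyncSession (the deferred call above); ForceEndSession is the only path that drains all of a peer's slots at once.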