package time import ( "bytes" "context" "encoding/hex" "fmt" "math/big" "sync" "time" lru "github.com/hashicorp/golang-lru/v2" "github.com/iden3/go-iden3-crypto/poseidon" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "source.quilibrium.com/quilibrium/monorepo/lifecycle" "source.quilibrium.com/quilibrium/monorepo/protobufs" "source.quilibrium.com/quilibrium/monorepo/types/consensus" "source.quilibrium.com/quilibrium/monorepo/types/store" ) const ( // Default cache size for LRU defaultGlobalCacheSize = 10000 // Maximum tree depth to prevent unbounded growth maxGlobalTreeDepth = 10 ) // TimeReelEventType represents different types of events that can occur in a // time reel type TimeReelEventType int const ( TimeReelEventNewHead TimeReelEventType = iota TimeReelEventForkDetected TimeReelEventEquivocationDetected ) // GlobalEvent represents an event in the global time reel type GlobalEvent struct { Type TimeReelEventType Frame *protobufs.GlobalFrame OldHead *protobufs.GlobalFrame // For fork events Message string } func (n *GlobalEvent) ControlEventData() {} // GlobalFrameNode represents a node in the global frame tree type GlobalFrameNode struct { Frame *protobufs.GlobalFrame Parent *GlobalFrameNode Children map[string]*GlobalFrameNode Depth uint64 } // GlobalPendingFrame represents a frame waiting for its parent type GlobalPendingFrame struct { Frame *protobufs.GlobalFrame ParentSelector []byte Timestamp int64 } // GlobalTimeReel implements a time reel for GlobalFrames type GlobalTimeReel struct { logger *zap.Logger proverRegistry consensus.ProverRegistry mu sync.RWMutex // Tree structure root *GlobalFrameNode // Fast lookup by frame ID (output hash) nodes map[string]*GlobalFrameNode // Fast lookup by frame number framesByNumber map[uint64][]*GlobalFrameNode // Current head of canonical chain head *GlobalFrameNode // Pending frames waiting for parents pendingFrames map[string][]*GlobalPendingFrame // Fork choice parameters forkChoiceParams Params // LRU cache for frame lookups cache *lru.Cache[string, *GlobalFrameNode] // Underlying frame store store store.ClockStore // Event channel with guaranteed delivery eventCh chan GlobalEvent eventDone chan struct{} // Signals event processing complete // Equivocator tracking: frame number -> bit positions that equivocated equivocators map[uint64]map[int]bool // Materialize side effects materializeFunc func( txn store.Transaction, frame *protobufs.GlobalFrame, ) error // Revert side effects revertFunc func( txn store.Transaction, frameNumber uint64, requests []*protobufs.MessageBundle, ) error // Control ctx context.Context // Network-specific consensus toggles genesisFrameNumber uint64 // Archive mode: whether to hold historic frame data archiveMode bool } // NewGlobalTimeReel creates a new global time reel func NewGlobalTimeReel( logger *zap.Logger, proverRegistry consensus.ProverRegistry, clockStore store.ClockStore, network uint8, archiveMode bool, ) (*GlobalTimeReel, error) { cache, err := lru.New[string, *GlobalFrameNode]( defaultGlobalCacheSize, ) if err != nil { return nil, errors.Wrap(err, "new global time reel") } genesisFrameNumber := uint64(0) if network == 0 { genesisFrameNumber = 244200 } return &GlobalTimeReel{ logger: logger, proverRegistry: proverRegistry, store: clockStore, nodes: make(map[string]*GlobalFrameNode), framesByNumber: make(map[uint64][]*GlobalFrameNode), pendingFrames: make(map[string][]*GlobalPendingFrame), forkChoiceParams: DefaultForkChoiceParams, cache: cache, eventCh: make(chan GlobalEvent, 1000), eventDone: make(chan struct{}), equivocators: make(map[uint64]map[int]bool), materializeFunc: func( txn store.Transaction, frame *protobufs.GlobalFrame, ) error { return nil }, revertFunc: func( txn store.Transaction, frameNumber uint64, requests []*protobufs.MessageBundle, ) error { return nil }, genesisFrameNumber: genesisFrameNumber, archiveMode: archiveMode, }, nil } // SetMaterializeFunc sets the materialize side effects function func (g *GlobalTimeReel) SetMaterializeFunc( materializeFunc func( txn store.Transaction, frame *protobufs.GlobalFrame, ) error, ) { g.materializeFunc = materializeFunc } // SetRevertFunc sets the revert side effects function func (g *GlobalTimeReel) SetRevertFunc( revertFunc func( txn store.Transaction, frameNumber uint64, requests []*protobufs.MessageBundle, ) error, ) { g.revertFunc = revertFunc } // Start starts the global time reel func (g *GlobalTimeReel) Start( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, ) { g.ctx = ctx g.logger.Info("starting global time reel") // Warm the in-memory tree/cache from store. if err := g.bootstrapFromStore(); err != nil { g.logger.Error("could not bootstrap from store", zap.Error(err)) ctx.Throw(err) return } ready() <-ctx.Done() g.logger.Info("stopping global time reel") close(g.eventCh) close(g.eventDone) } // sendEvent sends an event with guaranteed delivery func (g *GlobalTimeReel) sendEvent(event GlobalEvent) { // prioritize halts select { case <-g.ctx.Done(): return default: } // This blocks until the event is delivered or halted, guaranteeing order select { case <-g.ctx.Done(): return case g.eventCh <- event: g.logger.Debug( "sent event", zap.Int("type", int(event.Type)), zap.Uint64("frame_number", event.Frame.Header.FrameNumber), zap.String("id", g.ComputeFrameID(event.Frame)), ) } } // Insert inserts a global frame header into the tree structure (non-blocking) func (g *GlobalTimeReel) Insert( frame *protobufs.GlobalFrame, ) error { // Start timing timer := prometheus.NewTimer( frameProcessingDuration.WithLabelValues("global"), ) defer timer.ObserveDuration() g.mu.Lock() defer g.mu.Unlock() // Compute frame ID frameID := g.ComputeFrameID(frame) // Check if frame already exists with same ID if _, exists := g.nodes[frameID]; exists { return nil } // Check for equivocation with different frames at same height nodesAtHeight := g.framesByNumber[frame.Header.FrameNumber] for _, node := range nodesAtHeight { if !isEqualGlobalFrame(node.Frame, frame) && hasOverlappingBits(node.Frame, frame) { g.logger.Warn( "equivocation detected for frame", zap.Uint64("frame_number", frame.Header.FrameNumber), ) // Track which bits equivocated if g.equivocators[frame.Header.FrameNumber] == nil { g.equivocators[frame.Header.FrameNumber] = make(map[int]bool) } // Find overlapping bits and mark them as equivocators existingBits := node.Frame.Header.PublicKeySignatureBls48581 newBits := frame.Header.PublicKeySignatureBls48581 if existingBits != nil && newBits != nil { existingBitmask := existingBits.Bitmask newBitmask := newBits.Bitmask maxLen := len(existingBitmask) if len(newBitmask) > maxLen { maxLen = len(newBitmask) } for i := 0; i < maxLen; i++ { var eByte, nByte byte if i < len(existingBitmask) { eByte = existingBitmask[i] } if i < len(newBitmask) { nByte = newBitmask[i] } overlapping := eByte & nByte for bit := 0; bit < 8; bit++ { if overlapping&(1< g.head.Frame.Header.FrameNumber { // Frame is ahead, add to pending g.addPendingFrame(frame, parentSelector) // Insert frame into tree newNode := &GlobalFrameNode{ Frame: frame, Parent: nil, Children: make(map[string]*GlobalFrameNode), Depth: 1, } // Add to data structures g.nodes[frameID] = newNode g.framesByNumber[frame.Header.FrameNumber] = append( g.framesByNumber[frame.Header.FrameNumber], newNode, ) g.cache.Add(frameID, newNode) // Evaluate fork choice if we have competing branches g.evaluateForkChoice(newNode) return nil } } } // Parent not found, add to pending frames g.addPendingFrame(frame, parentSelector) return nil } // Verify parent selector matches expectedSelector := computeGlobalPoseidonHash(parentNode.Frame.Header.Output) if !bytes.Equal(expectedSelector, frame.Header.ParentSelector) { return errors.New("parent selector mismatch") } // Insert frame into tree newNode := &GlobalFrameNode{ Frame: frame, Parent: parentNode, Children: make(map[string]*GlobalFrameNode), Depth: parentNode.Depth + 1, } // Add to data structures g.nodes[frameID] = newNode g.framesByNumber[frame.Header.FrameNumber] = append( g.framesByNumber[frame.Header.FrameNumber], newNode, ) parentNode.Children[frameID] = newNode g.cache.Add(frameID, newNode) // Process any pending frames that can now be connected g.processPendingFrames(frameID, newNode) // Evaluate fork choice if we have competing branches g.evaluateForkChoice(newNode) // Prune old frames if tree is getting too deep g.pruneOldFrames() // Prune old pending frames periodically g.pruneOldPendingFrames() // Record success framesProcessedTotal.WithLabelValues("global", "success").Inc() // Update tree metrics g.updateTreeMetrics() return nil } // insertGenesisFrame handles genesis frame insertion or pseudo-root in // non-archive mode func (g *GlobalTimeReel) insertGenesisFrame( frame *protobufs.GlobalFrame, frameID string, ) error { if g.root != nil && g.archiveMode { // In archive mode, don't replace existing root return errors.New("genesis frame already exists") } if g.root != nil && !g.archiveMode { // In non-archive mode, check if this frame should replace the current // pseudo-root if frame.Header.FrameNumber >= g.root.Frame.Header.FrameNumber { // This frame is not older than current root, don't replace return errors.New("frame is not older than current root") } // This frame is older, it should become the new pseudo-root g.logger.Info( "non-archive mode: replacing pseudo-root with older frame", zap.Uint64("old_root_frame", g.root.Frame.Header.FrameNumber), zap.Uint64("new_root_frame", frame.Header.FrameNumber), ) } g.root = &GlobalFrameNode{ Frame: frame, Parent: nil, Children: make(map[string]*GlobalFrameNode), Depth: 0, } g.nodes[frameID] = g.root g.framesByNumber[frame.Header.FrameNumber] = []*GlobalFrameNode{g.root} g.head = g.root g.cache.Add(frameID, g.root) // Persist canonical genesis. if err := g.persistCanonicalFrames( []*protobufs.GlobalFrame{frame}, ); err != nil { return errors.Wrap(err, "insert genesis frame") } // Send new head event g.sendEvent(GlobalEvent{ Type: TimeReelEventNewHead, Frame: frame, }) // Process any pending frames that can now be connected g.processPendingFrames(frameID, g.root) // Prune old frames if tree is getting too deep g.pruneOldFrames() return nil } // addPendingFrame adds a frame to the pending list func (g *GlobalTimeReel) addPendingFrame( frame *protobufs.GlobalFrame, parentSelector string, ) { pending := &GlobalPendingFrame{ Frame: frame, ParentSelector: frame.Header.ParentSelector, Timestamp: frame.Header.Timestamp, } g.pendingFrames[parentSelector] = append( g.pendingFrames[parentSelector], pending, ) g.logger.Debug( "added pending frame", zap.Uint64("frame_number", frame.Header.FrameNumber), zap.String( "parent_selector", fmt.Sprintf("%x", frame.Header.ParentSelector), ), ) } // processPendingFrames processes frames that were waiting for the given parent func (g *GlobalTimeReel) processPendingFrames( parentFrameID string, parentNode *GlobalFrameNode, ) { g.logger.Debug( "process pending frame", zap.Uint64("frame_number", parentNode.Frame.Header.FrameNumber), zap.String("id", parentFrameID), ) parentSelector := computeGlobalPoseidonHash(parentNode.Frame.Header.Output) parentSelectorStr := string(parentSelector) pendingList := g.pendingFrames[parentSelectorStr] if len(pendingList) == 0 { return } // Remove from pending list delete(g.pendingFrames, parentSelectorStr) // Process each pending frame for _, pending := range pendingList { frameID := g.ComputeFrameID(pending.Frame) if existing, exists := g.nodes[frameID]; exists { // Re-parent previously pre-inserted orphan if existing.Parent == nil { existing.Parent = parentNode existing.Depth = parentNode.Depth + 1 parentNode.Children[frameID] = existing g.framesByNumber[pending.Frame.Header.FrameNumber] = append( g.framesByNumber[pending.Frame.Header.FrameNumber], existing) g.cache.Add(frameID, existing) g.logger.Debug("reparented pending orphan frame", zap.Uint64("frame_number", pending.Frame.Header.FrameNumber), zap.String("id", frameID), zap.String("parent_id", parentFrameID), ) g.processPendingFrames(frameID, existing) g.evaluateForkChoice(existing) } // Skip if already processed continue } // Create and insert node newNode := &GlobalFrameNode{ Frame: pending.Frame, Parent: parentNode, Children: make(map[string]*GlobalFrameNode), Depth: parentNode.Depth + 1, } g.nodes[frameID] = newNode g.framesByNumber[pending.Frame.Header.FrameNumber] = append( g.framesByNumber[pending.Frame.Header.FrameNumber], newNode) parentNode.Children[frameID] = newNode g.cache.Add(frameID, newNode) g.logger.Debug( "processed pending frame", zap.Uint64("frame_number", pending.Frame.Header.FrameNumber), zap.String("id", frameID), zap.String("parent_id", parentFrameID), ) // Recursively process any frames waiting for this one g.processPendingFrames(frameID, newNode) // Evaluate fork choice g.evaluateForkChoice(newNode) } } // findNodeBySelector finds a node whose output hash matches the selector func (g *GlobalTimeReel) findNodeBySelector(selector []byte) *GlobalFrameNode { for _, node := range g.nodes { expectedSelector := computeGlobalPoseidonHash(node.Frame.Header.Output) if bytes.Equal(expectedSelector, selector) { return node } } return nil } // evaluateForkChoice evaluates fork choice and updates head if necessary func (g *GlobalTimeReel) evaluateForkChoice(newNode *GlobalFrameNode) { if g.head == nil || (!g.archiveMode && newNode.Frame.Header.FrameNumber > g.head.Frame.Header.FrameNumber && newNode.Frame.Header.FrameNumber-g.head.Frame.Header.FrameNumber > maxGlobalTreeDepth) { oldHead := g.head g.head = newNode g.sendHeadEvent(newNode, oldHead) return } // Find all competing branches (leaf nodes) leafNodes := g.findLeafNodes() if len(leafNodes) <= 1 { // No competition, check if we should update head if newNode.Depth > g.head.Depth { oldHead := g.head g.head = newNode g.persistCanonicalFrames([]*protobufs.GlobalFrame{newNode.Frame}) g.sendHeadEvent(newNode, oldHead) } return } // Get maximum depth among leaf nodes maxDepth := uint64(0) for _, leaf := range leafNodes { if leaf.Depth > maxDepth { maxDepth = leaf.Depth } } // Only consider leaf nodes at maximum depth for fork choice var competingLeaves []*GlobalFrameNode for _, leaf := range leafNodes { if leaf.Depth == maxDepth { competingLeaves = append(competingLeaves, leaf) } } // If only one leaf at max depth, make it head if len(competingLeaves) == 1 && competingLeaves[0].Frame.Header.FrameNumber == g.head.Frame.Header.FrameNumber { chosenNode := competingLeaves[0] if chosenNode != g.head { oldHead := g.head g.head = chosenNode // Check if this is a reorganization (fork) if oldHead != nil && !g.isAncestorNode(oldHead, chosenNode) { g.logger.Info( "reorganization detected (single leaf)", zap.Uint64("old_head_frame", oldHead.Frame.Header.FrameNumber), zap.Uint64("new_head_frame", chosenNode.Frame.Header.FrameNumber)) // This is a fork - emit fork detected event first event := GlobalEvent{ Type: TimeReelEventForkDetected, Frame: chosenNode.Frame, OldHead: oldHead.Frame, Message: fmt.Sprintf( "fork detected: old head %d (%x), new head %d (%x)", oldHead.Frame.Header.FrameNumber, poseidon.Sum(oldHead.Frame.Header.Output), chosenNode.Frame.Header.FrameNumber, poseidon.Sum(chosenNode.Frame.Header.Output), ), } g.sendEvent(event) // Find the common ancestor commonAncestor, reverts := g.findCommonAncestor(oldHead, chosenNode) if len(reverts) != 0 { g.rewindFrames(reverts) } // Emit new head events for each frame in the new canonical path // from the frame after the common ancestor to the new head if commonAncestor != nil { g.emitReplayEvents(commonAncestor, chosenNode) } } else { // Regular head advance - emit single new head event headChangesTotal.WithLabelValues("global", "advance").Inc() g.persistCanonicalFrames([]*protobufs.GlobalFrame{newNode.Frame}) event := GlobalEvent{ Type: TimeReelEventNewHead, Frame: chosenNode.Frame, } if oldHead != nil { event.OldHead = oldHead.Frame } g.sendEvent(event) } } return } // Convert competing leaf nodes to branches for fork choice branches := make([]Branch, 0, len(competingLeaves)) nodeToIndex := make(map[*GlobalFrameNode]int) for i, leaf := range competingLeaves { branch := g.nodeToBranch(leaf) branches = append(branches, branch) nodeToIndex[leaf] = i g.logger.Debug( "competing leaf", zap.Int("index", i), zap.Uint64("frame_number", leaf.Frame.Header.FrameNumber), ) } // Get current head index - find which competing leaf is the current head prevChoice := 0 for i, leaf := range competingLeaves { if leaf == g.head { prevChoice = i break } } // Perform fork choice forkChoiceTimer := prometheus.NewTimer( forkChoiceDuration.WithLabelValues("global"), ) // Debug: log branch information for i, branch := range branches { if len(branch.Frames) > 0 { lastFrame := branch.Frames[len(branch.Frames)-1] g.logger.Debug( "fork choice branch", zap.Int("index", i), zap.String("prover", hex.EncodeToString(lastFrame.ProverAddress)), zap.String("distance", lastFrame.Distance.String()), zap.Bool("is_current_head", competingLeaves[i] == g.head)) } } chosenIndex := ForkChoice(branches, g.forkChoiceParams, prevChoice) chosenNode := competingLeaves[chosenIndex] forkChoiceTimer.ObserveDuration() g.logger.Debug( "fork choice result", zap.Int("chosen_index", chosenIndex), zap.Int("prev_choice", prevChoice), ) // Record fork choice metrics forkChoiceEvaluations.WithLabelValues("global").Inc() competingBranches.WithLabelValues("global").Observe(float64(len(branches))) // Update head if it changed if chosenNode != g.head { oldHead := g.head g.head = chosenNode // Check if this is a reorganization (fork) if oldHead != nil && !g.isAncestorNode(oldHead, chosenNode) { g.logger.Info( "reorganization detected", zap.Uint64("old_head_frame", oldHead.Frame.Header.FrameNumber), zap.Uint64("new_head_frame", chosenNode.Frame.Header.FrameNumber)) // Record reorganization metrics headChangesTotal.WithLabelValues("global", "reorganization").Inc() // Calculate reorganization depth commonAncestor, reverts := g.findCommonAncestor(oldHead, chosenNode) if commonAncestor != nil { depth := oldHead.Depth - commonAncestor.Depth reorganizationDepth.WithLabelValues("global").Observe(float64(depth)) } // This is a fork - emit fork detected event first event := GlobalEvent{ Type: TimeReelEventForkDetected, Frame: chosenNode.Frame, OldHead: oldHead.Frame, Message: fmt.Sprintf( "fork detected: old head %d (%x), new head %d (%x)", oldHead.Frame.Header.FrameNumber, poseidon.Sum(oldHead.Frame.Header.Output), chosenNode.Frame.Header.FrameNumber, poseidon.Sum(chosenNode.Frame.Header.Output), ), } g.sendEvent(event) // Rewind all previous frames if len(reverts) != 0 { g.rewindFrames(reverts) } // Emit new head events for each frame in the new canonical path // from the frame after the common ancestor to the new head if commonAncestor != nil { g.emitReplayEvents(commonAncestor, chosenNode) } } else { // Regular head advance - emit single new head event event := GlobalEvent{ Type: TimeReelEventNewHead, Frame: chosenNode.Frame, } if oldHead != nil { event.OldHead = oldHead.Frame } g.sendEvent(event) } } } // findLeafNodes returns all leaf nodes (nodes with no children) that are in the // same connected component as the current head. This prevents spurious fork // choice across disconnected forests (e.g., after a non-archive snap-ahead). func (g *GlobalTimeReel) findLeafNodes() []*GlobalFrameNode { var leaves []*GlobalFrameNode if g.head == nil { // Fallback: no head yet, return all leaves for _, node := range g.nodes { if len(node.Children) == 0 { leaves = append(leaves, node) } } return leaves } headRoot := g.findRoot(g.head) for _, node := range g.nodes { if len(node.Children) != 0 { continue } if g.findRoot(node) == headRoot { leaves = append(leaves, node) } } return leaves } // findRoot walks parents to identify the root of a node func (g *GlobalTimeReel) findRoot(n *GlobalFrameNode) *GlobalFrameNode { cur := n for cur != nil && cur.Parent != nil { cur = cur.Parent } return cur } // nodeToBranch converts a node and its lineage to a Branch for fork choice func (g *GlobalTimeReel) nodeToBranch(node *GlobalFrameNode) Branch { // Build lineage from this node backwards, but limit to maxGlobalTreeDepth // frames var lineage []*GlobalFrameNode current := node depth := 0 for current != nil && depth < maxGlobalTreeDepth { lineage = append([]*GlobalFrameNode{current}, lineage...) current = current.Parent depth++ } // Convert to fork choice frames frames := make([]Frame, 0, len(lineage)) for _, n := range lineage { // Use frame output as prover address for uniqueness. Global frames have no // assigned prover as they should mutually agree. proverAddr := n.Frame.Header.Output if len(proverAddr) > 32 { proverAddr = proverAddr[:32] } // Only frames with signatures should have signature-based seniority // Genesis and frames without signatures get default seniority seniority := g.computeFrameSeniority(n.Frame) g.logger.Debug( "frame seniority in branch", zap.Uint64("frame_number", n.Frame.Header.FrameNumber), zap.Uint64("seniority", seniority), ) frame := Frame{ Distance: big.NewInt(0), Seniority: seniority, ProverAddress: proverAddr, } frames = append(frames, frame) } return Branch{Frames: frames} } // computeFrameSeniority computes seniority for fork choice based on number of // signers, excluding equivocators func (g *GlobalTimeReel) computeFrameSeniority( frame *protobufs.GlobalFrame, ) uint64 { // Genesis frame and frames without signatures get minimal seniority if frame.Header.PublicKeySignatureBls48581 == nil { // Use minimal seniority so genesis doesn't dominate fork choice return SCALE / 64 // Same as 1 signer } // Count bits excluding equivocators equivocatorsAtHeight := g.equivocators[frame.Header.FrameNumber] bitCount := 0 bitmask := frame.Header.PublicKeySignatureBls48581.Bitmask for i := 0; i < len(bitmask); i++ { for bit := 0; bit < 8; bit++ { if bitmask[i]&(1<= minDepthToKeep { // This child is being kept, so clear its parent reference child.Parent = nil } } } // Remove nodes from data structures for _, node := range nodesToPrune { frameID := g.ComputeFrameID(node.Frame) // Remove from nodes map delete(g.nodes, frameID) // Remove from framesByNumber frameNum := node.Frame.Header.FrameNumber if nodeList, exists := g.framesByNumber[frameNum]; exists { // Filter out the node to be pruned var filteredList []*GlobalFrameNode for _, n := range nodeList { if n != node { filteredList = append(filteredList, n) } } if len(filteredList) == 0 { delete(g.framesByNumber, frameNum) } else { g.framesByNumber[frameNum] = filteredList } } // Remove from cache g.cache.Remove(frameID) // Disconnect from parent if node.Parent != nil { delete(node.Parent.Children, frameID) } // Update root if necessary if g.root == node { // Find new root (should be one of the remaining nodes at minimum depth) g.root = nil for _, remaining := range g.nodes { if remaining.Depth == minDepthToKeep { if g.root == nil || remaining.Frame.Header.FrameNumber < g.root.Frame.Header.FrameNumber { g.root = remaining } } } // Clear parent reference for new root if g.root != nil { g.root.Parent = nil } } } g.logger.Info( "pruned old frames", zap.Int("pruned_count", len(nodesToPrune)), zap.Uint64("min_depth_kept", minDepthToKeep), zap.Uint64("head_depth", g.head.Depth)) } // pruneOldPendingFrames removes pending frames that are too old func (g *GlobalTimeReel) pruneOldPendingFrames() { // Prune pending frames older than 1.5 hours const maxPendingAge = 1 * 90 * 60 * 1000 // 1.5 hours in milliseconds currentTime := time.Now().UnixMilli() prunedCount := 0 for selector, pendingList := range g.pendingFrames { var filteredList []*GlobalPendingFrame for _, pending := range pendingList { age := currentTime - pending.Timestamp if age <= maxPendingAge { filteredList = append(filteredList, pending) } else { prunedCount++ } } if len(filteredList) == 0 { delete(g.pendingFrames, selector) } else { g.pendingFrames[selector] = filteredList } } if prunedCount > 0 { g.logger.Info( "pruned old pending frames", zap.Int("pruned_count", prunedCount), zap.Int("remaining_selectors", len(g.pendingFrames))) } } // GetTreeInfo returns debugging information about the tree structure func (g *GlobalTimeReel) GetTreeInfo() map[string]interface{} { g.mu.RLock() defer g.mu.RUnlock() info := map[string]interface{}{ "total_nodes": len(g.nodes), "pending_frames": len(g.pendingFrames), "max_depth": 0, "branch_count": 0, "max_tree_depth": maxTreeDepth, "pruning_needed": false, } if g.head != nil { info["head_depth"] = g.head.Depth info["head_frame_number"] = g.head.Frame.Header.FrameNumber info["pruning_needed"] = g.head.Depth >= maxTreeDepth } // Calculate max depth and branch count leafNodes := g.findLeafNodes() info["branch_count"] = len(leafNodes) maxDepth := uint64(0) minDepth := uint64(0) if len(g.nodes) > 0 { minDepth = ^uint64(0) // max value for _, node := range g.nodes { if node.Depth > maxDepth { maxDepth = node.Depth } if node.Depth < minDepth { minDepth = node.Depth } } } info["max_depth"] = maxDepth info["min_depth"] = minDepth info["tree_span"] = maxDepth - minDepth + 1 // Count pending frames totalPending := 0 for _, pendingList := range g.pendingFrames { totalPending += len(pendingList) } info["total_pending"] = totalPending return info } // updateTreeMetrics updates Prometheus metrics for the tree state func (g *GlobalTimeReel) updateTreeMetrics() { // Count total nodes treeNodeCount.WithLabelValues("global").Set(float64(len(g.nodes))) // Set tree depth if g.head != nil { treeDepth.WithLabelValues("global").Set(float64(g.head.Depth)) } // Count pending frames totalPending := 0 for _, pendingList := range g.pendingFrames { totalPending += len(pendingList) } pendingFramesCount.WithLabelValues("global").Set(float64(totalPending)) // Count unique equivocators uniqueEquivocators := make(map[int]bool) for _, equivocatorsAtHeight := range g.equivocators { for bitPos := range equivocatorsAtHeight { uniqueEquivocators[bitPos] = true } } equivocatorsTracked.WithLabelValues("global").Set( float64(len(uniqueEquivocators)), ) } // bootstrapFromStore loads up to the last maxGlobalTreeDepth canonical frames // from durable storage and fills the in-memory structures and cache. func (g *GlobalTimeReel) bootstrapFromStore() error { g.mu.Lock() defer g.mu.Unlock() latest, err := g.store.GetLatestGlobalClockFrame() if err != nil { if errors.Is(err, store.ErrNotFound) { // Fresh DB — nothing to load. return nil } return errors.Wrap(err, "bootstrap from store") } latestNum := latest.Header.FrameNumber var start uint64 if !g.archiveMode && latestNum+1 > maxGlobalTreeDepth { // Non-archive mode: only load last 10 frames start = latestNum - (maxGlobalTreeDepth - 1) } else { // Archive mode or insufficient frames: load all available start = g.genesisFrameNumber } iter, err := g.store.RangeGlobalClockFrames(start, latestNum) if err != nil { return errors.Wrap(err, "bootstrap from store") } defer iter.Close() var prev *GlobalFrameNode for ok := iter.First(); ok && iter.Valid(); ok = iter.Next() { frame, err := iter.Value() if err != nil { return errors.Wrap(err, "bootstrap from store") } frameID := g.ComputeFrameID(frame) // Create node; parent will be prev if selector matches, else try find by // selector. node := &GlobalFrameNode{ Frame: frame, Parent: nil, Children: make(map[string]*GlobalFrameNode), Depth: 0, } // Try to link to known parent (best effort; first loaded frame may lack // parent in memory). if prev != nil { // Fast path: if this frame points to prev's output, link. expSel := computeGlobalPoseidonHash(prev.Frame.Header.Output) if bytes.Equal(frame.Header.ParentSelector, expSel) { node.Parent = prev node.Depth = prev.Depth + 1 prev.Children[frameID] = node } } if node.Parent == nil { // Slow path: search existing nodes by selector (rare on linear history). if p := g.findNodeBySelector(frame.Header.ParentSelector); p != nil { node.Parent = p node.Depth = p.Depth + 1 p.Children[frameID] = node } else if !g.archiveMode && frame.Header.FrameNumber == start { // Non-archive mode: first frame loaded may not have parent in DB. // Treat it as a pseudo-root with depth 0. node.Depth = 0 g.logger.Info( "non-archive mode: treating first loaded frame as pseudo-root", zap.Uint64("frame_number", frame.Header.FrameNumber), ) } } if g.root == nil || (!g.archiveMode && frame.Header.FrameNumber == start) { // Set root to first loaded frame (actual genesis or pseudo-root in non-archive mode) g.root = node } g.nodes[frameID] = node g.framesByNumber[frame.Header.FrameNumber] = append( g.framesByNumber[frame.Header.FrameNumber], node, ) g.cache.Add(frameID, node) prev = node g.head = node } // Warm up the tree metrics g.updateTreeMetrics() if g.head != nil { g.logger.Info( "bootstrapped time reel from store", zap.Uint64("loaded_from_frame", start), zap.Uint64("loaded_to_frame", g.head.Frame.Header.FrameNumber), zap.Int("loaded_count", len(g.nodes)), zap.Bool("archive_mode", g.archiveMode), ) if !g.archiveMode && g.root != nil { g.logger.Info( "non-archive mode: accepting last 10 frames as valid chain", zap.Uint64("pseudo_root_frame", g.root.Frame.Header.FrameNumber), zap.Uint64("head_frame", g.head.Frame.Header.FrameNumber), ) } } return nil } // persistCanonicalFrames writes a contiguous set of canonical frames to the // store in one txn. In non-archive mode, it also prunes old frames from the // store. func (g *GlobalTimeReel) persistCanonicalFrames( frames []*protobufs.GlobalFrame, ) error { if len(frames) == 0 { return nil } txn, err := g.store.NewTransaction(false) if err != nil { return errors.Wrap(err, "persist canonical frames") } for _, f := range frames { if err := g.materializeFunc(txn, f); err != nil { _ = txn.Abort() return errors.Wrap(err, "persist canonical frames") } if err := g.store.PutGlobalClockFrame(f, txn); err != nil { _ = txn.Abort() return errors.Wrap(err, "persist canonical frames") } } if err := txn.Commit(); err != nil { return errors.Wrap(err, "persist canonical frames") } // In non-archive mode, prune frames older than maxGlobalTreeDepth from store if !g.archiveMode && g.head != nil { // Calculate the oldest frame we want to keep if g.head.Frame.Header.FrameNumber > maxGlobalTreeDepth { oldestToKeep := g.head.Frame.Header.FrameNumber - maxGlobalTreeDepth + 1 err := g.store.DeleteGlobalClockFrameRange(0, oldestToKeep) if err != nil { g.logger.Error("unable to delete frame range", zap.Error(err)) } } } return nil }