mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 10:27:26 +08:00
1710 lines
44 KiB
Go
1710 lines
44 KiB
Go
package time
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"math/big"
|
|
"sync"
|
|
"time"
|
|
|
|
lru "github.com/hashicorp/golang-lru/v2"
|
|
"github.com/iden3/go-iden3-crypto/poseidon"
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"go.uber.org/zap"
|
|
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
|
|
"source.quilibrium.com/quilibrium/monorepo/protobufs"
|
|
"source.quilibrium.com/quilibrium/monorepo/types/consensus"
|
|
"source.quilibrium.com/quilibrium/monorepo/types/store"
|
|
)
|
|
|
|
const (
|
|
// Default cache size for LRU
|
|
defaultGlobalCacheSize = 10000
|
|
// Maximum tree depth to prevent unbounded growth
|
|
maxGlobalTreeDepth = 10
|
|
)
|
|
|
|
// TimeReelEventType represents different types of events that can occur in a
|
|
// time reel
|
|
type TimeReelEventType int
|
|
|
|
const (
|
|
TimeReelEventNewHead TimeReelEventType = iota
|
|
TimeReelEventForkDetected
|
|
TimeReelEventEquivocationDetected
|
|
)
|
|
|
|
// GlobalEvent represents an event in the global time reel
|
|
type GlobalEvent struct {
|
|
Type TimeReelEventType
|
|
Frame *protobufs.GlobalFrame
|
|
OldHead *protobufs.GlobalFrame // For fork events
|
|
Message string
|
|
}
|
|
|
|
func (n *GlobalEvent) ControlEventData() {}
|
|
|
|
// GlobalFrameNode represents a node in the global frame tree
|
|
type GlobalFrameNode struct {
|
|
Frame *protobufs.GlobalFrame
|
|
Parent *GlobalFrameNode
|
|
Children map[string]*GlobalFrameNode
|
|
Depth uint64
|
|
}
|
|
|
|
// GlobalPendingFrame represents a frame waiting for its parent
|
|
type GlobalPendingFrame struct {
|
|
Frame *protobufs.GlobalFrame
|
|
ParentSelector []byte
|
|
Timestamp int64
|
|
}
|
|
|
|
// GlobalTimeReel implements a time reel for GlobalFrames
|
|
type GlobalTimeReel struct {
|
|
logger *zap.Logger
|
|
proverRegistry consensus.ProverRegistry
|
|
mu sync.RWMutex
|
|
|
|
// Tree structure
|
|
root *GlobalFrameNode
|
|
// Fast lookup by frame ID (output hash)
|
|
nodes map[string]*GlobalFrameNode
|
|
// Fast lookup by frame number
|
|
framesByNumber map[uint64][]*GlobalFrameNode
|
|
// Current head of canonical chain
|
|
head *GlobalFrameNode
|
|
|
|
// Pending frames waiting for parents
|
|
pendingFrames map[string][]*GlobalPendingFrame
|
|
|
|
// Fork choice parameters
|
|
forkChoiceParams Params
|
|
|
|
// LRU cache for frame lookups
|
|
cache *lru.Cache[string, *GlobalFrameNode]
|
|
|
|
// Underlying frame store
|
|
store store.ClockStore
|
|
|
|
// Event channel with guaranteed delivery
|
|
eventCh chan GlobalEvent
|
|
eventDone chan struct{} // Signals event processing complete
|
|
|
|
// Equivocator tracking: frame number -> bit positions that equivocated
|
|
equivocators map[uint64]map[int]bool
|
|
|
|
// Materialize side effects
|
|
materializeFunc func(
|
|
txn store.Transaction,
|
|
frame *protobufs.GlobalFrame,
|
|
) error
|
|
|
|
// Revert side effects
|
|
revertFunc func(
|
|
txn store.Transaction,
|
|
frameNumber uint64,
|
|
requests []*protobufs.MessageBundle,
|
|
) error
|
|
|
|
// Control
|
|
ctx context.Context
|
|
|
|
// Network-specific consensus toggles
|
|
genesisFrameNumber uint64
|
|
|
|
// Archive mode: whether to hold historic frame data
|
|
archiveMode bool
|
|
}
|
|
|
|
// NewGlobalTimeReel creates a new global time reel
|
|
func NewGlobalTimeReel(
|
|
logger *zap.Logger,
|
|
proverRegistry consensus.ProverRegistry,
|
|
clockStore store.ClockStore,
|
|
network uint8,
|
|
archiveMode bool,
|
|
) (*GlobalTimeReel, error) {
|
|
cache, err := lru.New[string, *GlobalFrameNode](
|
|
defaultGlobalCacheSize,
|
|
)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "new global time reel")
|
|
}
|
|
|
|
genesisFrameNumber := uint64(0)
|
|
|
|
if network == 0 {
|
|
genesisFrameNumber = 244200
|
|
}
|
|
|
|
return &GlobalTimeReel{
|
|
logger: logger,
|
|
proverRegistry: proverRegistry,
|
|
store: clockStore,
|
|
nodes: make(map[string]*GlobalFrameNode),
|
|
framesByNumber: make(map[uint64][]*GlobalFrameNode),
|
|
pendingFrames: make(map[string][]*GlobalPendingFrame),
|
|
forkChoiceParams: DefaultForkChoiceParams,
|
|
cache: cache,
|
|
eventCh: make(chan GlobalEvent, 1000),
|
|
eventDone: make(chan struct{}),
|
|
equivocators: make(map[uint64]map[int]bool),
|
|
materializeFunc: func(
|
|
txn store.Transaction,
|
|
frame *protobufs.GlobalFrame,
|
|
) error {
|
|
return nil
|
|
},
|
|
revertFunc: func(
|
|
txn store.Transaction,
|
|
frameNumber uint64,
|
|
requests []*protobufs.MessageBundle,
|
|
) error {
|
|
return nil
|
|
},
|
|
genesisFrameNumber: genesisFrameNumber,
|
|
archiveMode: archiveMode,
|
|
}, nil
|
|
}
|
|
|
|
// SetMaterializeFunc sets the materialize side effects function
|
|
func (g *GlobalTimeReel) SetMaterializeFunc(
|
|
materializeFunc func(
|
|
txn store.Transaction,
|
|
frame *protobufs.GlobalFrame,
|
|
) error,
|
|
) {
|
|
g.materializeFunc = materializeFunc
|
|
}
|
|
|
|
// SetRevertFunc sets the revert side effects function
|
|
func (g *GlobalTimeReel) SetRevertFunc(
|
|
revertFunc func(
|
|
txn store.Transaction,
|
|
frameNumber uint64,
|
|
requests []*protobufs.MessageBundle,
|
|
) error,
|
|
) {
|
|
g.revertFunc = revertFunc
|
|
}
|
|
|
|
// Start starts the global time reel
|
|
func (g *GlobalTimeReel) Start(
|
|
ctx lifecycle.SignalerContext,
|
|
ready lifecycle.ReadyFunc,
|
|
) {
|
|
g.ctx = ctx
|
|
g.logger.Info("starting global time reel")
|
|
|
|
// Warm the in-memory tree/cache from store.
|
|
if err := g.bootstrapFromStore(); err != nil {
|
|
g.logger.Error("could not bootstrap from store", zap.Error(err))
|
|
ctx.Throw(err)
|
|
return
|
|
}
|
|
|
|
ready()
|
|
<-ctx.Done()
|
|
|
|
g.logger.Info("stopping global time reel")
|
|
close(g.eventCh)
|
|
close(g.eventDone)
|
|
}
|
|
|
|
// sendEvent sends an event with guaranteed delivery
|
|
func (g *GlobalTimeReel) sendEvent(event GlobalEvent) {
|
|
// prioritize halts
|
|
select {
|
|
case <-g.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
// This blocks until the event is delivered or halted, guaranteeing order
|
|
select {
|
|
case <-g.ctx.Done():
|
|
return
|
|
case g.eventCh <- event:
|
|
g.logger.Debug(
|
|
"sent event",
|
|
zap.Int("type", int(event.Type)),
|
|
zap.Uint64("frame_number", event.Frame.Header.FrameNumber),
|
|
zap.String("id", g.ComputeFrameID(event.Frame)),
|
|
)
|
|
}
|
|
}
|
|
|
|
// Insert inserts a global frame header into the tree structure (non-blocking)
|
|
func (g *GlobalTimeReel) Insert(
|
|
frame *protobufs.GlobalFrame,
|
|
) error {
|
|
// Start timing
|
|
timer := prometheus.NewTimer(
|
|
frameProcessingDuration.WithLabelValues("global"),
|
|
)
|
|
defer timer.ObserveDuration()
|
|
|
|
g.mu.Lock()
|
|
defer g.mu.Unlock()
|
|
|
|
// Compute frame ID
|
|
frameID := g.ComputeFrameID(frame)
|
|
|
|
// Check if frame already exists with same ID
|
|
if _, exists := g.nodes[frameID]; exists {
|
|
return nil
|
|
}
|
|
|
|
// Check for equivocation with different frames at same height
|
|
nodesAtHeight := g.framesByNumber[frame.Header.FrameNumber]
|
|
for _, node := range nodesAtHeight {
|
|
if !isEqualGlobalFrame(node.Frame, frame) &&
|
|
hasOverlappingBits(node.Frame, frame) {
|
|
g.logger.Warn(
|
|
"equivocation detected for frame",
|
|
zap.Uint64("frame_number", frame.Header.FrameNumber),
|
|
)
|
|
|
|
// Track which bits equivocated
|
|
if g.equivocators[frame.Header.FrameNumber] == nil {
|
|
g.equivocators[frame.Header.FrameNumber] = make(map[int]bool)
|
|
}
|
|
|
|
// Find overlapping bits and mark them as equivocators
|
|
existingBits := node.Frame.Header.PublicKeySignatureBls48581
|
|
newBits := frame.Header.PublicKeySignatureBls48581
|
|
if existingBits != nil && newBits != nil {
|
|
existingBitmask := existingBits.Bitmask
|
|
newBitmask := newBits.Bitmask
|
|
maxLen := len(existingBitmask)
|
|
if len(newBitmask) > maxLen {
|
|
maxLen = len(newBitmask)
|
|
}
|
|
|
|
for i := 0; i < maxLen; i++ {
|
|
var eByte, nByte byte
|
|
if i < len(existingBitmask) {
|
|
eByte = existingBitmask[i]
|
|
}
|
|
if i < len(newBitmask) {
|
|
nByte = newBitmask[i]
|
|
}
|
|
overlapping := eByte & nByte
|
|
for bit := 0; bit < 8; bit++ {
|
|
if overlapping&(1<<bit) != 0 {
|
|
g.equivocators[frame.Header.FrameNumber][i*8+bit] = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
g.sendEvent(GlobalEvent{
|
|
Type: TimeReelEventEquivocationDetected,
|
|
Frame: frame,
|
|
Message: fmt.Sprintf(
|
|
"equivocation at frame %d",
|
|
frame.Header.FrameNumber,
|
|
),
|
|
})
|
|
|
|
// Record equivocation metric
|
|
equivocationsDetected.WithLabelValues("global").Inc()
|
|
|
|
// Continue processing, we need to handle fork choice still before the
|
|
// trees update
|
|
}
|
|
}
|
|
|
|
// Handle genesis frame
|
|
if frame.Header.FrameNumber == g.genesisFrameNumber {
|
|
return g.insertGenesisFrame(frame, frameID)
|
|
}
|
|
|
|
// In non-archive mode, if we have no frames yet and this frame is recent
|
|
// enough, accept it as a starting point
|
|
if !g.archiveMode && g.root == nil && len(g.nodes) == 0 {
|
|
// Accept this frame as the initial pseudo-root
|
|
g.logger.Info(
|
|
"non-archive mode: accepting first frame as pseudo-root",
|
|
zap.Uint64("frame_number", frame.Header.FrameNumber),
|
|
)
|
|
return g.insertGenesisFrame(frame, frameID)
|
|
}
|
|
|
|
// Try to find parent
|
|
parentSelector := string(frame.Header.ParentSelector)
|
|
parentNode := g.findNodeBySelector(frame.Header.ParentSelector)
|
|
|
|
if parentNode == nil {
|
|
if !g.archiveMode {
|
|
if g.head != nil {
|
|
// Check if frame is ahead of our head
|
|
if frame.Header.FrameNumber > g.head.Frame.Header.FrameNumber {
|
|
// Frame is ahead, add to pending
|
|
g.addPendingFrame(frame, parentSelector)
|
|
|
|
// Insert frame into tree
|
|
newNode := &GlobalFrameNode{
|
|
Frame: frame,
|
|
Parent: nil,
|
|
Children: make(map[string]*GlobalFrameNode),
|
|
Depth: 1,
|
|
}
|
|
|
|
// Add to data structures
|
|
g.nodes[frameID] = newNode
|
|
g.framesByNumber[frame.Header.FrameNumber] = append(
|
|
g.framesByNumber[frame.Header.FrameNumber],
|
|
newNode,
|
|
)
|
|
g.cache.Add(frameID, newNode)
|
|
|
|
// Evaluate fork choice if we have competing branches
|
|
g.evaluateForkChoice(newNode)
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Parent not found, add to pending frames
|
|
g.addPendingFrame(frame, parentSelector)
|
|
return nil
|
|
}
|
|
|
|
// Verify parent selector matches
|
|
expectedSelector := computeGlobalPoseidonHash(parentNode.Frame.Header.Output)
|
|
if !bytes.Equal(expectedSelector, frame.Header.ParentSelector) {
|
|
return errors.New("parent selector mismatch")
|
|
}
|
|
|
|
// Insert frame into tree
|
|
newNode := &GlobalFrameNode{
|
|
Frame: frame,
|
|
Parent: parentNode,
|
|
Children: make(map[string]*GlobalFrameNode),
|
|
Depth: parentNode.Depth + 1,
|
|
}
|
|
|
|
// Add to data structures
|
|
g.nodes[frameID] = newNode
|
|
g.framesByNumber[frame.Header.FrameNumber] = append(
|
|
g.framesByNumber[frame.Header.FrameNumber],
|
|
newNode,
|
|
)
|
|
parentNode.Children[frameID] = newNode
|
|
g.cache.Add(frameID, newNode)
|
|
|
|
// Process any pending frames that can now be connected
|
|
g.processPendingFrames(frameID, newNode)
|
|
|
|
// Evaluate fork choice if we have competing branches
|
|
g.evaluateForkChoice(newNode)
|
|
|
|
// Prune old frames if tree is getting too deep
|
|
g.pruneOldFrames()
|
|
|
|
// Prune old pending frames periodically
|
|
g.pruneOldPendingFrames()
|
|
|
|
// Record success
|
|
framesProcessedTotal.WithLabelValues("global", "success").Inc()
|
|
|
|
// Update tree metrics
|
|
g.updateTreeMetrics()
|
|
|
|
return nil
|
|
}
|
|
|
|
// insertGenesisFrame handles genesis frame insertion or pseudo-root in
|
|
// non-archive mode
|
|
func (g *GlobalTimeReel) insertGenesisFrame(
|
|
frame *protobufs.GlobalFrame,
|
|
frameID string,
|
|
) error {
|
|
if g.root != nil && g.archiveMode {
|
|
// In archive mode, don't replace existing root
|
|
return errors.New("genesis frame already exists")
|
|
}
|
|
|
|
if g.root != nil && !g.archiveMode {
|
|
// In non-archive mode, check if this frame should replace the current
|
|
// pseudo-root
|
|
if frame.Header.FrameNumber >= g.root.Frame.Header.FrameNumber {
|
|
// This frame is not older than current root, don't replace
|
|
return errors.New("frame is not older than current root")
|
|
}
|
|
// This frame is older, it should become the new pseudo-root
|
|
g.logger.Info(
|
|
"non-archive mode: replacing pseudo-root with older frame",
|
|
zap.Uint64("old_root_frame", g.root.Frame.Header.FrameNumber),
|
|
zap.Uint64("new_root_frame", frame.Header.FrameNumber),
|
|
)
|
|
}
|
|
|
|
g.root = &GlobalFrameNode{
|
|
Frame: frame,
|
|
Parent: nil,
|
|
Children: make(map[string]*GlobalFrameNode),
|
|
Depth: 0,
|
|
}
|
|
|
|
g.nodes[frameID] = g.root
|
|
g.framesByNumber[frame.Header.FrameNumber] = []*GlobalFrameNode{g.root}
|
|
g.head = g.root
|
|
g.cache.Add(frameID, g.root)
|
|
|
|
// Persist canonical genesis.
|
|
if err := g.persistCanonicalFrames(
|
|
[]*protobufs.GlobalFrame{frame},
|
|
); err != nil {
|
|
return errors.Wrap(err, "insert genesis frame")
|
|
}
|
|
|
|
// Send new head event
|
|
g.sendEvent(GlobalEvent{
|
|
Type: TimeReelEventNewHead,
|
|
Frame: frame,
|
|
})
|
|
|
|
// Process any pending frames that can now be connected
|
|
g.processPendingFrames(frameID, g.root)
|
|
|
|
// Prune old frames if tree is getting too deep
|
|
g.pruneOldFrames()
|
|
|
|
return nil
|
|
}
|
|
|
|
// addPendingFrame adds a frame to the pending list
|
|
func (g *GlobalTimeReel) addPendingFrame(
|
|
frame *protobufs.GlobalFrame,
|
|
parentSelector string,
|
|
) {
|
|
pending := &GlobalPendingFrame{
|
|
Frame: frame,
|
|
ParentSelector: frame.Header.ParentSelector,
|
|
Timestamp: frame.Header.Timestamp,
|
|
}
|
|
|
|
g.pendingFrames[parentSelector] = append(
|
|
g.pendingFrames[parentSelector],
|
|
pending,
|
|
)
|
|
|
|
g.logger.Debug(
|
|
"added pending frame",
|
|
zap.Uint64("frame_number", frame.Header.FrameNumber),
|
|
zap.String(
|
|
"parent_selector",
|
|
fmt.Sprintf("%x", frame.Header.ParentSelector),
|
|
),
|
|
)
|
|
}
|
|
|
|
// processPendingFrames processes frames that were waiting for the given parent
|
|
func (g *GlobalTimeReel) processPendingFrames(
|
|
parentFrameID string,
|
|
parentNode *GlobalFrameNode,
|
|
) {
|
|
g.logger.Debug(
|
|
"process pending frame",
|
|
zap.Uint64("frame_number", parentNode.Frame.Header.FrameNumber),
|
|
zap.String("id", parentFrameID),
|
|
)
|
|
parentSelector := computeGlobalPoseidonHash(parentNode.Frame.Header.Output)
|
|
parentSelectorStr := string(parentSelector)
|
|
|
|
pendingList := g.pendingFrames[parentSelectorStr]
|
|
if len(pendingList) == 0 {
|
|
return
|
|
}
|
|
|
|
// Remove from pending list
|
|
delete(g.pendingFrames, parentSelectorStr)
|
|
|
|
// Process each pending frame
|
|
for _, pending := range pendingList {
|
|
frameID := g.ComputeFrameID(pending.Frame)
|
|
|
|
if existing, exists := g.nodes[frameID]; exists {
|
|
// Re-parent previously pre-inserted orphan
|
|
if existing.Parent == nil {
|
|
existing.Parent = parentNode
|
|
existing.Depth = parentNode.Depth + 1
|
|
parentNode.Children[frameID] = existing
|
|
g.framesByNumber[pending.Frame.Header.FrameNumber] = append(
|
|
g.framesByNumber[pending.Frame.Header.FrameNumber], existing)
|
|
|
|
g.cache.Add(frameID, existing)
|
|
|
|
g.logger.Debug("reparented pending orphan frame",
|
|
zap.Uint64("frame_number", pending.Frame.Header.FrameNumber),
|
|
zap.String("id", frameID),
|
|
zap.String("parent_id", parentFrameID),
|
|
)
|
|
|
|
g.processPendingFrames(frameID, existing)
|
|
|
|
g.evaluateForkChoice(existing)
|
|
}
|
|
|
|
// Skip if already processed
|
|
continue
|
|
}
|
|
|
|
// Create and insert node
|
|
newNode := &GlobalFrameNode{
|
|
Frame: pending.Frame,
|
|
Parent: parentNode,
|
|
Children: make(map[string]*GlobalFrameNode),
|
|
Depth: parentNode.Depth + 1,
|
|
}
|
|
|
|
g.nodes[frameID] = newNode
|
|
g.framesByNumber[pending.Frame.Header.FrameNumber] = append(
|
|
g.framesByNumber[pending.Frame.Header.FrameNumber], newNode)
|
|
parentNode.Children[frameID] = newNode
|
|
g.cache.Add(frameID, newNode)
|
|
|
|
g.logger.Debug(
|
|
"processed pending frame",
|
|
zap.Uint64("frame_number", pending.Frame.Header.FrameNumber),
|
|
zap.String("id", frameID),
|
|
zap.String("parent_id", parentFrameID),
|
|
)
|
|
|
|
// Recursively process any frames waiting for this one
|
|
g.processPendingFrames(frameID, newNode)
|
|
|
|
// Evaluate fork choice
|
|
g.evaluateForkChoice(newNode)
|
|
}
|
|
}
|
|
|
|
// findNodeBySelector finds a node whose output hash matches the selector
|
|
func (g *GlobalTimeReel) findNodeBySelector(selector []byte) *GlobalFrameNode {
|
|
for _, node := range g.nodes {
|
|
expectedSelector := computeGlobalPoseidonHash(node.Frame.Header.Output)
|
|
if bytes.Equal(expectedSelector, selector) {
|
|
return node
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// evaluateForkChoice evaluates fork choice and updates head if necessary
|
|
func (g *GlobalTimeReel) evaluateForkChoice(newNode *GlobalFrameNode) {
|
|
if g.head == nil || (!g.archiveMode &&
|
|
newNode.Frame.Header.FrameNumber > g.head.Frame.Header.FrameNumber &&
|
|
newNode.Frame.Header.FrameNumber-g.head.Frame.Header.FrameNumber >
|
|
maxGlobalTreeDepth) {
|
|
oldHead := g.head
|
|
g.head = newNode
|
|
g.sendHeadEvent(newNode, oldHead)
|
|
return
|
|
}
|
|
|
|
// Find all competing branches (leaf nodes)
|
|
leafNodes := g.findLeafNodes()
|
|
if len(leafNodes) <= 1 {
|
|
// No competition, check if we should update head
|
|
if newNode.Depth > g.head.Depth {
|
|
oldHead := g.head
|
|
g.head = newNode
|
|
|
|
g.persistCanonicalFrames([]*protobufs.GlobalFrame{newNode.Frame})
|
|
|
|
g.sendHeadEvent(newNode, oldHead)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Get maximum depth among leaf nodes
|
|
maxDepth := uint64(0)
|
|
for _, leaf := range leafNodes {
|
|
if leaf.Depth > maxDepth {
|
|
maxDepth = leaf.Depth
|
|
}
|
|
}
|
|
|
|
// Only consider leaf nodes at maximum depth for fork choice
|
|
var competingLeaves []*GlobalFrameNode
|
|
for _, leaf := range leafNodes {
|
|
if leaf.Depth == maxDepth {
|
|
competingLeaves = append(competingLeaves, leaf)
|
|
}
|
|
}
|
|
|
|
// If only one leaf at max depth, make it head
|
|
if len(competingLeaves) == 1 &&
|
|
competingLeaves[0].Frame.Header.FrameNumber == g.head.Frame.Header.FrameNumber {
|
|
chosenNode := competingLeaves[0]
|
|
if chosenNode != g.head {
|
|
oldHead := g.head
|
|
g.head = chosenNode
|
|
|
|
// Check if this is a reorganization (fork)
|
|
if oldHead != nil && !g.isAncestorNode(oldHead, chosenNode) {
|
|
g.logger.Info(
|
|
"reorganization detected (single leaf)",
|
|
zap.Uint64("old_head_frame", oldHead.Frame.Header.FrameNumber),
|
|
zap.Uint64("new_head_frame", chosenNode.Frame.Header.FrameNumber))
|
|
// This is a fork - emit fork detected event first
|
|
event := GlobalEvent{
|
|
Type: TimeReelEventForkDetected,
|
|
Frame: chosenNode.Frame,
|
|
OldHead: oldHead.Frame,
|
|
Message: fmt.Sprintf(
|
|
"fork detected: old head %d (%x), new head %d (%x)",
|
|
oldHead.Frame.Header.FrameNumber,
|
|
poseidon.Sum(oldHead.Frame.Header.Output),
|
|
chosenNode.Frame.Header.FrameNumber,
|
|
poseidon.Sum(chosenNode.Frame.Header.Output),
|
|
),
|
|
}
|
|
g.sendEvent(event)
|
|
|
|
// Find the common ancestor
|
|
commonAncestor, reverts := g.findCommonAncestor(oldHead, chosenNode)
|
|
|
|
if len(reverts) != 0 {
|
|
g.rewindFrames(reverts)
|
|
}
|
|
|
|
// Emit new head events for each frame in the new canonical path
|
|
// from the frame after the common ancestor to the new head
|
|
if commonAncestor != nil {
|
|
g.emitReplayEvents(commonAncestor, chosenNode)
|
|
}
|
|
} else {
|
|
// Regular head advance - emit single new head event
|
|
headChangesTotal.WithLabelValues("global", "advance").Inc()
|
|
|
|
g.persistCanonicalFrames([]*protobufs.GlobalFrame{newNode.Frame})
|
|
|
|
event := GlobalEvent{
|
|
Type: TimeReelEventNewHead,
|
|
Frame: chosenNode.Frame,
|
|
}
|
|
if oldHead != nil {
|
|
event.OldHead = oldHead.Frame
|
|
}
|
|
g.sendEvent(event)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// Convert competing leaf nodes to branches for fork choice
|
|
branches := make([]Branch, 0, len(competingLeaves))
|
|
nodeToIndex := make(map[*GlobalFrameNode]int)
|
|
|
|
for i, leaf := range competingLeaves {
|
|
branch := g.nodeToBranch(leaf)
|
|
branches = append(branches, branch)
|
|
nodeToIndex[leaf] = i
|
|
g.logger.Debug(
|
|
"competing leaf",
|
|
zap.Int("index", i),
|
|
zap.Uint64("frame_number", leaf.Frame.Header.FrameNumber),
|
|
)
|
|
}
|
|
|
|
// Get current head index - find which competing leaf is the current head
|
|
prevChoice := 0
|
|
for i, leaf := range competingLeaves {
|
|
if leaf == g.head {
|
|
prevChoice = i
|
|
break
|
|
}
|
|
}
|
|
|
|
// Perform fork choice
|
|
forkChoiceTimer := prometheus.NewTimer(
|
|
forkChoiceDuration.WithLabelValues("global"),
|
|
)
|
|
|
|
// Debug: log branch information
|
|
for i, branch := range branches {
|
|
if len(branch.Frames) > 0 {
|
|
lastFrame := branch.Frames[len(branch.Frames)-1]
|
|
g.logger.Debug(
|
|
"fork choice branch",
|
|
zap.Int("index", i),
|
|
zap.String("prover", hex.EncodeToString(lastFrame.ProverAddress)),
|
|
zap.String("distance", lastFrame.Distance.String()),
|
|
zap.Bool("is_current_head", competingLeaves[i] == g.head))
|
|
}
|
|
}
|
|
|
|
chosenIndex := ForkChoice(branches, g.forkChoiceParams, prevChoice)
|
|
chosenNode := competingLeaves[chosenIndex]
|
|
forkChoiceTimer.ObserveDuration()
|
|
|
|
g.logger.Debug(
|
|
"fork choice result",
|
|
zap.Int("chosen_index", chosenIndex),
|
|
zap.Int("prev_choice", prevChoice),
|
|
)
|
|
|
|
// Record fork choice metrics
|
|
forkChoiceEvaluations.WithLabelValues("global").Inc()
|
|
competingBranches.WithLabelValues("global").Observe(float64(len(branches)))
|
|
|
|
// Update head if it changed
|
|
if chosenNode != g.head {
|
|
oldHead := g.head
|
|
g.head = chosenNode
|
|
|
|
// Check if this is a reorganization (fork)
|
|
if oldHead != nil && !g.isAncestorNode(oldHead, chosenNode) {
|
|
g.logger.Info(
|
|
"reorganization detected",
|
|
zap.Uint64("old_head_frame", oldHead.Frame.Header.FrameNumber),
|
|
zap.Uint64("new_head_frame", chosenNode.Frame.Header.FrameNumber))
|
|
|
|
// Record reorganization metrics
|
|
headChangesTotal.WithLabelValues("global", "reorganization").Inc()
|
|
|
|
// Calculate reorganization depth
|
|
commonAncestor, reverts := g.findCommonAncestor(oldHead, chosenNode)
|
|
if commonAncestor != nil {
|
|
depth := oldHead.Depth - commonAncestor.Depth
|
|
reorganizationDepth.WithLabelValues("global").Observe(float64(depth))
|
|
}
|
|
|
|
// This is a fork - emit fork detected event first
|
|
event := GlobalEvent{
|
|
Type: TimeReelEventForkDetected,
|
|
Frame: chosenNode.Frame,
|
|
OldHead: oldHead.Frame,
|
|
Message: fmt.Sprintf(
|
|
"fork detected: old head %d (%x), new head %d (%x)",
|
|
oldHead.Frame.Header.FrameNumber,
|
|
poseidon.Sum(oldHead.Frame.Header.Output),
|
|
chosenNode.Frame.Header.FrameNumber,
|
|
poseidon.Sum(chosenNode.Frame.Header.Output),
|
|
),
|
|
}
|
|
g.sendEvent(event)
|
|
|
|
// Rewind all previous frames
|
|
if len(reverts) != 0 {
|
|
g.rewindFrames(reverts)
|
|
}
|
|
|
|
// Emit new head events for each frame in the new canonical path
|
|
// from the frame after the common ancestor to the new head
|
|
if commonAncestor != nil {
|
|
g.emitReplayEvents(commonAncestor, chosenNode)
|
|
}
|
|
} else {
|
|
// Regular head advance - emit single new head event
|
|
event := GlobalEvent{
|
|
Type: TimeReelEventNewHead,
|
|
Frame: chosenNode.Frame,
|
|
}
|
|
if oldHead != nil {
|
|
event.OldHead = oldHead.Frame
|
|
}
|
|
g.sendEvent(event)
|
|
}
|
|
}
|
|
}
|
|
|
|
// findLeafNodes returns all leaf nodes (nodes with no children) that are in the
|
|
// same connected component as the current head. This prevents spurious fork
|
|
// choice across disconnected forests (e.g., after a non-archive snap-ahead).
|
|
func (g *GlobalTimeReel) findLeafNodes() []*GlobalFrameNode {
|
|
var leaves []*GlobalFrameNode
|
|
if g.head == nil {
|
|
// Fallback: no head yet, return all leaves
|
|
for _, node := range g.nodes {
|
|
if len(node.Children) == 0 {
|
|
leaves = append(leaves, node)
|
|
}
|
|
}
|
|
|
|
return leaves
|
|
}
|
|
|
|
headRoot := g.findRoot(g.head)
|
|
for _, node := range g.nodes {
|
|
if len(node.Children) != 0 {
|
|
continue
|
|
}
|
|
|
|
if g.findRoot(node) == headRoot {
|
|
leaves = append(leaves, node)
|
|
}
|
|
}
|
|
|
|
return leaves
|
|
}
|
|
|
|
// findRoot walks parents to identify the root of a node
|
|
func (g *GlobalTimeReel) findRoot(n *GlobalFrameNode) *GlobalFrameNode {
|
|
cur := n
|
|
for cur != nil && cur.Parent != nil {
|
|
cur = cur.Parent
|
|
}
|
|
return cur
|
|
}
|
|
|
|
// nodeToBranch converts a node and its lineage to a Branch for fork choice
|
|
func (g *GlobalTimeReel) nodeToBranch(node *GlobalFrameNode) Branch {
|
|
// Build lineage from this node backwards, but limit to maxGlobalTreeDepth
|
|
// frames
|
|
var lineage []*GlobalFrameNode
|
|
current := node
|
|
depth := 0
|
|
|
|
for current != nil && depth < maxGlobalTreeDepth {
|
|
lineage = append([]*GlobalFrameNode{current}, lineage...)
|
|
current = current.Parent
|
|
depth++
|
|
}
|
|
|
|
// Convert to fork choice frames
|
|
frames := make([]Frame, 0, len(lineage))
|
|
for _, n := range lineage {
|
|
// Use frame output as prover address for uniqueness. Global frames have no
|
|
// assigned prover as they should mutually agree.
|
|
proverAddr := n.Frame.Header.Output
|
|
if len(proverAddr) > 32 {
|
|
proverAddr = proverAddr[:32]
|
|
}
|
|
|
|
// Only frames with signatures should have signature-based seniority
|
|
// Genesis and frames without signatures get default seniority
|
|
seniority := g.computeFrameSeniority(n.Frame)
|
|
|
|
g.logger.Debug(
|
|
"frame seniority in branch",
|
|
zap.Uint64("frame_number", n.Frame.Header.FrameNumber),
|
|
zap.Uint64("seniority", seniority),
|
|
)
|
|
|
|
frame := Frame{
|
|
Distance: big.NewInt(0),
|
|
Seniority: seniority,
|
|
ProverAddress: proverAddr,
|
|
}
|
|
frames = append(frames, frame)
|
|
}
|
|
|
|
return Branch{Frames: frames}
|
|
}
|
|
|
|
// computeFrameSeniority computes seniority for fork choice based on number of
|
|
// signers, excluding equivocators
|
|
func (g *GlobalTimeReel) computeFrameSeniority(
|
|
frame *protobufs.GlobalFrame,
|
|
) uint64 {
|
|
// Genesis frame and frames without signatures get minimal seniority
|
|
if frame.Header.PublicKeySignatureBls48581 == nil {
|
|
// Use minimal seniority so genesis doesn't dominate fork choice
|
|
return SCALE / 64 // Same as 1 signer
|
|
}
|
|
|
|
// Count bits excluding equivocators
|
|
equivocatorsAtHeight := g.equivocators[frame.Header.FrameNumber]
|
|
bitCount := 0
|
|
bitmask := frame.Header.PublicKeySignatureBls48581.Bitmask
|
|
|
|
for i := 0; i < len(bitmask); i++ {
|
|
for bit := 0; bit < 8; bit++ {
|
|
if bitmask[i]&(1<<bit) != 0 {
|
|
// Check if this bit position equivocated
|
|
bitPos := i*8 + bit
|
|
if equivocatorsAtHeight == nil || !equivocatorsAtHeight[bitPos] {
|
|
bitCount++
|
|
} else {
|
|
g.logger.Debug(
|
|
"excluding equivocator from seniority",
|
|
zap.Uint64("frame_number", frame.Header.FrameNumber),
|
|
zap.Int("bit_position", bitPos),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if bitCount == 0 {
|
|
return 0 // evicted or all equivocators
|
|
}
|
|
|
|
// Return seniority proportional to number of signers, normalized to SCALE
|
|
maxSigners := uint64(64)
|
|
// Simple linear scaling: more signers = higher seniority
|
|
// To avoid overflow, divide SCALE first
|
|
result := (SCALE / maxSigners) * uint64(bitCount)
|
|
return result
|
|
}
|
|
|
|
// sendHeadEvent sends a head update event
|
|
func (g *GlobalTimeReel) sendHeadEvent(
|
|
newHead *GlobalFrameNode,
|
|
oldHead *GlobalFrameNode,
|
|
) {
|
|
eventType := TimeReelEventNewHead
|
|
var message string
|
|
|
|
if oldHead != nil && !g.isAncestorNode(oldHead, newHead) {
|
|
eventType = TimeReelEventForkDetected
|
|
message = fmt.Sprintf(
|
|
"fork detected: old head %d (%x), new head %d (%x)",
|
|
oldHead.Frame.Header.FrameNumber,
|
|
poseidon.Sum(oldHead.Frame.Header.Output),
|
|
newHead.Frame.Header.FrameNumber,
|
|
poseidon.Sum(newHead.Frame.Header.Output),
|
|
)
|
|
}
|
|
|
|
event := GlobalEvent{
|
|
Type: eventType,
|
|
Frame: newHead.Frame,
|
|
Message: message,
|
|
}
|
|
if oldHead != nil {
|
|
event.OldHead = oldHead.Frame
|
|
}
|
|
g.sendEvent(event)
|
|
}
|
|
|
|
// isAncestorNode checks if ancestor is an ancestor of descendant
|
|
func (g *GlobalTimeReel) isAncestorNode(
|
|
ancestor, descendant *GlobalFrameNode,
|
|
) bool {
|
|
current := descendant
|
|
for current != nil {
|
|
if current == ancestor {
|
|
return true
|
|
}
|
|
current = current.Parent
|
|
}
|
|
return false
|
|
}
|
|
|
|
// findCommonAncestor finds the most recent common ancestor of two nodes
|
|
func (g *GlobalTimeReel) findCommonAncestor(
|
|
node1, node2 *GlobalFrameNode,
|
|
) (*GlobalFrameNode, []*GlobalFrameNode) {
|
|
// Build path from node1 to root
|
|
path1 := make(map[*GlobalFrameNode]bool)
|
|
current := node1
|
|
for current != nil {
|
|
path1[current] = true
|
|
current = current.Parent
|
|
}
|
|
|
|
// Walk from node2 to root and find first common node
|
|
current = node2
|
|
for current != nil {
|
|
if path1[current] {
|
|
g.logger.Info(
|
|
"found common ancestor",
|
|
zap.Uint64("ancestor_frame", current.Frame.Header.FrameNumber),
|
|
zap.Uint64("node1_frame", node1.Frame.Header.FrameNumber),
|
|
zap.Uint64("node2_frame", node2.Frame.Header.FrameNumber))
|
|
prev := []*GlobalFrameNode{node1}
|
|
walk := node1
|
|
for walk != current {
|
|
prev = append(prev, walk.Parent)
|
|
walk = walk.Parent
|
|
}
|
|
return current, prev
|
|
}
|
|
current = current.Parent
|
|
}
|
|
|
|
g.logger.Warn(
|
|
"no common ancestor found",
|
|
zap.Uint64("node1_frame", node1.Frame.Header.FrameNumber),
|
|
zap.Uint64("node2_frame", node2.Frame.Header.FrameNumber),
|
|
)
|
|
return nil, nil
|
|
}
|
|
|
|
// emitReplayEvents emits new head events for each frame in the path from
|
|
// commonAncestor to newHead
|
|
func (g *GlobalTimeReel) emitReplayEvents(
|
|
commonAncestor, newHead *GlobalFrameNode,
|
|
) {
|
|
// Build path from newHead back to commonAncestor
|
|
var reversePath []*GlobalFrameNode
|
|
current := newHead
|
|
for current != nil && current != commonAncestor {
|
|
reversePath = append(reversePath, current)
|
|
current = current.Parent
|
|
}
|
|
|
|
// Reverse the path to get correct order (from ancestor toward new head)
|
|
replayPath := make([]*GlobalFrameNode, len(reversePath))
|
|
for i, node := range reversePath {
|
|
replayPath[len(reversePath)-1-i] = node
|
|
}
|
|
|
|
// Persist the canonical replay path in order.
|
|
frames := make([]*protobufs.GlobalFrame, 0, len(replayPath))
|
|
for _, n := range replayPath {
|
|
frames = append(frames, n.Frame)
|
|
}
|
|
if err := g.persistCanonicalFrames(frames); err != nil {
|
|
g.logger.Warn(
|
|
"failed to persist canonical replay path",
|
|
zap.Error(err),
|
|
zap.Int("replay_len", len(frames)),
|
|
)
|
|
}
|
|
|
|
g.logger.Info(
|
|
"emitting replay events",
|
|
zap.Int("replay_path_length", len(replayPath)),
|
|
zap.String("common_ancestor", fmt.Sprintf(
|
|
"frame_%d",
|
|
commonAncestor.Frame.Header.FrameNumber,
|
|
)),
|
|
zap.String("common_ancestor_selector", fmt.Sprintf(
|
|
"%x",
|
|
computeGlobalPoseidonHash(commonAncestor.Frame.Header.Output),
|
|
)),
|
|
zap.String("new_head", fmt.Sprintf(
|
|
"frame_%d",
|
|
newHead.Frame.Header.FrameNumber,
|
|
)),
|
|
zap.String("new_head_selector", fmt.Sprintf(
|
|
"%x",
|
|
computeGlobalPoseidonHash(newHead.Frame.Header.Output),
|
|
)),
|
|
)
|
|
|
|
// Emit new head events for each frame in the replay path sequentially
|
|
for _, node := range replayPath {
|
|
g.logger.Info(
|
|
"emitting replay event for frame",
|
|
zap.Uint64("frame_number", node.Frame.Header.FrameNumber))
|
|
event := GlobalEvent{
|
|
Type: TimeReelEventNewHead,
|
|
Frame: node.Frame,
|
|
}
|
|
g.sendEvent(event)
|
|
}
|
|
}
|
|
|
|
// rewindFrames reverts side effects for frames in the old branch being unwound
|
|
func (g *GlobalTimeReel) rewindFrames(revertNodes []*GlobalFrameNode) {
|
|
if len(revertNodes) == 0 {
|
|
return
|
|
}
|
|
|
|
g.logger.Info(
|
|
"rewinding frames",
|
|
zap.Int("rewind_path_length", len(revertNodes)),
|
|
)
|
|
|
|
// Create a transaction for reverting side effects
|
|
txn, err := g.store.NewTransaction(false)
|
|
if err != nil {
|
|
g.logger.Error(
|
|
"failed to create transaction for rewind",
|
|
zap.Error(err),
|
|
)
|
|
return
|
|
}
|
|
|
|
// Process each frame in the revert list (already in correct order)
|
|
for _, node := range revertNodes {
|
|
if node.Frame.Header.FrameNumber == 244200 {
|
|
continue
|
|
}
|
|
|
|
g.logger.Info(
|
|
"reverting frame",
|
|
zap.Uint64("frame_number", node.Frame.Header.FrameNumber),
|
|
)
|
|
|
|
// Call revert function to undo side effects
|
|
if err := g.revertFunc(
|
|
txn,
|
|
node.Frame.Header.FrameNumber,
|
|
node.Frame.Requests,
|
|
); err != nil {
|
|
txn.Abort()
|
|
g.logger.Error(
|
|
"failed to revert frame side effects",
|
|
zap.Uint64("frame_number", node.Frame.Header.FrameNumber),
|
|
zap.Error(err),
|
|
)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Commit the revert transaction
|
|
if err := txn.Commit(); err != nil {
|
|
g.logger.Error(
|
|
"failed to commit revert transaction",
|
|
zap.Error(err),
|
|
)
|
|
return
|
|
}
|
|
|
|
g.logger.Info(
|
|
"successfully rewound frames",
|
|
zap.Int("reverted_count", len(revertNodes)),
|
|
)
|
|
}
|
|
|
|
// ComputeFrameID computes a unique ID for a frame
|
|
func (g *GlobalTimeReel) ComputeFrameID(
|
|
frame *protobufs.GlobalFrame,
|
|
) string {
|
|
// Create a unique identifier based on frame contents
|
|
data := fmt.Sprintf("%d:%d:%x:%x",
|
|
frame.Header.FrameNumber,
|
|
frame.Header.Timestamp,
|
|
frame.Header.Output,
|
|
frame.Header.ParentSelector,
|
|
)
|
|
hash := computeGlobalPoseidonHash([]byte(data))
|
|
return fmt.Sprintf("%x", hash)
|
|
}
|
|
|
|
// GetHead returns the current head frame
|
|
func (g *GlobalTimeReel) GetHead() (*protobufs.GlobalFrame, error) {
|
|
g.mu.RLock()
|
|
defer g.mu.RUnlock()
|
|
|
|
if g.head == nil {
|
|
return nil, errors.New("no head frame")
|
|
}
|
|
|
|
return g.head.Frame, nil
|
|
}
|
|
|
|
// GetFrame retrieves a frame by frame ID
|
|
func (g *GlobalTimeReel) GetFrame(frameID string) (
|
|
*protobufs.GlobalFrame,
|
|
error,
|
|
) {
|
|
g.mu.RLock()
|
|
defer g.mu.RUnlock()
|
|
|
|
// Check cache first
|
|
if node, ok := g.cache.Get(frameID); ok {
|
|
return node.Frame, nil
|
|
}
|
|
|
|
// Check main storage
|
|
node, exists := g.nodes[frameID]
|
|
if !exists {
|
|
return nil, errors.New("frame not found")
|
|
}
|
|
|
|
// Add to cache
|
|
g.cache.Add(frameID, node)
|
|
return node.Frame, nil
|
|
}
|
|
|
|
// GetFramesByNumber retrieves all frames at a specific frame number
|
|
func (g *GlobalTimeReel) GetFramesByNumber(frameNumber uint64) (
|
|
[]*protobufs.GlobalFrame,
|
|
error,
|
|
) {
|
|
g.mu.RLock()
|
|
defer g.mu.RUnlock()
|
|
|
|
nodes := g.framesByNumber[frameNumber]
|
|
if len(nodes) == 0 {
|
|
return nil, errors.New("no frames found at frame number")
|
|
}
|
|
|
|
frames := make([]*protobufs.GlobalFrame, len(nodes))
|
|
for i, node := range nodes {
|
|
frames[i] = node.Frame
|
|
}
|
|
|
|
return frames, nil
|
|
}
|
|
|
|
// GetLineage returns the full lineage of frames from genesis to the head
|
|
func (g *GlobalTimeReel) GetLineage() ([]*protobufs.GlobalFrame, error) {
|
|
g.mu.RLock()
|
|
defer g.mu.RUnlock()
|
|
|
|
if g.head == nil {
|
|
return nil, errors.New("no head frame")
|
|
}
|
|
|
|
return g.getNodeLineage(g.head), nil
|
|
}
|
|
|
|
// GetNodeLineage returns the lineage for a specific node
|
|
func (g *GlobalTimeReel) GetNodeLineage(frameID string) (
|
|
[]*protobufs.GlobalFrame,
|
|
error,
|
|
) {
|
|
g.mu.RLock()
|
|
defer g.mu.RUnlock()
|
|
|
|
node, exists := g.nodes[frameID]
|
|
if !exists {
|
|
return nil, errors.New("frame not found")
|
|
}
|
|
|
|
return g.getNodeLineage(node), nil
|
|
}
|
|
|
|
// getNodeLineage returns the lineage from genesis to the specified node
|
|
func (g *GlobalTimeReel) getNodeLineage(
|
|
node *GlobalFrameNode,
|
|
) []*protobufs.GlobalFrame {
|
|
var lineage []*protobufs.GlobalFrame
|
|
current := node
|
|
for current != nil {
|
|
lineage = append([]*protobufs.GlobalFrame{current.Frame}, lineage...)
|
|
current = current.Parent
|
|
}
|
|
return lineage
|
|
}
|
|
|
|
// GetEventCh returns the event channel
|
|
func (g *GlobalTimeReel) GetEventCh() <-chan GlobalEvent {
|
|
return g.eventCh
|
|
}
|
|
|
|
// GetChildFrames returns all known child frames of a given parent frame
|
|
func (g *GlobalTimeReel) GetChildFrames(parentFrameID string) (
|
|
[]*protobufs.GlobalFrame,
|
|
error,
|
|
) {
|
|
g.mu.RLock()
|
|
defer g.mu.RUnlock()
|
|
|
|
parent, exists := g.nodes[parentFrameID]
|
|
if !exists {
|
|
return nil, errors.New("parent frame not found")
|
|
}
|
|
|
|
children := make([]*protobufs.GlobalFrame, 0, len(parent.Children))
|
|
for _, child := range parent.Children {
|
|
children = append(children, child.Frame)
|
|
}
|
|
|
|
return children, nil
|
|
}
|
|
|
|
// GetPendingFrames returns information about frames waiting for parents
|
|
func (
|
|
g *GlobalTimeReel,
|
|
) GetPendingFrames() map[string][]*protobufs.GlobalFrame {
|
|
g.mu.RLock()
|
|
defer g.mu.RUnlock()
|
|
|
|
result := make(map[string][]*protobufs.GlobalFrame)
|
|
for selector, pendingList := range g.pendingFrames {
|
|
frames := make([]*protobufs.GlobalFrame, len(pendingList))
|
|
for i, pending := range pendingList {
|
|
frames[i] = pending.Frame
|
|
}
|
|
result[selector] = frames
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// GetBranchTips returns all leaf nodes (potential heads)
|
|
func (g *GlobalTimeReel) GetBranchTips() (
|
|
[]*protobufs.GlobalFrame,
|
|
error,
|
|
) {
|
|
g.mu.RLock()
|
|
defer g.mu.RUnlock()
|
|
|
|
leafNodes := g.findLeafNodes()
|
|
tips := make([]*protobufs.GlobalFrame, len(leafNodes))
|
|
for i, node := range leafNodes {
|
|
tips[i] = node.Frame
|
|
}
|
|
|
|
return tips, nil
|
|
}
|
|
|
|
// SetForkChoiceParams updates the fork choice parameters
|
|
func (g *GlobalTimeReel) SetForkChoiceParams(params Params) {
|
|
g.mu.Lock()
|
|
defer g.mu.Unlock()
|
|
g.forkChoiceParams = params
|
|
}
|
|
|
|
// pruneOldFrames removes frames older than maxGlobalTreeDepth from the
|
|
// in-memory cache to prevent unbounded memory growth. The store handles its own
|
|
// pruning based on archive mode.
|
|
func (g *GlobalTimeReel) pruneOldFrames() {
|
|
if g.head == nil || g.head.Depth < maxGlobalTreeDepth {
|
|
return // Not enough frames to prune
|
|
}
|
|
|
|
// Calculate the minimum depth to keep
|
|
minDepthToKeep := g.head.Depth - maxGlobalTreeDepth + 1
|
|
|
|
// Find all nodes that should be pruned (depth < minDepthToKeep)
|
|
var nodesToPrune []*GlobalFrameNode
|
|
for _, node := range g.nodes {
|
|
if node.Depth < minDepthToKeep {
|
|
nodesToPrune = append(nodesToPrune, node)
|
|
}
|
|
}
|
|
|
|
if len(nodesToPrune) == 0 {
|
|
return // Nothing to prune
|
|
}
|
|
|
|
// First, clear parent references from children of nodes being pruned
|
|
// This ensures proper garbage collection
|
|
for _, node := range nodesToPrune {
|
|
for _, child := range node.Children {
|
|
if child.Depth >= minDepthToKeep {
|
|
// This child is being kept, so clear its parent reference
|
|
child.Parent = nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Remove nodes from data structures
|
|
for _, node := range nodesToPrune {
|
|
frameID := g.ComputeFrameID(node.Frame)
|
|
|
|
// Remove from nodes map
|
|
delete(g.nodes, frameID)
|
|
|
|
// Remove from framesByNumber
|
|
frameNum := node.Frame.Header.FrameNumber
|
|
if nodeList, exists := g.framesByNumber[frameNum]; exists {
|
|
// Filter out the node to be pruned
|
|
var filteredList []*GlobalFrameNode
|
|
for _, n := range nodeList {
|
|
if n != node {
|
|
filteredList = append(filteredList, n)
|
|
}
|
|
}
|
|
if len(filteredList) == 0 {
|
|
delete(g.framesByNumber, frameNum)
|
|
} else {
|
|
g.framesByNumber[frameNum] = filteredList
|
|
}
|
|
}
|
|
|
|
// Remove from cache
|
|
g.cache.Remove(frameID)
|
|
|
|
// Disconnect from parent
|
|
if node.Parent != nil {
|
|
delete(node.Parent.Children, frameID)
|
|
}
|
|
|
|
// Update root if necessary
|
|
if g.root == node {
|
|
// Find new root (should be one of the remaining nodes at minimum depth)
|
|
g.root = nil
|
|
for _, remaining := range g.nodes {
|
|
if remaining.Depth == minDepthToKeep {
|
|
if g.root == nil ||
|
|
remaining.Frame.Header.FrameNumber <
|
|
g.root.Frame.Header.FrameNumber {
|
|
g.root = remaining
|
|
}
|
|
}
|
|
}
|
|
// Clear parent reference for new root
|
|
if g.root != nil {
|
|
g.root.Parent = nil
|
|
}
|
|
}
|
|
}
|
|
|
|
g.logger.Info(
|
|
"pruned old frames",
|
|
zap.Int("pruned_count", len(nodesToPrune)),
|
|
zap.Uint64("min_depth_kept", minDepthToKeep),
|
|
zap.Uint64("head_depth", g.head.Depth))
|
|
}
|
|
|
|
// pruneOldPendingFrames removes pending frames that are too old
|
|
func (g *GlobalTimeReel) pruneOldPendingFrames() {
|
|
// Prune pending frames older than 1.5 hours
|
|
const maxPendingAge = 1 * 90 * 60 * 1000 // 1.5 hours in milliseconds
|
|
currentTime := time.Now().UnixMilli()
|
|
|
|
prunedCount := 0
|
|
for selector, pendingList := range g.pendingFrames {
|
|
var filteredList []*GlobalPendingFrame
|
|
for _, pending := range pendingList {
|
|
age := currentTime - pending.Timestamp
|
|
if age <= maxPendingAge {
|
|
filteredList = append(filteredList, pending)
|
|
} else {
|
|
prunedCount++
|
|
}
|
|
}
|
|
|
|
if len(filteredList) == 0 {
|
|
delete(g.pendingFrames, selector)
|
|
} else {
|
|
g.pendingFrames[selector] = filteredList
|
|
}
|
|
}
|
|
|
|
if prunedCount > 0 {
|
|
g.logger.Info(
|
|
"pruned old pending frames",
|
|
zap.Int("pruned_count", prunedCount),
|
|
zap.Int("remaining_selectors", len(g.pendingFrames)))
|
|
}
|
|
}
|
|
|
|
// GetTreeInfo returns debugging information about the tree structure
|
|
func (g *GlobalTimeReel) GetTreeInfo() map[string]interface{} {
|
|
g.mu.RLock()
|
|
defer g.mu.RUnlock()
|
|
|
|
info := map[string]interface{}{
|
|
"total_nodes": len(g.nodes),
|
|
"pending_frames": len(g.pendingFrames),
|
|
"max_depth": 0,
|
|
"branch_count": 0,
|
|
"max_tree_depth": maxTreeDepth,
|
|
"pruning_needed": false,
|
|
}
|
|
|
|
if g.head != nil {
|
|
info["head_depth"] = g.head.Depth
|
|
info["head_frame_number"] = g.head.Frame.Header.FrameNumber
|
|
info["pruning_needed"] = g.head.Depth >= maxTreeDepth
|
|
}
|
|
|
|
// Calculate max depth and branch count
|
|
leafNodes := g.findLeafNodes()
|
|
info["branch_count"] = len(leafNodes)
|
|
|
|
maxDepth := uint64(0)
|
|
minDepth := uint64(0)
|
|
if len(g.nodes) > 0 {
|
|
minDepth = ^uint64(0) // max value
|
|
for _, node := range g.nodes {
|
|
if node.Depth > maxDepth {
|
|
maxDepth = node.Depth
|
|
}
|
|
if node.Depth < minDepth {
|
|
minDepth = node.Depth
|
|
}
|
|
}
|
|
}
|
|
info["max_depth"] = maxDepth
|
|
info["min_depth"] = minDepth
|
|
info["tree_span"] = maxDepth - minDepth + 1
|
|
|
|
// Count pending frames
|
|
totalPending := 0
|
|
for _, pendingList := range g.pendingFrames {
|
|
totalPending += len(pendingList)
|
|
}
|
|
info["total_pending"] = totalPending
|
|
|
|
return info
|
|
}
|
|
|
|
// updateTreeMetrics updates Prometheus metrics for the tree state
|
|
func (g *GlobalTimeReel) updateTreeMetrics() {
|
|
// Count total nodes
|
|
treeNodeCount.WithLabelValues("global").Set(float64(len(g.nodes)))
|
|
|
|
// Set tree depth
|
|
if g.head != nil {
|
|
treeDepth.WithLabelValues("global").Set(float64(g.head.Depth))
|
|
}
|
|
|
|
// Count pending frames
|
|
totalPending := 0
|
|
for _, pendingList := range g.pendingFrames {
|
|
totalPending += len(pendingList)
|
|
}
|
|
pendingFramesCount.WithLabelValues("global").Set(float64(totalPending))
|
|
|
|
// Count unique equivocators
|
|
uniqueEquivocators := make(map[int]bool)
|
|
for _, equivocatorsAtHeight := range g.equivocators {
|
|
for bitPos := range equivocatorsAtHeight {
|
|
uniqueEquivocators[bitPos] = true
|
|
}
|
|
}
|
|
equivocatorsTracked.WithLabelValues("global").Set(
|
|
float64(len(uniqueEquivocators)),
|
|
)
|
|
}
|
|
|
|
// bootstrapFromStore loads up to the last maxGlobalTreeDepth canonical frames
|
|
// from durable storage and fills the in-memory structures and cache.
|
|
func (g *GlobalTimeReel) bootstrapFromStore() error {
|
|
g.mu.Lock()
|
|
defer g.mu.Unlock()
|
|
|
|
latest, err := g.store.GetLatestGlobalClockFrame()
|
|
if err != nil {
|
|
if errors.Is(err, store.ErrNotFound) {
|
|
// Fresh DB — nothing to load.
|
|
return nil
|
|
}
|
|
return errors.Wrap(err, "bootstrap from store")
|
|
}
|
|
latestNum := latest.Header.FrameNumber
|
|
|
|
var start uint64
|
|
if !g.archiveMode && latestNum+1 > maxGlobalTreeDepth {
|
|
// Non-archive mode: only load last 10 frames
|
|
start = latestNum - (maxGlobalTreeDepth - 1)
|
|
} else {
|
|
// Archive mode or insufficient frames: load all available
|
|
start = g.genesisFrameNumber
|
|
}
|
|
|
|
iter, err := g.store.RangeGlobalClockFrames(start, latestNum)
|
|
if err != nil {
|
|
return errors.Wrap(err, "bootstrap from store")
|
|
}
|
|
defer iter.Close()
|
|
|
|
var prev *GlobalFrameNode
|
|
|
|
for ok := iter.First(); ok && iter.Valid(); ok = iter.Next() {
|
|
frame, err := iter.Value()
|
|
if err != nil {
|
|
return errors.Wrap(err, "bootstrap from store")
|
|
}
|
|
frameID := g.ComputeFrameID(frame)
|
|
|
|
// Create node; parent will be prev if selector matches, else try find by
|
|
// selector.
|
|
node := &GlobalFrameNode{
|
|
Frame: frame,
|
|
Parent: nil,
|
|
Children: make(map[string]*GlobalFrameNode),
|
|
Depth: 0,
|
|
}
|
|
|
|
// Try to link to known parent (best effort; first loaded frame may lack
|
|
// parent in memory).
|
|
if prev != nil {
|
|
// Fast path: if this frame points to prev's output, link.
|
|
expSel := computeGlobalPoseidonHash(prev.Frame.Header.Output)
|
|
if bytes.Equal(frame.Header.ParentSelector, expSel) {
|
|
node.Parent = prev
|
|
node.Depth = prev.Depth + 1
|
|
prev.Children[frameID] = node
|
|
}
|
|
}
|
|
if node.Parent == nil {
|
|
// Slow path: search existing nodes by selector (rare on linear history).
|
|
if p := g.findNodeBySelector(frame.Header.ParentSelector); p != nil {
|
|
node.Parent = p
|
|
node.Depth = p.Depth + 1
|
|
p.Children[frameID] = node
|
|
} else if !g.archiveMode && frame.Header.FrameNumber == start {
|
|
// Non-archive mode: first frame loaded may not have parent in DB.
|
|
// Treat it as a pseudo-root with depth 0.
|
|
node.Depth = 0
|
|
g.logger.Info(
|
|
"non-archive mode: treating first loaded frame as pseudo-root",
|
|
zap.Uint64("frame_number", frame.Header.FrameNumber),
|
|
)
|
|
}
|
|
}
|
|
|
|
if g.root == nil || (!g.archiveMode && frame.Header.FrameNumber == start) {
|
|
// Set root to first loaded frame (actual genesis or pseudo-root in non-archive mode)
|
|
g.root = node
|
|
}
|
|
|
|
g.nodes[frameID] = node
|
|
g.framesByNumber[frame.Header.FrameNumber] = append(
|
|
g.framesByNumber[frame.Header.FrameNumber],
|
|
node,
|
|
)
|
|
g.cache.Add(frameID, node)
|
|
|
|
prev = node
|
|
g.head = node
|
|
}
|
|
|
|
// Warm up the tree metrics
|
|
g.updateTreeMetrics()
|
|
|
|
if g.head != nil {
|
|
g.logger.Info(
|
|
"bootstrapped time reel from store",
|
|
zap.Uint64("loaded_from_frame", start),
|
|
zap.Uint64("loaded_to_frame", g.head.Frame.Header.FrameNumber),
|
|
zap.Int("loaded_count", len(g.nodes)),
|
|
zap.Bool("archive_mode", g.archiveMode),
|
|
)
|
|
|
|
if !g.archiveMode && g.root != nil {
|
|
g.logger.Info(
|
|
"non-archive mode: accepting last 10 frames as valid chain",
|
|
zap.Uint64("pseudo_root_frame", g.root.Frame.Header.FrameNumber),
|
|
zap.Uint64("head_frame", g.head.Frame.Header.FrameNumber),
|
|
)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// persistCanonicalFrames writes a contiguous set of canonical frames to the
|
|
// store in one txn. In non-archive mode, it also prunes old frames from the
|
|
// store.
|
|
func (g *GlobalTimeReel) persistCanonicalFrames(
|
|
frames []*protobufs.GlobalFrame,
|
|
) error {
|
|
if len(frames) == 0 {
|
|
return nil
|
|
}
|
|
|
|
txn, err := g.store.NewTransaction(false)
|
|
if err != nil {
|
|
return errors.Wrap(err, "persist canonical frames")
|
|
}
|
|
|
|
for _, f := range frames {
|
|
if err := g.materializeFunc(txn, f); err != nil {
|
|
_ = txn.Abort()
|
|
return errors.Wrap(err, "persist canonical frames")
|
|
}
|
|
if err := g.store.PutGlobalClockFrame(f, txn); err != nil {
|
|
_ = txn.Abort()
|
|
return errors.Wrap(err, "persist canonical frames")
|
|
}
|
|
}
|
|
|
|
if err := txn.Commit(); err != nil {
|
|
return errors.Wrap(err, "persist canonical frames")
|
|
}
|
|
|
|
// In non-archive mode, prune frames older than maxGlobalTreeDepth from store
|
|
if !g.archiveMode && g.head != nil {
|
|
// Calculate the oldest frame we want to keep
|
|
if g.head.Frame.Header.FrameNumber > maxGlobalTreeDepth {
|
|
oldestToKeep := g.head.Frame.Header.FrameNumber - maxGlobalTreeDepth + 1
|
|
err := g.store.DeleteGlobalClockFrameRange(0, oldestToKeep)
|
|
if err != nil {
|
|
g.logger.Error("unable to delete frame range", zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|