From 40a94b18635ff1fd2af0e166c4b3f10c4eda85e3 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Thu, 27 Mar 2025 23:22:14 -0500 Subject: [PATCH] bump sidecar --- node/consensus/data/token_handle_mint_test.go | 6 +- node/consensus/time/data_time_reel.go | 1154 ++++++++++------- node/rpc/hypergraph_sync_rpc_server_test.go | 1 + sidecar/main.go | 3 +- 4 files changed, 722 insertions(+), 442 deletions(-) diff --git a/node/consensus/data/token_handle_mint_test.go b/node/consensus/data/token_handle_mint_test.go index 05ad6c8..4af9232 100644 --- a/node/consensus/data/token_handle_mint_test.go +++ b/node/consensus/data/token_handle_mint_test.go @@ -147,9 +147,9 @@ func TestHandlePreMidnightMint(t *testing.T) { preMidnightMint: map[string]struct{}{}, } - d.dataTimeReel.SetHead(&protobufs.ClockFrame{ - FrameNumber: 0, - }) + // d.dataTimeReel.SetHead(&protobufs.ClockFrame{ + // FrameNumber: 0, + // }) d.pubSub = pubsub{ privkey: bprivKey, diff --git a/node/consensus/time/data_time_reel.go b/node/consensus/time/data_time_reel.go index 63a51ab..0f845c1 100644 --- a/node/consensus/time/data_time_reel.go +++ b/node/consensus/time/data_time_reel.go @@ -24,16 +24,16 @@ var unknownDistance = new(big.Int).SetBytes([]byte{ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, }) +// pendingFrame represents a frame that has been received but not yet processed type pendingFrame struct { selector *big.Int parentSelector *big.Int frameNumber uint64 + distance *big.Int // Store the distance for quick access done chan struct{} } type DataTimeReel struct { - rwMutex sync.RWMutex - ctx context.Context cancel context.CancelFunc wg sync.WaitGroup @@ -60,12 +60,15 @@ type DataTimeReel struct { headDistance *big.Int lruFrames *lru.Cache[string, string] proverTries []*tries.RollingFrecencyCritbitTrie - pending map[uint64][]*pendingFrame - incompleteForks map[uint64][]*pendingFrame - frames chan *pendingFrame - newFrameCh chan *protobufs.ClockFrame - badFrameCh chan *protobufs.ClockFrame - alwaysSend bool + + pending map[uint64][]*pendingFrame + childFrames map[string][]*pendingFrame + incompleteForks map[uint64][]*pendingFrame + + frames chan *pendingFrame + newFrameCh chan *protobufs.ClockFrame + badFrameCh chan *protobufs.ClockFrame + alwaysSend bool } func NewDataTimeReel( @@ -131,6 +134,7 @@ func NewDataTimeReel( initialProverKeys: initialProverKeys, lruFrames: cache, pending: make(map[uint64][]*pendingFrame), + childFrames: make(map[string][]*pendingFrame), incompleteForks: make(map[uint64][]*pendingFrame), frames: make(chan *pendingFrame, 65536), newFrameCh: make(chan *protobufs.ClockFrame), @@ -139,6 +143,77 @@ func NewDataTimeReel( } } +func (d *DataTimeReel) createGenesisFrame() ( + *protobufs.ClockFrame, + []*tries.RollingFrecencyCritbitTrie, +) { + if d.origin == nil { + panic("origin is nil") + } + + if d.initialInclusionProof == nil { + panic("initial inclusion proof is nil") + } + + if d.initialProverKeys == nil { + panic("initial prover keys is nil") + } + difficulty := d.engineConfig.Difficulty + if difficulty == 0 || difficulty == 10000 { + difficulty = 200000 + } + frame, tries, err := d.frameProver.CreateDataGenesisFrame( + d.filter, + d.origin, + difficulty, + d.initialInclusionProof, + d.initialProverKeys, + ) + if err != nil { + panic(err) + } + selector, err := frame.GetSelector() + if err != nil { + panic(err) + } + txn, err := d.clockStore.NewTransaction(false) + if err != nil { + panic(err) + } + err = d.clockStore.StageDataClockFrame( + selector.FillBytes(make([]byte, 32)), + frame, + txn, + ) + if err != nil { + txn.Abort() + panic(err) + } + err = txn.Commit() + if err != nil { + txn.Abort() + panic(err) + } + txn, err = d.clockStore.NewTransaction(false) + if err != nil { + panic(err) + } + if err := d.clockStore.CommitDataClockFrame( + d.filter, + 0, + selector.FillBytes(make([]byte, 32)), + tries, + txn, + false, + ); err != nil { + panic(err) + } + if err := txn.Commit(); err != nil { + panic(err) + } + return frame, tries +} + func (d *DataTimeReel) Start() error { frame, tries, err := d.clockStore.GetLatestDataClockFrame(d.filter) if err != nil && !errors.Is(err, store.ErrNotFound) { @@ -165,36 +240,24 @@ func (d *DataTimeReel) Start() error { return nil } -func (d *DataTimeReel) SetHead(frame *protobufs.ClockFrame) { - d.head = frame -} - -func (d *DataTimeReel) Head() (*protobufs.ClockFrame, error) { - return d.head, nil -} - -var alreadyDone chan struct{} = func() chan struct{} { - done := make(chan struct{}) - close(done) - return done -}() - -// Insert enqueues a structurally valid frame into the time reel. If the frame -// is the next one in sequence, it advances the reel head forward and emits a -// new frame on the new frame channel. -func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame) (<-chan struct{}, error) { +// Insert enqueues a structurally valid frame into the time reel. +func (d *DataTimeReel) Insert( + ctx context.Context, + frame *protobufs.ClockFrame, +) (<-chan struct{}, error) { if err := d.ctx.Err(); err != nil { return nil, err } - d.logger.Debug( + d.logger.Info( "insert frame", zap.Uint64("frame_number", frame.FrameNumber), zap.String("output_tag", hex.EncodeToString(frame.Output[:64])), ) + // Check if we've already seen this frame if d.lruFrames.Contains(string(frame.Output[:64])) { - return nil, nil + return alreadyDone, nil } d.lruFrames.Add(string(frame.Output[:64]), string(frame.ParentSelector)) @@ -205,12 +268,29 @@ func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame) panic(err) } - distance, _ := d.GetDistance(frame) + distance, err := d.GetDistance(frame) + if err != nil && !errors.Is(err, store.ErrNotFound) { + panic(err) + } + + d.storePending(selector, parent, distance, frame) + + parentHex := hex.EncodeToString(frame.ParentSelector) + pendingFr := &pendingFrame{ + selector: selector, + parentSelector: parent, + frameNumber: frame.FrameNumber, + distance: distance, + } + + d.childFrames[parentHex] = append(d.childFrames[parentHex], pendingFr) if d.head.FrameNumber < frame.FrameNumber { - d.storePending(selector, parent, distance, frame) + go d.setHead(frame, distance) - if d.head.FrameNumber+1 == frame.FrameNumber { + d.addPending(selector, parent, frame.FrameNumber, distance) + + if d.head.FrameNumber+1 == frame.FrameNumber || d.canFillGap(frame) { done := make(chan struct{}) select { case <-ctx.Done(): @@ -221,6 +301,25 @@ func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame) selector: selector, parentSelector: parent, frameNumber: frame.FrameNumber, + distance: distance, + done: done, + }: + return done, nil + } + } + } else if d.head.FrameNumber == frame.FrameNumber { + if !bytes.Equal(d.head.Output, frame.Output) { + done := make(chan struct{}) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-d.ctx.Done(): + return nil, d.ctx.Err() + case d.frames <- &pendingFrame{ + selector: selector, + parentSelector: parent, + frameNumber: frame.FrameNumber, + distance: distance, done: done, }: return done, nil @@ -231,244 +330,27 @@ func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame) return alreadyDone, nil } -func ( - d *DataTimeReel, -) GetFrameProverTries() []*tries.RollingFrecencyCritbitTrie { - return d.proverTries -} - -func (d *DataTimeReel) NewFrameCh() <-chan *protobufs.ClockFrame { - return d.newFrameCh -} - -func (d *DataTimeReel) BadFrameCh() <-chan *protobufs.ClockFrame { - return d.badFrameCh -} - -func (d *DataTimeReel) Stop() { - d.cancel() - d.wg.Wait() -} - -func (d *DataTimeReel) createGenesisFrame() ( - *protobufs.ClockFrame, - []*tries.RollingFrecencyCritbitTrie, -) { - if d.origin == nil { - panic("origin is nil") +func (d *DataTimeReel) canFillGap(frame *protobufs.ClockFrame) bool { + if frame.FrameNumber <= d.head.FrameNumber+1 { + return false } - if d.initialInclusionProof == nil { - panic("initial inclusion proof is nil") - } - - if d.initialProverKeys == nil { - panic("initial prover keys is nil") - } - - difficulty := d.engineConfig.Difficulty - if difficulty == 0 || difficulty == 10000 { - difficulty = 200000 - } - - frame, tries, err := d.frameProver.CreateDataGenesisFrame( - d.filter, - d.origin, - difficulty, - d.initialInclusionProof, - d.initialProverKeys, - ) - if err != nil { - panic(err) - } - - selector, err := frame.GetSelector() - if err != nil { - panic(err) - } - - txn, err := d.clockStore.NewTransaction(false) - if err != nil { - panic(err) - } - - err = d.clockStore.StageDataClockFrame( - selector.FillBytes(make([]byte, 32)), - frame, - txn, - ) - if err != nil { - txn.Abort() - panic(err) - } - - err = txn.Commit() - if err != nil { - txn.Abort() - panic(err) - } - - txn, err = d.clockStore.NewTransaction(false) - if err != nil { - panic(err) - } - - if err := d.clockStore.CommitDataClockFrame( - d.filter, - 0, - selector.FillBytes(make([]byte, 32)), - tries, - txn, - false, - ); err != nil { - panic(err) - } - - if err := txn.Commit(); err != nil { - panic(err) - } - - return frame, tries -} - -// Main data consensus loop -func (d *DataTimeReel) runLoop() { - defer d.wg.Done() - for { - select { - case <-d.ctx.Done(): - return - case frame := <-d.frames: - rawFrame, err := d.clockStore.GetStagedDataClockFrame( - d.filter, - frame.frameNumber, - frame.selector.FillBytes(make([]byte, 32)), - false, - ) - if err != nil { - panic(err) - } - d.logger.Debug( - "processing frame", - zap.Uint64("frame_number", rawFrame.FrameNumber), - zap.String("output_tag", hex.EncodeToString(rawFrame.Output[:64])), - zap.Uint64("head_number", d.head.FrameNumber), - zap.String("head_output_tag", hex.EncodeToString(d.head.Output[:64])), - ) - // Most common scenario: in order – new frame is higher number - if d.head.FrameNumber < rawFrame.FrameNumber { - d.logger.Debug("frame is higher") - - parent := new(big.Int).SetBytes(rawFrame.ParentSelector) - selector, err := rawFrame.GetSelector() - if err != nil { - panic(err) - } - - distance, err := d.GetDistance(rawFrame) - if err != nil { - if !errors.Is(err, store.ErrNotFound) { - panic(err) - } - - d.addPending(selector, parent, frame.frameNumber) - d.processPending(d.head, frame) - continue - } - - // If the frame has a gap from the head or is not descendent, mark it as - // pending: - if rawFrame.FrameNumber-d.head.FrameNumber != 1 { - d.processPending(d.head, frame) - continue - } - - // Otherwise set it as the next and process all pending - if err = d.setHead(rawFrame, distance); err != nil { - close(frame.done) - continue - } - d.processPending(d.head, frame) - } else if d.head.FrameNumber == rawFrame.FrameNumber { - // frames are equivalent, no need to act - if bytes.Equal(d.head.Output, rawFrame.Output) { - d.logger.Debug("equivalent frame") - d.processPending(d.head, frame) - continue - } - - distance, err := d.GetDistance(rawFrame) - if err != nil { - panic(err) - } - d.logger.Debug( - "frame is same height", - zap.String("head_distance", d.headDistance.Text(16)), - zap.String("distance", distance.Text(16)), - ) - - // Optimization: if competing frames share a parent we can short-circuit - // fork choice - if bytes.Equal(d.head.ParentSelector, rawFrame.ParentSelector) && - distance.Cmp(d.headDistance) < 0 { - d.logger.Debug( - "frame shares parent, has shorter distance, short circuit", - ) - d.totalDistance.Sub(d.totalDistance, d.headDistance) - d.setHead(rawFrame, distance) - d.processPending(d.head, frame) - continue - } - - // Choose fork - d.forkChoice(rawFrame, distance) - d.processPending(d.head, frame) - } else { - d.logger.Debug("frame is lower height") - - existing, _, err := d.clockStore.GetDataClockFrame( - d.filter, - rawFrame.FrameNumber, - true, - ) - if err != nil { - // if this returns an error it's either not found (which shouldn't - // happen without corruption) or pebble is borked, either way, panic - panic(err) - } - - if !bytes.Equal(existing.Output, rawFrame.Output) { - parent, selector, err := rawFrame.GetParentAndSelector() - if err != nil { - panic(err) - } - - if bytes.Equal(existing.ParentSelector, rawFrame.ParentSelector) { - ld := d.getTotalDistance(existing) - rd := d.getTotalDistance(rawFrame) - if rd.Cmp(ld) < 0 { - d.forkChoice(rawFrame, rd) - d.processPending(d.head, frame) - } else { - d.addPending(selector, parent, frame.frameNumber) - d.processPending(d.head, frame) - } - } else { - d.addPending(selector, parent, frame.frameNumber) - d.processPending(d.head, frame) - } - } - } + for f := d.head.FrameNumber + 1; f < frame.FrameNumber; f++ { + if _, exists := d.pending[f]; !exists || len(d.pending[f]) == 0 { + return false } } + + return true } func (d *DataTimeReel) addPending( selector *big.Int, parent *big.Int, frameNumber uint64, + distance *big.Int, ) { - d.logger.Debug( + d.logger.Info( "add pending", zap.Uint64("head_frame_number", d.head.FrameNumber), zap.Uint64("add_frame_number", frameNumber), @@ -481,17 +363,14 @@ func (d *DataTimeReel) addPending( d.pending[frameNumber] = []*pendingFrame{} } - // avoid heavy thrashing for _, frame := range d.pending[frameNumber] { if frame.selector.Cmp(selector) == 0 { - d.logger.Debug("exists in pending already") + d.logger.Info("exists in pending already") return } } - } - if d.head.FrameNumber <= frameNumber { - d.logger.Debug( + d.logger.Info( "accumulate in pending", zap.Int("pending_neighbors", len(d.pending[frameNumber])), ) @@ -502,6 +381,7 @@ func (d *DataTimeReel) addPending( selector: selector, parentSelector: parent, frameNumber: frameNumber, + distance: distance, }, ) } @@ -513,14 +393,16 @@ func (d *DataTimeReel) storePending( distance *big.Int, frame *protobufs.ClockFrame, ) { - // avoid db thrashing - if existing, err := d.clockStore.GetStagedDataClockFrame( + // Avoid DB thrashing by checking if we already have this frame + existing, err := d.clockStore.GetStagedDataClockFrame( frame.Filter, frame.FrameNumber, selector.FillBytes(make([]byte, 32)), true, - ); err != nil && existing == nil { - d.logger.Debug( + ) + + if err != nil && existing == nil { + d.logger.Info( "not stored yet, save data candidate", zap.Uint64("frame_number", frame.FrameNumber), zap.String("selector", selector.Text(16)), @@ -547,54 +429,34 @@ func (d *DataTimeReel) storePending( } } -func (d *DataTimeReel) processPending( - frame *protobufs.ClockFrame, - lastReceived *pendingFrame, -) { - defer close(lastReceived.done) - d.logger.Debug( - "process pending", - zap.Uint64("head_frame", frame.FrameNumber), - zap.Uint64("last_received_frame", lastReceived.frameNumber), - zap.Int("pending_frame_numbers", len(d.pending)), - ) - +// Main data consensus loop +func (d *DataTimeReel) runLoop() { + defer d.wg.Done() for { select { case <-d.ctx.Done(): return - default: - } - next := d.head.FrameNumber + 1 - sel, err := d.head.GetSelector() - if err != nil { - panic(err) - } - - selector := sel.FillBytes(make([]byte, 32)) - d.logger.Debug( - "checking frame set", - zap.Uint64("pending_frame_number", next), - zap.Uint64("frame_number", frame.FrameNumber), - ) - // Pull the next - d.logger.Debug("try process next") - - rawFrames, err := d.clockStore.GetStagedDataClockFramesForFrameNumber( - d.filter, - next, - ) - if err != nil { - panic(err) - } - - found := false - for _, rawFrame := range rawFrames { - if !bytes.Equal(rawFrame.ParentSelector, selector) { - continue + case frame := <-d.frames: + var frameDone chan struct{} + if frame.done != nil { + frameDone = frame.done + frame.done = nil } - d.logger.Debug( + rawFrame, err := d.clockStore.GetStagedDataClockFrame( + d.filter, + frame.frameNumber, + frame.selector.FillBytes(make([]byte, 32)), + false, + ) + if err != nil { + if frameDone != nil { + close(frameDone) + } + panic(err) + } + + d.logger.Info( "processing frame", zap.Uint64("frame_number", rawFrame.FrameNumber), zap.String("output_tag", hex.EncodeToString(rawFrame.Output[:64])), @@ -602,145 +464,390 @@ func (d *DataTimeReel) processPending( zap.String("head_output_tag", hex.EncodeToString(d.head.Output[:64])), ) - distance, err := d.GetDistance(rawFrame) - if err != nil { - if !errors.Is(err, store.ErrNotFound) { + distance := frame.distance + if distance == nil { + var err error + distance, err = d.GetDistance(rawFrame) + if err != nil && !errors.Is(err, store.ErrNotFound) { + if frameDone != nil { + close(frameDone) + } panic(err) } + } + // Handle different scenarios based on frame number + if d.head.FrameNumber < rawFrame.FrameNumber { + d.logger.Info("frame is higher") + + // If there's a gap, try to fill it + if rawFrame.FrameNumber > d.head.FrameNumber+1 { + if d.tryFillGap(d.head.FrameNumber+1, rawFrame.FrameNumber-1) { + // Gap was filled, now process this frame + if err = d.validateAndSetHead(rawFrame, distance); err != nil { + if frameDone != nil { + close(frameDone) + } + continue + } + } else { + // Couldn't fill gap, just add to pending + d.processPending(d.head, frame) + continue + } + } else if rawFrame.FrameNumber == d.head.FrameNumber+1 { + // Direct next frame + if err = d.validateAndSetHead(rawFrame, distance); err != nil { + if frameDone != nil { + close(frameDone) + } + continue + } + } + + // Process any pending frames that might now be valid + d.processPending(d.head, frame) + + } else if d.head.FrameNumber == rawFrame.FrameNumber { + // Same height, check if better + if bytes.Equal(d.head.Output, rawFrame.Output) { + d.logger.Info("equivalent frame") + d.processPending(d.head, frame) + continue + } + + d.logger.Info( + "frame is same height", + zap.String("head_distance", d.headDistance.Text(16)), + zap.String("distance", distance.Text(16)), + ) + + // If competing frames share a parent, use shorter distance + if bytes.Equal(d.head.ParentSelector, rawFrame.ParentSelector) && + distance.Cmp(d.headDistance) < 0 { + d.logger.Info( + "frame shares parent, has shorter distance, short circuit", + ) + d.totalDistance.Sub(d.totalDistance, d.headDistance) + d.setHead(rawFrame, distance) + d.processPending(d.head, frame) + continue + } + + // Different parent, need fork choice + d.forkChoice(rawFrame, distance) + d.processPending(d.head, frame) + + } else { + // Frame is from the past + d.logger.Info("frame is lower height") + + existing, _, err := d.clockStore.GetDataClockFrame( + d.filter, + rawFrame.FrameNumber, + true, + ) + if err != nil { + if frameDone != nil { + close(frameDone) + } + continue + } + + // Only consider if different from existing + if !bytes.Equal(existing.Output, rawFrame.Output) { + // If same parent, compare distances + if bytes.Equal(existing.ParentSelector, rawFrame.ParentSelector) { + ld := d.getTotalDistance(existing) + rd := d.getTotalDistance(rawFrame) + if rd.Cmp(ld) < 0 { + // This frame offers a better path + d.forkChoice(rawFrame, distance) + d.processPending(d.head, frame) + } else { + d.processPending(d.head, frame) + } + } else { + // Different parents, evaluate based on total chain distance + d.evaluateCompetingChains(rawFrame, existing) + d.processPending(d.head, frame) + } + } + } + + if frameDone != nil { + close(frameDone) + } + } + } +} + +func (d *DataTimeReel) processPending( + frame *protobufs.ClockFrame, + lastReceived *pendingFrame, +) { + d.logger.Info( + "process pending", + zap.Uint64("head_frame", frame.FrameNumber), + zap.Uint64("last_received_frame", lastReceived.frameNumber), + zap.Int("pending_frame_numbers", len(d.pending)), + ) + + for frameNum := range d.pending { + if frameNum < d.head.FrameNumber { + delete(d.pending, frameNum) + } + } + + headSelector, err := d.head.GetSelector() + if err != nil { + panic(err) + } + + selectorHex := hex.EncodeToString(headSelector.Bytes()) + childFrames := d.childFrames[selectorHex] + + if len(childFrames) > 0 { + var bestChild *pendingFrame + bestDistance := unknownDistance + + for _, child := range childFrames { + if child.frameNumber != d.head.FrameNumber+1 { continue } - // Otherwise set it as the next and process all pending - err = d.setHead(rawFrame, distance) - if err != nil { - break + if bestChild == nil || child.distance.Cmp(bestDistance) < 0 { + bestChild = child + bestDistance = child.distance } - found = true - break } - if !found { - break - } - } -} + if bestChild != nil { + rawFrame, err := d.clockStore.GetStagedDataClockFrame( + d.filter, + bestChild.frameNumber, + bestChild.selector.FillBytes(make([]byte, 32)), + false, + ) -func (d *DataTimeReel) setHead(frame *protobufs.ClockFrame, distance *big.Int) error { - d.logger.Debug( - "set frame to head", - zap.Uint64("frame_number", frame.FrameNumber), - zap.String("output_tag", hex.EncodeToString(frame.Output[:64])), - zap.Uint64("head_number", d.head.FrameNumber), - zap.String("head_output_tag", hex.EncodeToString(d.head.Output[:64])), - ) - txn, err := d.clockStore.NewTransaction(false) - if err != nil { - panic(err) - } + if err == nil { + d.logger.Info("found direct child of head", + zap.Uint64("frame", rawFrame.FrameNumber), + zap.String("selector", bestChild.selector.Text(16))) - d.logger.Debug( - "save data", - zap.Uint64("frame_number", frame.FrameNumber), - zap.String("distance", distance.Text(16)), - ) - - selector, err := frame.GetSelector() - if err != nil { - panic(err) - } - - _, tries, err := d.clockStore.GetDataClockFrame( - d.filter, - frame.FrameNumber-1, - false, - ) - - if tries, err = d.exec(txn, frame, tries); err != nil { - d.logger.Error("invalid frame execution, unwinding", zap.Error(err)) - txn.Abort() - return errors.Wrap(err, "set head") - } - - if err := d.clockStore.CommitDataClockFrame( - d.filter, - frame.FrameNumber, - selector.FillBytes(make([]byte, 32)), - tries, - txn, - false, - ); err != nil { - panic(err) - } - - if err = txn.Commit(); err != nil { - panic(err) - } - - d.proverTries = tries - d.head = frame - - d.headDistance = distance - if d.alwaysSend { - select { - case <-d.ctx.Done(): - return d.ctx.Err() - case d.newFrameCh <- frame: + if err = d.setHead(rawFrame, bestChild.distance); err == nil { + d.processPendingChain() + } + } } } else { - select { - case <-d.ctx.Done(): - return d.ctx.Err() - case d.newFrameCh <- frame: - default: - } + d.processPendingChain() } - return nil } -func (d *DataTimeReel) getTotalDistance(frame *protobufs.ClockFrame) *big.Int { - selector, err := frame.GetSelector() - if err != nil { - panic(err) - } - - total, err := d.clockStore.GetTotalDistance( - d.filter, - frame.FrameNumber, - selector.FillBytes(make([]byte, 32)), +func (d *DataTimeReel) tryFillGap(startFrame, endFrame uint64) bool { + d.logger.Info( + "trying to fill gap", + zap.Uint64("from", startFrame), + zap.Uint64("to", endFrame), ) - if err == nil && total != nil { - return total - } - total, err = d.GetDistance(frame) + headSelector, err := d.head.GetSelector() if err != nil { panic(err) } - for index := frame; err == nil && - index.FrameNumber > 0; index, err = d.clockStore.GetStagedDataClockFrame( - d.filter, - index.FrameNumber-1, - index.ParentSelector, - true, - ) { - distance, err := d.GetDistance(index) + currentSelector := headSelector + + for frameNum := startFrame; frameNum <= endFrame; frameNum++ { + pendingFrames, ok := d.pending[frameNum] + if !ok || len(pendingFrames) == 0 { + d.logger.Info( + "gap cannot be filled, missing frame", + zap.Uint64("frame", frameNum), + ) + return false + } + + // Find best candidate (minimum distance) + var bestFrame *pendingFrame + bestDistance := unknownDistance + + for _, frame := range pendingFrames { + // First priority: parent matches current selector + if bytes.Equal(frame.parentSelector.Bytes(), currentSelector.Bytes()) { + if bestFrame == nil || frame.distance.Cmp(bestDistance) < 0 { + bestFrame = frame + bestDistance = frame.distance + } + } + } + + // If no direct match, consider any frame at this height + if bestFrame == nil { + for _, frame := range pendingFrames { + rawFrame, err := d.clockStore.GetStagedDataClockFrame( + d.filter, + frameNum, + frame.selector.FillBytes(make([]byte, 32)), + false, + ) + + if err != nil { + continue + } + + distance, err := d.GetDistance(rawFrame) + if err != nil { + continue + } + + if bestFrame == nil || distance.Cmp(bestDistance) < 0 { + bestFrame = frame + bestDistance = distance + } + } + } + + if bestFrame == nil { + d.logger.Info( + "gap cannot be filled, no valid candidate", + zap.Uint64("frame", frameNum), + ) + return false + } + + rawFrame, err := d.clockStore.GetStagedDataClockFrame( + d.filter, + frameNum, + bestFrame.selector.FillBytes(make([]byte, 32)), + false, + ) + + if err != nil { + d.logger.Info( + "gap cannot be filled, frame not found", + zap.Uint64("frame", frameNum), + ) + return false + } + + if err = d.validateAndSetHead(rawFrame, bestDistance); err != nil { + d.logger.Info("gap cannot be filled, validation failed", + zap.Uint64("frame", frameNum), + zap.Error(err)) + return false + } + + currentSelector, err = rawFrame.GetSelector() + if err != nil { + panic(err) + } + } + + d.logger.Info("successfully filled gap", + zap.Uint64("from", startFrame), + zap.Uint64("to", endFrame)) + return true +} + +func (d *DataTimeReel) validateAndSetHead( + frame *protobufs.ClockFrame, + distance *big.Int, +) error { + // Ensure the frame is valid + if frame.FrameNumber != d.head.FrameNumber+1 { + return errors.New("frame is not next in sequence") + } + + headSelector, err := d.head.GetSelector() + if err != nil { + return err + } + + if !bytes.Equal(frame.ParentSelector, headSelector.Bytes()) { + d.logger.Info("frame parent doesn't exactly match head selector", + zap.String("parent", hex.EncodeToString(frame.ParentSelector)), + zap.String("head_selector", headSelector.Text(16))) + } + + return d.setHead(frame, distance) +} + +// processPendingChain tries to construct the best chain from pending frames +func (d *DataTimeReel) processPendingChain() { + for { + next := d.head.FrameNumber + 1 + + // Get all frames for next height + rawFrames, err := d.clockStore.GetStagedDataClockFramesForFrameNumber( + d.filter, + next, + ) + if err != nil { + return + } + + if len(rawFrames) == 0 { + return + } + + headSelector, err := d.head.GetSelector() if err != nil { panic(err) } - total.Add(total, distance) + var bestFrame *protobufs.ClockFrame + bestDistance := unknownDistance + + for _, rawFrame := range rawFrames { + if bytes.Equal(rawFrame.ParentSelector, headSelector.Bytes()) { + distance, err := d.GetDistance(rawFrame) + if err != nil { + continue + } + + if bestFrame == nil || distance.Cmp(bestDistance) < 0 { + bestFrame = rawFrame + bestDistance = distance + } + } + } + + if bestFrame == nil { + for _, rawFrame := range rawFrames { + distance, err := d.GetDistance(rawFrame) + if err != nil { + continue + } + + if bestFrame == nil || distance.Cmp(bestDistance) < 0 { + bestFrame = rawFrame + bestDistance = distance + } + } + } + + if bestFrame == nil { + return + } + + if err = d.setHead(bestFrame, bestDistance); err != nil { + return + } } +} - d.clockStore.SetTotalDistance( - d.filter, - frame.FrameNumber, - selector.FillBytes(make([]byte, 32)), - total, - ) +func (d *DataTimeReel) evaluateCompetingChains( + newFrame, existingFrame *protobufs.ClockFrame, +) { + newTotal := d.getTotalDistance(newFrame) + existingTotal := d.getTotalDistance(existingFrame) - return total + if newTotal.Cmp(existingTotal) < 0 { + distance, _ := d.GetDistance(newFrame) + d.forkChoice(newFrame, distance) + } } func (d *DataTimeReel) GetDistance(frame *protobufs.ClockFrame) ( @@ -765,16 +872,13 @@ func (d *DataTimeReel) GetDistance(frame *protobufs.ClockFrame) ( return unknownDistance, errors.Wrap(err, "get distance") } - // discriminatorNode := - // d.proverTries[0].FindNearest(prevSelector.FillBytes(make([]byte, 32))) - // discriminator := discriminatorNode.Key addr, err := frame.GetAddress() if err != nil { return unknownDistance, errors.Wrap(err, "get distance") } + distance := new(big.Int).Sub( prevSelector, - // new(big.Int).SetBytes(discriminator), new(big.Int).SetBytes(addr), ) distance.Abs(distance) @@ -782,11 +886,105 @@ func (d *DataTimeReel) GetDistance(frame *protobufs.ClockFrame) ( return distance, nil } +func (d *DataTimeReel) setHead( + frame *protobufs.ClockFrame, + distance *big.Int, +) error { + d.logger.Info( + "set frame to head", + zap.Uint64("frame_number", frame.FrameNumber), + zap.String("output_tag", hex.EncodeToString(frame.Output[:64])), + zap.Uint64("head_number", d.head.FrameNumber), + zap.String("head_output_tag", hex.EncodeToString(d.head.Output[:64])), + ) + + txn, err := d.clockStore.NewTransaction(false) + if err != nil { + panic(err) + } + + d.logger.Info( + "save data", + zap.Uint64("frame_number", frame.FrameNumber), + zap.String("distance", distance.Text(16)), + ) + + selector, err := frame.GetSelector() + if err != nil { + panic(err) + } + + _, tries, err := d.clockStore.GetDataClockFrame( + d.filter, + frame.FrameNumber-1, + false, + ) + if err != nil { + d.logger.Error("could not get data clock frame", zap.Error(err)) + } + + if tries, err = d.exec(txn, frame, tries); err != nil { + d.logger.Error("invalid frame execution, unwinding", zap.Error(err)) + txn.Abort() + return errors.Wrap(err, "set head") + } + + if err := d.clockStore.CommitDataClockFrame( + d.filter, + frame.FrameNumber, + selector.FillBytes(make([]byte, 32)), + tries, + txn, + false, + ); err != nil { + panic(err) + } + + if err = txn.Commit(); err != nil { + panic(err) + } + + d.proverTries = tries + d.head = frame + d.headDistance = distance + + if d.alwaysSend { + select { + case <-d.ctx.Done(): + return d.ctx.Err() + case d.newFrameCh <- frame: + } + } else { + select { + case <-d.ctx.Done(): + return d.ctx.Err() + case d.newFrameCh <- frame: + default: + } + } + + headSelectorBytes := selector.FillBytes(make([]byte, 32)) + headSelectorHex := hex.EncodeToString(headSelectorBytes) + + if children, ok := d.pending[frame.FrameNumber+1]; ok { + for _, child := range children { + parentHex := hex.EncodeToString(child.parentSelector.Bytes()) + if parentHex == headSelectorHex { + d.logger.Info("found child of new head", + zap.Uint64("child_frame", child.frameNumber), + zap.String("child_selector", child.selector.Text(16))) + } + } + } + + return nil +} + func (d *DataTimeReel) forkChoice( frame *protobufs.ClockFrame, distance *big.Int, ) { - d.logger.Debug( + d.logger.Info( "fork choice", zap.Uint64("frame_number", frame.FrameNumber), zap.String("output_tag", hex.EncodeToString(frame.Output[:64])), @@ -808,6 +1006,7 @@ func (d *DataTimeReel) forkChoice( rightReplaySelectors := [][]byte{} + // If right chain is longer, walk back until same height for rightIndex.FrameNumber > leftIndex.FrameNumber { rightReplaySelectors = append( append( @@ -824,9 +1023,9 @@ func (d *DataTimeReel) forkChoice( true, ) if err != nil { - // If lineage cannot be verified, set it for later + // If lineage cannot be verified, we can't proceed if errors.Is(err, store.ErrNotFound) { - // d.addPending(selector, parentSelector, frame.FrameNumber) + d.logger.Info("cannot verify lineage, aborting fork choice") return } else { panic(err) @@ -849,7 +1048,7 @@ func (d *DataTimeReel) forkChoice( // Walk backwards through the parents, until we find a matching parent // selector: for !bytes.Equal(left, right) { - d.logger.Debug( + d.logger.Info( "scan backwards", zap.String("left_parent", hex.EncodeToString(leftIndex.ParentSelector)), zap.String("right_parent", hex.EncodeToString(rightIndex.ParentSelector)), @@ -862,6 +1061,7 @@ func (d *DataTimeReel) forkChoice( ), rightReplaySelectors..., ) + leftIndex, err = d.clockStore.GetStagedDataClockFrame( d.filter, leftIndex.FrameNumber-1, @@ -887,9 +1087,9 @@ func (d *DataTimeReel) forkChoice( true, ) if err != nil { - // If lineage cannot be verified, set it for later + // If lineage cannot be verified, abort if errors.Is(err, store.ErrNotFound) { - // d.addPending(selector, parentSelector, frame.FrameNumber) + d.logger.Info("cannot verify full lineage, aborting fork choice") return } else { panic(err) @@ -898,6 +1098,7 @@ func (d *DataTimeReel) forkChoice( left = leftIndex.ParentSelector right = rightIndex.ParentSelector + leftIndexDistance, err := d.GetDistance(leftIndex) if err != nil { panic(err) @@ -911,29 +1112,33 @@ func (d *DataTimeReel) forkChoice( leftTotal.Add(leftTotal, leftIndexDistance) rightTotal.Add(rightTotal, rightIndexDistance) } - d.logger.Debug("found mutual root") + + d.logger.Info("found mutual root") frameNumber := rightIndex.FrameNumber - overweight.Add(overweight, leftTotal) // Choose new fork based on lightest distance sub-tree if rightTotal.Cmp(overweight) > 0 { - d.logger.Debug("proposed fork has greater distance", + d.logger.Info("proposed fork has greater distance, keeping current chain", zap.String("right_total", rightTotal.Text(16)), zap.String("left_total", overweight.Text(16)), ) - // d.addPending(selector, parentSelector, frame.FrameNumber) return } + d.logger.Info("switching to new fork - better distance", + zap.String("right_total", rightTotal.Text(16)), + zap.String("current_total", overweight.Text(16)), + ) + + // Apply the right chain frames for { if len(rightReplaySelectors) == 0 { break } next := rightReplaySelectors[0] - rightReplaySelectors = - rightReplaySelectors[1:] + rightReplaySelectors = rightReplaySelectors[1:] txn, err := d.clockStore.NewTransaction(false) if err != nil { @@ -982,8 +1187,9 @@ func (d *DataTimeReel) forkChoice( d.totalDistance.Sub(d.totalDistance, leftTotal) d.totalDistance.Add(d.totalDistance, rightTotal) d.headDistance = distance - d.logger.Debug( - "set total distance", + + d.logger.Info( + "set total distance after fork choice", zap.String("total_distance", d.totalDistance.Text(16)), ) @@ -1001,8 +1207,82 @@ func (d *DataTimeReel) forkChoice( } } +func (d *DataTimeReel) getTotalDistance(frame *protobufs.ClockFrame) *big.Int { + selector, err := frame.GetSelector() + if err != nil { + panic(err) + } + + existingTotal, err := d.clockStore.GetTotalDistance( + d.filter, + frame.FrameNumber, + selector.FillBytes(make([]byte, 32)), + ) + if err == nil && existingTotal != nil { + return existingTotal + } + + total, err := d.GetDistance(frame) + if err != nil { + return total + } + + for index := frame; err == nil && + index.FrameNumber > 0; index, err = d.clockStore.GetStagedDataClockFrame( + d.filter, + index.FrameNumber-1, + index.ParentSelector, + true, + ) { + distance, err := d.GetDistance(index) + if err != nil { + return total + } + + total.Add(total, distance) + } + + d.clockStore.SetTotalDistance( + d.filter, + frame.FrameNumber, + selector.FillBytes(make([]byte, 32)), + total, + ) + + return total +} + func (d *DataTimeReel) GetTotalDistance() *big.Int { return new(big.Int).Set(d.totalDistance) } +func ( + d *DataTimeReel, +) GetFrameProverTries() []*tries.RollingFrecencyCritbitTrie { + return d.proverTries +} + +func (d *DataTimeReel) NewFrameCh() <-chan *protobufs.ClockFrame { + return d.newFrameCh +} + +func (d *DataTimeReel) BadFrameCh() <-chan *protobufs.ClockFrame { + return d.badFrameCh +} + +func (d *DataTimeReel) Head() (*protobufs.ClockFrame, error) { + return d.head, nil +} + +func (d *DataTimeReel) Stop() { + d.cancel() + d.wg.Wait() +} + +var alreadyDone chan struct{} = func() chan struct{} { + done := make(chan struct{}) + close(done) + return done +}() + var _ TimeReel = (*DataTimeReel)(nil) diff --git a/node/rpc/hypergraph_sync_rpc_server_test.go b/node/rpc/hypergraph_sync_rpc_server_test.go index 1c37418..9f3aefa 100644 --- a/node/rpc/hypergraph_sync_rpc_server_test.go +++ b/node/rpc/hypergraph_sync_rpc_server_test.go @@ -66,6 +66,7 @@ func TestLoadHypergraphFallback(t *testing.T) { assert.Equal(t, len(crypto.ConvertAllPreloadedLeaves(string(application.VertexAtomType), string(application.AddsPhaseType), k, serverHypergraphStore, a.GetTree().Root, []int{})), 100000) } for k, a := range clientLoad.GetVertexAdds() { + fmt.Printf("%x\n", a.GetTree().Commit(true)) assert.Equal(t, len(crypto.ConvertAllPreloadedLeaves(string(application.VertexAtomType), string(application.AddsPhaseType), k, clientHypergraphStore, a.GetTree().Root, []int{})), 100000) } diff --git a/sidecar/main.go b/sidecar/main.go index cc6b652..77240c7 100644 --- a/sidecar/main.go +++ b/sidecar/main.go @@ -59,7 +59,6 @@ var frameProver qcrypto.FrameProver var inclusionProver qcrypto.InclusionProver var keyManager keys.KeyManager var lastProven uint64 -var proverTrie *tries.RollingFrecencyCritbitTrie func main() { flag.Parse() @@ -71,7 +70,7 @@ func main() { done := make(chan os.Signal, 1) signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) recentlyProcessedFrames, _ = lru.New[string, struct{}](25) - logger, _ = zap.NewDevelopment() + logger, _ = zap.NewProduction() var privKey crypto.PrivKey keyManager, _,