ceremonyclient/node/consensus/time/data_time_reel.go
2025-01-09 02:32:49 -06:00

1028 lines
23 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package time
import (
"bytes"
"context"
"encoding/hex"
"math/big"
"os"
"sync"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/pkg/errors"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/node/config"
"source.quilibrium.com/quilibrium/monorepo/node/crypto"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
"source.quilibrium.com/quilibrium/monorepo/node/store"
"source.quilibrium.com/quilibrium/monorepo/node/tries"
)
var unknownDistance = new(big.Int).SetBytes([]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
})
type pendingFrame struct {
selector *big.Int
parentSelector *big.Int
frameNumber uint64
done chan struct{}
}
type DataTimeReel struct {
rwMutex sync.RWMutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
filter []byte
engineConfig *config.EngineConfig
logger *zap.Logger
clockStore store.ClockStore
frameProver crypto.FrameProver
exec func(
txn store.Transaction,
frame *protobufs.ClockFrame,
triesAtFrame []*tries.RollingFrecencyCritbitTrie,
) (
[]*tries.RollingFrecencyCritbitTrie,
error,
)
origin []byte
initialInclusionProof *crypto.InclusionAggregateProof
initialProverKeys [][]byte
head *protobufs.ClockFrame
totalDistance *big.Int
headDistance *big.Int
lruFrames *lru.Cache[string, string]
proverTries []*tries.RollingFrecencyCritbitTrie
// pending map[uint64][]*pendingFrame
incompleteForks map[uint64][]*pendingFrame
frames chan *pendingFrame
newFrameCh chan *protobufs.ClockFrame
badFrameCh chan *protobufs.ClockFrame
alwaysSend bool
restore func() []*tries.RollingFrecencyCritbitTrie
}
func NewDataTimeReel(
filter []byte,
logger *zap.Logger,
clockStore store.ClockStore,
engineConfig *config.EngineConfig,
frameProver crypto.FrameProver,
exec func(
txn store.Transaction,
frame *protobufs.ClockFrame,
triesAtFrame []*tries.RollingFrecencyCritbitTrie,
) (
[]*tries.RollingFrecencyCritbitTrie,
error,
),
origin []byte,
initialInclusionProof *crypto.InclusionAggregateProof,
initialProverKeys [][]byte,
alwaysSend bool,
restore func() []*tries.RollingFrecencyCritbitTrie,
) *DataTimeReel {
if filter == nil {
panic("filter is nil")
}
if logger == nil {
panic("logger is nil")
}
if clockStore == nil {
panic("clock store is nil")
}
if engineConfig == nil {
panic("engine config is nil")
}
if exec == nil {
panic("execution function is nil")
}
if frameProver == nil {
panic("frame prover is nil")
}
cache, err := lru.New[string, string](10000)
if err != nil {
panic(err)
}
ctx, cancel := context.WithCancel(context.Background())
return &DataTimeReel{
ctx: ctx,
cancel: cancel,
logger: logger,
filter: filter,
engineConfig: engineConfig,
clockStore: clockStore,
frameProver: frameProver,
exec: exec,
origin: origin,
initialInclusionProof: initialInclusionProof,
initialProverKeys: initialProverKeys,
lruFrames: cache,
// pending: make(map[uint64][]*pendingFrame),
incompleteForks: make(map[uint64][]*pendingFrame),
frames: make(chan *pendingFrame, 65536),
newFrameCh: make(chan *protobufs.ClockFrame),
badFrameCh: make(chan *protobufs.ClockFrame),
alwaysSend: alwaysSend,
restore: restore,
}
}
func (d *DataTimeReel) Start() error {
frame, tries, err := d.clockStore.GetLatestDataClockFrame(d.filter)
if err != nil && !errors.Is(err, store.ErrNotFound) {
panic(err)
}
if frame == nil {
d.head, d.proverTries = d.createGenesisFrame()
d.totalDistance = big.NewInt(0)
d.headDistance = big.NewInt(0)
} else {
if len(tries[0].FindNearestAndApproximateNeighbors(make([]byte, 32))) == 0 {
if frame.FrameNumber > 53027 {
d.logger.Info("DANGER")
d.logger.Info("DANGER")
d.logger.Info("DANGER")
d.logger.Info("It appears your node is running with a broken store. Please restore from backup or create a new store.")
d.logger.Info("DANGER")
d.logger.Info("DANGER")
d.logger.Info("DANGER")
os.Exit(1)
}
d.logger.Info("encountered trie corruption, invoking restoration")
tries = d.restore()
}
d.head = frame
if err != nil {
panic(err)
}
d.totalDistance = big.NewInt(0)
d.proverTries = tries
d.headDistance, err = d.GetDistance(frame)
}
d.wg.Add(1)
go d.runLoop()
return nil
}
func (d *DataTimeReel) SetHead(frame *protobufs.ClockFrame) {
d.head = frame
}
func (d *DataTimeReel) Head() (*protobufs.ClockFrame, error) {
return d.head, nil
}
var alreadyDone chan struct{} = func() chan struct{} {
done := make(chan struct{})
close(done)
return done
}()
// Insert enqueues a structurally valid frame into the time reel. If the frame
// is the next one in sequence, it advances the reel head forward and emits a
// new frame on the new frame channel.
func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame) (<-chan struct{}, error) {
if err := d.ctx.Err(); err != nil {
return nil, err
}
d.logger.Debug(
"insert frame",
zap.Uint64("frame_number", frame.FrameNumber),
zap.String("output_tag", hex.EncodeToString(frame.Output[:64])),
)
// if d.lruFrames.Contains(string(frame.Output[:64])) {
// return nil
// }
// d.lruFrames.Add(string(frame.Output[:64]), string(frame.ParentSelector))
parent := new(big.Int).SetBytes(frame.ParentSelector)
selector, err := frame.GetSelector()
if err != nil {
panic(err)
}
distance, _ := d.GetDistance(frame)
if d.head.FrameNumber < frame.FrameNumber {
d.storePending(selector, parent, distance, frame)
if d.head.FrameNumber+1 == frame.FrameNumber {
done := make(chan struct{})
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-d.ctx.Done():
return nil, d.ctx.Err()
case d.frames <- &pendingFrame{
selector: selector,
parentSelector: parent,
frameNumber: frame.FrameNumber,
done: done,
}:
return done, nil
}
}
}
return alreadyDone, nil
}
func (
d *DataTimeReel,
) GetFrameProverTries() []*tries.RollingFrecencyCritbitTrie {
return d.proverTries
}
func (d *DataTimeReel) NewFrameCh() <-chan *protobufs.ClockFrame {
return d.newFrameCh
}
func (d *DataTimeReel) BadFrameCh() <-chan *protobufs.ClockFrame {
return d.badFrameCh
}
func (d *DataTimeReel) Stop() {
d.cancel()
d.wg.Wait()
}
func (d *DataTimeReel) createGenesisFrame() (
*protobufs.ClockFrame,
[]*tries.RollingFrecencyCritbitTrie,
) {
if d.origin == nil {
panic("origin is nil")
}
if d.initialInclusionProof == nil {
panic("initial inclusion proof is nil")
}
if d.initialProverKeys == nil {
panic("initial prover keys is nil")
}
difficulty := d.engineConfig.Difficulty
if difficulty == 0 || difficulty == 10000 {
difficulty = 200000
}
frame, tries, err := d.frameProver.CreateDataGenesisFrame(
d.filter,
d.origin,
difficulty,
d.initialInclusionProof,
d.initialProverKeys,
)
if err != nil {
panic(err)
}
selector, err := frame.GetSelector()
if err != nil {
panic(err)
}
txn, err := d.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
err = d.clockStore.StageDataClockFrame(
selector.FillBytes(make([]byte, 32)),
frame,
txn,
)
if err != nil {
txn.Abort()
panic(err)
}
err = txn.Commit()
if err != nil {
txn.Abort()
panic(err)
}
txn, err = d.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
if err := d.clockStore.CommitDataClockFrame(
d.filter,
0,
selector.FillBytes(make([]byte, 32)),
tries,
txn,
false,
); err != nil {
panic(err)
}
if err := txn.Commit(); err != nil {
panic(err)
}
return frame, tries
}
// Main data consensus loop
func (d *DataTimeReel) runLoop() {
defer d.wg.Done()
for {
select {
case <-d.ctx.Done():
return
case frame := <-d.frames:
rawFrame, err := d.clockStore.GetStagedDataClockFrame(
d.filter,
frame.frameNumber,
frame.selector.FillBytes(make([]byte, 32)),
false,
)
if err != nil {
panic(err)
}
d.logger.Debug(
"processing frame",
zap.Uint64("frame_number", rawFrame.FrameNumber),
zap.String("output_tag", hex.EncodeToString(rawFrame.Output[:64])),
zap.Uint64("head_number", d.head.FrameNumber),
zap.String("head_output_tag", hex.EncodeToString(d.head.Output[:64])),
)
// Most common scenario: in order new frame is higher number
if d.head.FrameNumber < rawFrame.FrameNumber {
d.logger.Debug("frame is higher")
// parent := new(big.Int).SetBytes(rawFrame.ParentSelector)
// selector, err := rawFrame.GetSelector()
// if err != nil {
// panic(err)
// }
distance, err := d.GetDistance(rawFrame)
if err != nil {
if !errors.Is(err, store.ErrNotFound) {
panic(err)
}
// d.addPending(selector, parent, frame.frameNumber)
d.processPending(d.head, frame)
continue
}
// If the frame has a gap from the head or is not descendent, mark it as
// pending:
if rawFrame.FrameNumber-d.head.FrameNumber != 1 {
d.processPending(d.head, frame)
continue
}
// Otherwise set it as the next and process all pending
if err = d.setHead(rawFrame, distance); err != nil {
close(frame.done)
continue
}
d.processPending(d.head, frame)
} else if d.head.FrameNumber == rawFrame.FrameNumber {
// frames are equivalent, no need to act
if bytes.Equal(d.head.Output, rawFrame.Output) {
d.logger.Debug("equivalent frame")
d.processPending(d.head, frame)
continue
}
// temp: remove fork choice until prover ring testing
// distance, err := d.GetDistance(rawFrame)
// if err != nil {
// panic(err)
// }
// d.logger.Debug(
// "frame is same height",
// zap.String("head_distance", d.headDistance.Text(16)),
// zap.String("distance", distance.Text(16)),
// )
// // Optimization: if competing frames share a parent we can short-circuit
// // fork choice
// if bytes.Equal(d.head.ParentSelector, rawFrame.ParentSelector) &&
// distance.Cmp(d.headDistance) < 0 {
// d.logger.Debug(
// "frame shares parent, has shorter distance, short circuit",
// )
// d.totalDistance.Sub(d.totalDistance, d.headDistance)
// d.setHead(rawFrame, distance)
// d.processPending(d.head, frame)
// continue
// }
// Choose fork
// d.forkChoice(rawFrame, distance)
d.processPending(d.head, frame)
} else {
// d.logger.Debug("frame is lower height")
// existing, _, err := d.clockStore.GetDataClockFrame(
// d.filter,
// rawFrame.FrameNumber,
// true,
// )
// if err != nil {
// // if this returns an error it's either not found (which shouldn't
// // happen without corruption) or pebble is borked, either way, panic
// panic(err)
// }
// if !bytes.Equal(existing.Output, rawFrame.Output) {
// parent, selector, err := rawFrame.GetParentAndSelector()
// if err != nil {
// panic(err)
// }
// if bytes.Equal(existing.ParentSelector, rawFrame.ParentSelector) {
// ld := d.getTotalDistance(existing)
// rd := d.getTotalDistance(rawFrame)
// if rd.Cmp(ld) < 0 {
// d.forkChoice(rawFrame, rd)
// d.processPending(d.head, frame)
// } else {
// d.addPending(selector, parent, frame.frameNumber)
// d.processPending(d.head, frame)
// }
// } else {
// d.addPending(selector, parent, frame.frameNumber)
// d.processPending(d.head, frame)
// }
// }
}
}
}
}
// func (d *DataTimeReel) addPending(
// selector *big.Int,
// parent *big.Int,
// frameNumber uint64,
// ) {
// // d.logger.Debug(
// // "add pending",
// // zap.Uint64("head_frame_number", d.head.FrameNumber),
// // zap.Uint64("add_frame_number", frameNumber),
// // zap.String("selector", selector.Text(16)),
// // zap.String("parent", parent.Text(16)),
// // )
// if d.head.FrameNumber <= frameNumber {
// if _, ok := d.pending[frameNumber]; !ok {
// d.pending[frameNumber] = []*pendingFrame{}
// }
// // avoid heavy thrashing
// for _, frame := range d.pending[frameNumber] {
// if frame.selector.Cmp(selector) == 0 {
// d.logger.Debug("exists in pending already")
// return
// }
// }
// }
// if d.head.FrameNumber <= frameNumber {
// // d.logger.Debug(
// // "accumulate in pending",
// // zap.Int("pending_neighbors", len(d.pending[frameNumber])),
// // )
// d.pending[frameNumber] = append(
// d.pending[frameNumber],
// &pendingFrame{
// selector: selector,
// parentSelector: parent,
// frameNumber: frameNumber,
// },
// )
// }
// }
func (d *DataTimeReel) storePending(
selector *big.Int,
parent *big.Int,
distance *big.Int,
frame *protobufs.ClockFrame,
) {
// avoid db thrashing
if existing, err := d.clockStore.GetStagedDataClockFrame(
frame.Filter,
frame.FrameNumber,
selector.FillBytes(make([]byte, 32)),
true,
); err != nil && existing == nil {
d.logger.Debug(
"not stored yet, save data candidate",
zap.Uint64("frame_number", frame.FrameNumber),
zap.String("selector", selector.Text(16)),
zap.String("parent", parent.Text(16)),
zap.String("distance", distance.Text(16)),
)
txn, err := d.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
err = d.clockStore.StageDataClockFrame(
selector.FillBytes(make([]byte, 32)),
frame,
txn,
)
if err != nil {
txn.Abort()
panic(err)
}
if err = txn.Commit(); err != nil {
panic(err)
}
}
}
func (d *DataTimeReel) processPending(
frame *protobufs.ClockFrame,
lastReceived *pendingFrame,
) {
defer close(lastReceived.done)
// d.logger.Debug(
// "process pending",
// zap.Uint64("head_frame", frame.FrameNumber),
// zap.Uint64("last_received_frame", lastReceived.frameNumber),
// zap.Int("pending_frame_numbers", len(d.pending)),
// )
for {
select {
case <-d.ctx.Done():
return
default:
}
next := d.head.FrameNumber + 1
sel, err := d.head.GetSelector()
if err != nil {
panic(err)
}
selector := sel.FillBytes(make([]byte, 32))
// d.logger.Debug(
// "checking frame set",
// zap.Uint64("pending_frame_number", f),
// zap.Uint64("frame_number", frame.FrameNumber),
// )
// Pull the next
d.logger.Debug("try process next")
//// todo: revise for prover rings
rawFrames, err := d.clockStore.GetStagedDataClockFramesForFrameNumber(
d.filter,
next,
)
if err != nil {
panic(err)
}
found := false
for _, rawFrame := range rawFrames {
if !bytes.Equal(rawFrame.ParentSelector, selector) {
continue
}
d.logger.Debug(
"processing frame",
zap.Uint64("frame_number", rawFrame.FrameNumber),
zap.String("output_tag", hex.EncodeToString(rawFrame.Output[:64])),
zap.Uint64("head_number", d.head.FrameNumber),
zap.String("head_output_tag", hex.EncodeToString(d.head.Output[:64])),
)
distance, err := d.GetDistance(rawFrame)
if err != nil {
if !errors.Is(err, store.ErrNotFound) {
panic(err)
}
continue
}
// Otherwise set it as the next and process all pending
err = d.setHead(rawFrame, distance)
if err != nil {
break
}
found = true
break
}
if !found {
break
}
}
}
func (d *DataTimeReel) setHead(frame *protobufs.ClockFrame, distance *big.Int) error {
d.logger.Debug(
"set frame to head",
zap.Uint64("frame_number", frame.FrameNumber),
zap.String("output_tag", hex.EncodeToString(frame.Output[:64])),
zap.Uint64("head_number", d.head.FrameNumber),
zap.String("head_output_tag", hex.EncodeToString(d.head.Output[:64])),
)
txn, err := d.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
d.logger.Debug(
"save data",
zap.Uint64("frame_number", frame.FrameNumber),
zap.String("distance", distance.Text(16)),
)
selector, err := frame.GetSelector()
if err != nil {
panic(err)
}
_, tries, err := d.clockStore.GetDataClockFrame(
d.filter,
frame.FrameNumber-1,
false,
)
if tries, err = d.exec(txn, frame, tries); err != nil {
d.logger.Error("invalid frame execution, unwinding", zap.Error(err))
txn.Abort()
return errors.Wrap(err, "set head")
}
if err := d.clockStore.CommitDataClockFrame(
d.filter,
frame.FrameNumber,
selector.FillBytes(make([]byte, 32)),
tries,
txn,
false,
); err != nil {
panic(err)
}
if err = txn.Commit(); err != nil {
panic(err)
}
d.proverTries = tries
d.head = frame
d.headDistance = distance
if d.alwaysSend {
select {
case <-d.ctx.Done():
return d.ctx.Err()
case d.newFrameCh <- frame:
}
} else {
select {
case <-d.ctx.Done():
return d.ctx.Err()
case d.newFrameCh <- frame:
default:
}
}
return nil
}
func (d *DataTimeReel) getTotalDistance(frame *protobufs.ClockFrame) *big.Int {
selector, err := frame.GetSelector()
if err != nil {
panic(err)
}
total, err := d.clockStore.GetTotalDistance(
d.filter,
frame.FrameNumber,
selector.FillBytes(make([]byte, 32)),
)
if err == nil && total != nil {
return total
}
total, err = d.GetDistance(frame)
if err != nil {
panic(err)
}
for index := frame; err == nil &&
index.FrameNumber > 0; index, err = d.clockStore.GetStagedDataClockFrame(
d.filter,
index.FrameNumber-1,
index.ParentSelector,
true,
) {
distance, err := d.GetDistance(index)
if err != nil {
panic(err)
}
total.Add(total, distance)
}
d.clockStore.SetTotalDistance(
d.filter,
frame.FrameNumber,
selector.FillBytes(make([]byte, 32)),
total,
)
return total
}
func (d *DataTimeReel) GetDistance(frame *protobufs.ClockFrame) (
*big.Int,
error,
) {
if frame.FrameNumber == 0 {
return big.NewInt(0), nil
}
prev, _, err := d.clockStore.GetDataClockFrame(
d.filter,
frame.FrameNumber-1,
false,
)
if err != nil {
return unknownDistance, errors.Wrap(err, "get distance")
}
prevSelector, err := prev.GetSelector()
if err != nil {
return unknownDistance, errors.Wrap(err, "get distance")
}
discriminatorNode :=
d.proverTries[0].FindNearest(prevSelector.FillBytes(make([]byte, 32)))
discriminator := discriminatorNode.Key
addr, err := frame.GetAddress()
if err != nil {
return unknownDistance, errors.Wrap(err, "get distance")
}
distance := new(big.Int).Sub(
new(big.Int).SetBytes(discriminator),
new(big.Int).SetBytes(addr),
)
distance.Abs(distance)
return distance, nil
}
func (d *DataTimeReel) forkChoice(
frame *protobufs.ClockFrame,
distance *big.Int,
) {
d.logger.Debug(
"fork choice",
zap.Uint64("frame_number", frame.FrameNumber),
zap.String("output_tag", hex.EncodeToString(frame.Output[:64])),
zap.Uint64("head_number", d.head.FrameNumber),
zap.String("head_output_tag", hex.EncodeToString(d.head.Output[:64])),
)
_, selector, err := frame.GetParentAndSelector()
if err != nil {
panic(err)
}
leftIndex := d.head
rightIndex := frame
leftTotal := new(big.Int).Set(d.headDistance)
overweight := big.NewInt(0)
rightTotal := new(big.Int).Set(distance)
left := d.head.ParentSelector
right := frame.ParentSelector
rightReplaySelectors := [][]byte{}
for rightIndex.FrameNumber > leftIndex.FrameNumber {
rightReplaySelectors = append(
append(
[][]byte{},
right,
),
rightReplaySelectors...,
)
rightIndex, err = d.clockStore.GetStagedDataClockFrame(
d.filter,
rightIndex.FrameNumber-1,
rightIndex.ParentSelector,
true,
)
if err != nil {
// If lineage cannot be verified, set it for later
if errors.Is(err, store.ErrNotFound) {
// d.addPending(selector, parentSelector, frame.FrameNumber)
return
} else {
panic(err)
}
}
right = rightIndex.ParentSelector
rightIndexDistance, err := d.GetDistance(rightIndex)
if err != nil {
panic(err)
}
// We accumulate right on left when right is longer because we cannot know
// where the left will lead and don't want it to disadvantage our comparison
overweight.Add(overweight, rightIndexDistance)
rightTotal.Add(rightTotal, rightIndexDistance)
}
// Walk backwards through the parents, until we find a matching parent
// selector:
for !bytes.Equal(left, right) {
d.logger.Debug(
"scan backwards",
zap.String("left_parent", hex.EncodeToString(leftIndex.ParentSelector)),
zap.String("right_parent", hex.EncodeToString(rightIndex.ParentSelector)),
)
rightReplaySelectors = append(
append(
[][]byte{},
right,
),
rightReplaySelectors...,
)
leftIndex, err = d.clockStore.GetStagedDataClockFrame(
d.filter,
leftIndex.FrameNumber-1,
leftIndex.ParentSelector,
true,
)
if err != nil {
d.logger.Error(
"store corruption: a discontinuity has been found in your time reel",
zap.String(
"selector",
hex.EncodeToString(leftIndex.ParentSelector),
),
zap.Uint64("frame_number", leftIndex.FrameNumber-1),
)
panic(err)
}
rightIndex, err = d.clockStore.GetStagedDataClockFrame(
d.filter,
rightIndex.FrameNumber-1,
rightIndex.ParentSelector,
true,
)
if err != nil {
// If lineage cannot be verified, set it for later
if errors.Is(err, store.ErrNotFound) {
// d.addPending(selector, parentSelector, frame.FrameNumber)
return
} else {
panic(err)
}
}
left = leftIndex.ParentSelector
right = rightIndex.ParentSelector
leftIndexDistance, err := d.GetDistance(leftIndex)
if err != nil {
panic(err)
}
rightIndexDistance, err := d.GetDistance(rightIndex)
if err != nil {
panic(err)
}
leftTotal.Add(leftTotal, leftIndexDistance)
rightTotal.Add(rightTotal, rightIndexDistance)
}
d.logger.Debug("found mutual root")
frameNumber := rightIndex.FrameNumber
overweight.Add(overweight, leftTotal)
// Choose new fork based on lightest distance sub-tree
if rightTotal.Cmp(overweight) > 0 {
d.logger.Debug("proposed fork has greater distance",
zap.String("right_total", rightTotal.Text(16)),
zap.String("left_total", overweight.Text(16)),
)
// d.addPending(selector, parentSelector, frame.FrameNumber)
return
}
for {
if len(rightReplaySelectors) == 0 {
break
}
next := rightReplaySelectors[0]
rightReplaySelectors =
rightReplaySelectors[1:]
txn, err := d.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
if err := d.clockStore.CommitDataClockFrame(
d.filter,
frameNumber,
next,
d.proverTries,
txn,
rightIndex.FrameNumber < d.head.FrameNumber,
); err != nil {
panic(err)
}
if err = txn.Commit(); err != nil {
panic(err)
}
frameNumber++
}
txn, err := d.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
if err := d.clockStore.CommitDataClockFrame(
d.filter,
frame.FrameNumber,
selector.FillBytes(make([]byte, 32)),
d.proverTries,
txn,
false,
); err != nil {
panic(err)
}
if err = txn.Commit(); err != nil {
panic(err)
}
d.head = frame
d.totalDistance.Sub(d.totalDistance, leftTotal)
d.totalDistance.Add(d.totalDistance, rightTotal)
d.headDistance = distance
d.logger.Debug(
"set total distance",
zap.String("total_distance", d.totalDistance.Text(16)),
)
d.clockStore.SetTotalDistance(
d.filter,
frame.FrameNumber,
selector.FillBytes(make([]byte, 32)),
d.totalDistance,
)
select {
case <-d.ctx.Done():
case d.newFrameCh <- frame:
default:
}
}
func (d *DataTimeReel) GetTotalDistance() *big.Int {
return new(big.Int).Set(d.totalDistance)
}
var _ TimeReel = (*DataTimeReel)(nil)