temporarily bulk-process frame candidates

This commit is contained in:
Cassandra Heart 2024-10-25 04:45:46 -05:00
parent eccf5ebf59
commit 7ad50ec33b
No known key found for this signature in database
GPG Key ID: 6352152859385958
4 changed files with 145 additions and 127 deletions

View File

@ -57,14 +57,14 @@ func (e *DataClockConsensusEngine) runLoop() {
select {
case dataFrame := <-dataFrameCh:
e.logger.Info(
"current frame head",
zap.Uint64("frame_number", dataFrame.FrameNumber),
)
if latestFrame != nil &&
dataFrame.FrameNumber > latestFrame.FrameNumber {
latestFrame = dataFrame
}
e.logger.Info(
"current frame head",
zap.Uint64("frame_number", latestFrame.FrameNumber),
)
if e.latestFrameReceived < latestFrame.FrameNumber {
e.latestFrameReceived = latestFrame.FrameNumber
@ -167,6 +167,11 @@ func (e *DataClockConsensusEngine) runLoop() {
panic(err)
}
e.logger.Info(
"current frame head",
zap.Uint64("frame_number", dataFrame.FrameNumber),
)
if latestFrame == nil ||
latestFrame.FrameNumber < dataFrame.FrameNumber {
latestFrame, err = e.dataTimeReel.Head()

View File

@ -181,6 +181,8 @@ func (e *DataClockConsensusEngine) handleRebroadcast(
continue
}
e.logger.Info("receiving synchronization data")
if err := e.handleClockFrame(peerID, address, frame); err != nil {
return errors.Wrap(err, "handle rebroadcast")
}

View File

@ -3,9 +3,7 @@ package time
import (
"bytes"
"encoding/hex"
"fmt"
"math/big"
"sort"
"sync"
lru "github.com/hashicorp/golang-lru/v2"
@ -50,12 +48,12 @@ type DataTimeReel struct {
headDistance *big.Int
lruFrames *lru.Cache[string, string]
proverTries []*tries.RollingFrecencyCritbitTrie
pending map[uint64][]*pendingFrame
incompleteForks map[uint64][]*pendingFrame
frames chan *pendingFrame
newFrameCh chan *protobufs.ClockFrame
badFrameCh chan *protobufs.ClockFrame
done chan bool
// pending map[uint64][]*pendingFrame
incompleteForks map[uint64][]*pendingFrame
frames chan *pendingFrame
newFrameCh chan *protobufs.ClockFrame
badFrameCh chan *protobufs.ClockFrame
done chan bool
}
func NewDataTimeReel(
@ -110,12 +108,12 @@ func NewDataTimeReel(
initialInclusionProof: initialInclusionProof,
initialProverKeys: initialProverKeys,
lruFrames: cache,
pending: make(map[uint64][]*pendingFrame),
incompleteForks: make(map[uint64][]*pendingFrame),
frames: make(chan *pendingFrame),
newFrameCh: make(chan *protobufs.ClockFrame),
badFrameCh: make(chan *protobufs.ClockFrame),
done: make(chan bool),
// pending: make(map[uint64][]*pendingFrame),
incompleteForks: make(map[uint64][]*pendingFrame),
frames: make(chan *pendingFrame),
newFrameCh: make(chan *protobufs.ClockFrame),
badFrameCh: make(chan *protobufs.ClockFrame),
done: make(chan bool),
}
}
@ -313,11 +311,11 @@ func (d *DataTimeReel) runLoop() {
if d.head.FrameNumber < rawFrame.FrameNumber {
d.logger.Debug("frame is higher")
parent := new(big.Int).SetBytes(rawFrame.ParentSelector)
selector, err := rawFrame.GetSelector()
if err != nil {
panic(err)
}
// parent := new(big.Int).SetBytes(rawFrame.ParentSelector)
// selector, err := rawFrame.GetSelector()
// if err != nil {
// panic(err)
// }
distance, err := d.GetDistance(rawFrame)
if err != nil {
@ -325,7 +323,7 @@ func (d *DataTimeReel) runLoop() {
panic(err)
}
d.addPending(selector, parent, frame.frameNumber)
// d.addPending(selector, parent, frame.frameNumber)
d.processPending(d.head, frame)
continue
}
@ -417,49 +415,49 @@ func (d *DataTimeReel) runLoop() {
}
}
func (d *DataTimeReel) addPending(
selector *big.Int,
parent *big.Int,
frameNumber uint64,
) {
// d.logger.Debug(
// "add pending",
// zap.Uint64("head_frame_number", d.head.FrameNumber),
// zap.Uint64("add_frame_number", frameNumber),
// zap.String("selector", selector.Text(16)),
// zap.String("parent", parent.Text(16)),
// )
// func (d *DataTimeReel) addPending(
// selector *big.Int,
// parent *big.Int,
// frameNumber uint64,
// ) {
// // d.logger.Debug(
// // "add pending",
// // zap.Uint64("head_frame_number", d.head.FrameNumber),
// // zap.Uint64("add_frame_number", frameNumber),
// // zap.String("selector", selector.Text(16)),
// // zap.String("parent", parent.Text(16)),
// // )
if d.head.FrameNumber <= frameNumber {
if _, ok := d.pending[frameNumber]; !ok {
d.pending[frameNumber] = []*pendingFrame{}
}
// if d.head.FrameNumber <= frameNumber {
// if _, ok := d.pending[frameNumber]; !ok {
// d.pending[frameNumber] = []*pendingFrame{}
// }
// avoid heavy thrashing
for _, frame := range d.pending[frameNumber] {
if frame.selector.Cmp(selector) == 0 {
d.logger.Debug("exists in pending already")
return
}
}
}
// // avoid heavy thrashing
// for _, frame := range d.pending[frameNumber] {
// if frame.selector.Cmp(selector) == 0 {
// d.logger.Debug("exists in pending already")
// return
// }
// }
// }
if d.head.FrameNumber <= frameNumber {
// d.logger.Debug(
// "accumulate in pending",
// zap.Int("pending_neighbors", len(d.pending[frameNumber])),
// )
// if d.head.FrameNumber <= frameNumber {
// // d.logger.Debug(
// // "accumulate in pending",
// // zap.Int("pending_neighbors", len(d.pending[frameNumber])),
// // )
d.pending[frameNumber] = append(
d.pending[frameNumber],
&pendingFrame{
selector: selector,
parentSelector: parent,
frameNumber: frameNumber,
},
)
}
}
// d.pending[frameNumber] = append(
// d.pending[frameNumber],
// &pendingFrame{
// selector: selector,
// parentSelector: parent,
// frameNumber: frameNumber,
// },
// )
// }
// }
func (d *DataTimeReel) storePending(
selector *big.Int,
@ -511,76 +509,46 @@ func (d *DataTimeReel) processPending(
// zap.Uint64("last_received_frame", lastReceived.frameNumber),
// zap.Int("pending_frame_numbers", len(d.pending)),
// )
frameNumbers := []uint64{}
for f := range d.pending {
frameNumbers = append(frameNumbers, f)
}
sort.Slice(frameNumbers, func(i, j int) bool {
return frameNumbers[i] < frameNumbers[j]
})
lastSelector := lastReceived.selector
for {
next := d.head.FrameNumber + 1
// d.logger.Debug(
// "checking frame set",
// zap.Uint64("pending_frame_number", f),
// zap.Uint64("frame_number", frame.FrameNumber),
// )
// Pull the next
d.logger.Debug("try process next")
for _, f := range frameNumbers {
if d.head.FrameNumber > f {
continue
//// todo: revise for prover rings
rawFrames, err := d.clockStore.GetStagedDataClockFramesForFrameNumber(
d.filter,
next,
)
if err != nil {
panic(err)
}
nextF := d.head.FrameNumber + 1
if f == nextF {
nextPending := d.pending[f]
// d.logger.Debug(
// "checking frame set",
// zap.Uint64("pending_frame_number", f),
// zap.Uint64("frame_number", frame.FrameNumber),
// )
// Pull the next
for len(nextPending) != 0 {
d.logger.Debug("try process next")
next := nextPending[0]
d.pending[f] = d.pending[f][1:]
if f == lastReceived.frameNumber && next.selector.Cmp(lastSelector) == 0 {
d.pending[f] = append(d.pending[f], next)
if len(d.pending[f]) == 1 {
nextPending = nil
}
continue
}
for _, rawFrame := range rawFrames {
d.logger.Debug(
"processing frame",
zap.Uint64("frame_number", rawFrame.FrameNumber),
zap.String("output_tag", hex.EncodeToString(rawFrame.Output[:64])),
zap.Uint64("head_number", d.head.FrameNumber),
zap.String("head_output_tag", hex.EncodeToString(d.head.Output[:64])),
)
//// todo: revise for prover rings
rawFrame, err := d.clockStore.GetStagedDataClockFrame(
d.filter,
next.frameNumber,
next.selector.FillBytes(make([]byte, 32)),
false,
)
if err != nil {
distance, err := d.GetDistance(rawFrame)
if err != nil {
if !errors.Is(err, store.ErrNotFound) {
panic(err)
}
d.logger.Debug(
"processing frame",
zap.Uint64("frame_number", rawFrame.FrameNumber),
zap.String("output_tag", hex.EncodeToString(rawFrame.Output[:64])),
zap.Uint64("head_number", d.head.FrameNumber),
zap.String("head_output_tag", hex.EncodeToString(d.head.Output[:64])),
)
distance, err := d.GetDistance(rawFrame)
if err != nil {
if !errors.Is(err, store.ErrNotFound) {
panic(err)
}
continue
}
fmt.Println(f)
// Otherwise set it as the next and process all pending
d.setHead(rawFrame, distance)
break
continue
}
delete(d.pending, f)
} else {
// Otherwise set it as the next and process all pending
d.setHead(rawFrame, distance)
break
}
}
@ -735,7 +703,7 @@ func (d *DataTimeReel) forkChoice(
zap.Uint64("head_number", d.head.FrameNumber),
zap.String("head_output_tag", hex.EncodeToString(d.head.Output[:64])),
)
parentSelector, selector, err := frame.GetParentAndSelector()
_, selector, err := frame.GetParentAndSelector()
if err != nil {
panic(err)
}
@ -768,7 +736,7 @@ func (d *DataTimeReel) forkChoice(
if err != nil {
// If lineage cannot be verified, set it for later
if errors.Is(err, store.ErrNotFound) {
d.addPending(selector, parentSelector, frame.FrameNumber)
// d.addPending(selector, parentSelector, frame.FrameNumber)
return
} else {
panic(err)
@ -831,7 +799,7 @@ func (d *DataTimeReel) forkChoice(
if err != nil {
// If lineage cannot be verified, set it for later
if errors.Is(err, store.ErrNotFound) {
d.addPending(selector, parentSelector, frame.FrameNumber)
// d.addPending(selector, parentSelector, frame.FrameNumber)
return
} else {
panic(err)
@ -865,7 +833,7 @@ func (d *DataTimeReel) forkChoice(
zap.String("right_total", rightTotal.Text(16)),
zap.String("left_total", overweight.Text(16)),
)
d.addPending(selector, parentSelector, frame.FrameNumber)
// d.addPending(selector, parentSelector, frame.FrameNumber)
return
}

View File

@ -63,6 +63,10 @@ type ClockStore interface {
parentSelector []byte,
truncate bool,
) (*protobufs.ClockFrame, error)
GetStagedDataClockFramesForFrameNumber(
filter []byte,
frameNumber uint64,
) ([]*protobufs.ClockFrame, error)
SetLatestDataClockFrameNumber(
filter []byte,
frameNumber uint64,
@ -816,6 +820,45 @@ func (p *PebbleClockStore) GetStagedDataClockFrame(
return parent, nil
}
// GetStagedDataClockFramesForFrameNumber returns all staged candidate clock
// frames stored for the given frame number under filter, scanning the parent
// selector index in key order. It returns a wrapped ErrNotFound when the index
// range cannot be opened because it is missing; decode or proof-fill failures
// are wrapped and returned as-is.
func (p *PebbleClockStore) GetStagedDataClockFramesForFrameNumber(
	filter []byte,
	frameNumber uint64,
) ([]*protobufs.ClockFrame, error) {
	// Cover the full 32-byte parent-selector keyspace (0x00…00 through
	// 0xff…ff) for this frame number.
	iter, err := p.db.NewIter(
		clockDataParentIndexKey(filter, frameNumber, bytes.Repeat([]byte{0x00}, 32)),
		clockDataParentIndexKey(filter, frameNumber, bytes.Repeat([]byte{0xff}, 32)),
	)
	if err != nil {
		if errors.Is(err, pebble.ErrNotFound) {
			return nil, errors.Wrap(ErrNotFound, "get staged data clock frames")
		}

		return nil, errors.Wrap(err, "get staged data clock frames")
	}
	// Close on every return path — previously the iterator leaked whenever
	// Unmarshal or fillAggregateProofs failed mid-scan.
	defer iter.Close()

	frames := []*protobufs.ClockFrame{}
	for iter.First(); iter.Valid(); iter.Next() {
		data := iter.Value()
		frame := &protobufs.ClockFrame{}
		if err := proto.Unmarshal(data, frame); err != nil {
			return nil, errors.Wrap(err, "get staged data clock frames")
		}

		if err := p.fillAggregateProofs(frame, false); err != nil {
			return nil, errors.Wrap(
				errors.Wrap(err, ErrInvalidData.Error()),
				"get staged data clock frames",
			)
		}

		frames = append(frames, frame)
	}

	return frames, nil
}
// StageDataClockFrame implements ClockStore.
func (p *PebbleClockStore) StageDataClockFrame(
selector []byte,