From 7ad50ec33befcc8eb224eaed037b0d13d3704837 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Fri, 25 Oct 2024 04:45:46 -0500 Subject: [PATCH] temporarily bulk process on frame candidates --- node/consensus/data/main_data_loop.go | 13 +- node/consensus/data/message_handler.go | 2 + node/consensus/time/data_time_reel.go | 214 +++++++++++-------------- node/store/clock.go | 43 +++++ 4 files changed, 145 insertions(+), 127 deletions(-) diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go index e305f47..2624050 100644 --- a/node/consensus/data/main_data_loop.go +++ b/node/consensus/data/main_data_loop.go @@ -57,14 +57,14 @@ func (e *DataClockConsensusEngine) runLoop() { select { case dataFrame := <-dataFrameCh: + e.logger.Info( + "current frame head", + zap.Uint64("frame_number", dataFrame.FrameNumber), + ) if latestFrame != nil && dataFrame.FrameNumber > latestFrame.FrameNumber { latestFrame = dataFrame } - e.logger.Info( - "current frame head", - zap.Uint64("frame_number", latestFrame.FrameNumber), - ) if e.latestFrameReceived < latestFrame.FrameNumber { e.latestFrameReceived = latestFrame.FrameNumber @@ -167,6 +167,11 @@ func (e *DataClockConsensusEngine) runLoop() { panic(err) } + e.logger.Info( + "current frame head", + zap.Uint64("frame_number", dataFrame.FrameNumber), + ) + if latestFrame == nil || latestFrame.FrameNumber < dataFrame.FrameNumber { latestFrame, err = e.dataTimeReel.Head() diff --git a/node/consensus/data/message_handler.go b/node/consensus/data/message_handler.go index e01de3c..38e1ca7 100644 --- a/node/consensus/data/message_handler.go +++ b/node/consensus/data/message_handler.go @@ -181,6 +181,8 @@ func (e *DataClockConsensusEngine) handleRebroadcast( continue } + e.logger.Info("receiving synchronization data") + if err := e.handleClockFrame(peerID, address, frame); err != nil { return errors.Wrap(err, "handle rebroadcast") } diff --git a/node/consensus/time/data_time_reel.go b/node/consensus/time/data_time_reel.go index fd6b45a..4280f03 100644 --- a/node/consensus/time/data_time_reel.go +++ b/node/consensus/time/data_time_reel.go @@ -3,9 +3,7 @@ package time import ( "bytes" "encoding/hex" - "fmt" "math/big" - "sort" "sync" lru "github.com/hashicorp/golang-lru/v2" @@ -50,12 +48,12 @@ type DataTimeReel struct { headDistance *big.Int lruFrames *lru.Cache[string, string] proverTries []*tries.RollingFrecencyCritbitTrie - pending map[uint64][]*pendingFrame - incompleteForks map[uint64][]*pendingFrame - frames chan *pendingFrame - newFrameCh chan *protobufs.ClockFrame - badFrameCh chan *protobufs.ClockFrame - done chan bool + // pending map[uint64][]*pendingFrame + incompleteForks map[uint64][]*pendingFrame + frames chan *pendingFrame + newFrameCh chan *protobufs.ClockFrame + badFrameCh chan *protobufs.ClockFrame + done chan bool } func NewDataTimeReel( @@ -110,12 +108,12 @@ func NewDataTimeReel( initialInclusionProof: initialInclusionProof, initialProverKeys: initialProverKeys, lruFrames: cache, - pending: make(map[uint64][]*pendingFrame), - incompleteForks: make(map[uint64][]*pendingFrame), - frames: make(chan *pendingFrame), - newFrameCh: make(chan *protobufs.ClockFrame), - badFrameCh: make(chan *protobufs.ClockFrame), - done: make(chan bool), + // pending: make(map[uint64][]*pendingFrame), + incompleteForks: make(map[uint64][]*pendingFrame), + frames: make(chan *pendingFrame), + newFrameCh: make(chan *protobufs.ClockFrame), + badFrameCh: make(chan *protobufs.ClockFrame), + done: make(chan bool), } } @@ -313,11 +311,11 @@ func (d *DataTimeReel) runLoop() { if d.head.FrameNumber < rawFrame.FrameNumber { d.logger.Debug("frame is higher") - parent := new(big.Int).SetBytes(rawFrame.ParentSelector) - selector, err := rawFrame.GetSelector() - if err != nil { - panic(err) - } + // parent := new(big.Int).SetBytes(rawFrame.ParentSelector) + // selector, err := rawFrame.GetSelector() + // if err != nil { + // panic(err) + // } distance, err := d.GetDistance(rawFrame) if err != nil { @@ -325,7 +323,7 @@ func (d *DataTimeReel) runLoop() { panic(err) } - d.addPending(selector, parent, frame.frameNumber) + // d.addPending(selector, parent, frame.frameNumber) d.processPending(d.head, frame) continue } @@ -417,49 +415,49 @@ func (d *DataTimeReel) runLoop() { } } -func (d *DataTimeReel) addPending( - selector *big.Int, - parent *big.Int, - frameNumber uint64, -) { - // d.logger.Debug( - // "add pending", - // zap.Uint64("head_frame_number", d.head.FrameNumber), - // zap.Uint64("add_frame_number", frameNumber), - // zap.String("selector", selector.Text(16)), - // zap.String("parent", parent.Text(16)), - // ) +// func (d *DataTimeReel) addPending( +// selector *big.Int, +// parent *big.Int, +// frameNumber uint64, +// ) { +// // d.logger.Debug( +// // "add pending", +// // zap.Uint64("head_frame_number", d.head.FrameNumber), +// // zap.Uint64("add_frame_number", frameNumber), +// // zap.String("selector", selector.Text(16)), +// // zap.String("parent", parent.Text(16)), +// // ) - if d.head.FrameNumber <= frameNumber { - if _, ok := d.pending[frameNumber]; !ok { - d.pending[frameNumber] = []*pendingFrame{} - } +// if d.head.FrameNumber <= frameNumber { +// if _, ok := d.pending[frameNumber]; !ok { +// d.pending[frameNumber] = []*pendingFrame{} +// } - // avoid heavy thrashing - for _, frame := range d.pending[frameNumber] { - if frame.selector.Cmp(selector) == 0 { - d.logger.Debug("exists in pending already") - return - } - } - } +// // avoid heavy thrashing +// for _, frame := range d.pending[frameNumber] { +// if frame.selector.Cmp(selector) == 0 { +// d.logger.Debug("exists in pending already") +// return +// } +// } +// } - if d.head.FrameNumber <= frameNumber { - // d.logger.Debug( - // "accumulate in pending", - // zap.Int("pending_neighbors", len(d.pending[frameNumber])), - // ) +// if d.head.FrameNumber <= frameNumber { +// // d.logger.Debug( +// // "accumulate in pending", +// // zap.Int("pending_neighbors", len(d.pending[frameNumber])), +// // ) - d.pending[frameNumber] = append( - d.pending[frameNumber], - &pendingFrame{ - selector: selector, - parentSelector: parent, - frameNumber: frameNumber, - }, - ) - } -} +// d.pending[frameNumber] = append( +// d.pending[frameNumber], +// &pendingFrame{ +// selector: selector, +// parentSelector: parent, +// frameNumber: frameNumber, +// }, +// ) +// } +// } func (d *DataTimeReel) storePending( selector *big.Int, @@ -511,76 +509,46 @@ func (d *DataTimeReel) processPending( // zap.Uint64("last_received_frame", lastReceived.frameNumber), // zap.Int("pending_frame_numbers", len(d.pending)), // ) - frameNumbers := []uint64{} - for f := range d.pending { - frameNumbers = append(frameNumbers, f) - } - sort.Slice(frameNumbers, func(i, j int) bool { - return frameNumbers[i] < frameNumbers[j] - }) - lastSelector := lastReceived.selector + for { + next := d.head.FrameNumber + 1 + // d.logger.Debug( + // "checking frame set", + // zap.Uint64("pending_frame_number", f), + // zap.Uint64("frame_number", frame.FrameNumber), + // ) + // Pull the next + d.logger.Debug("try process next") - for _, f := range frameNumbers { - if d.head.FrameNumber > f { - continue + //// todo: revise for prover rings + rawFrames, err := d.clockStore.GetStagedDataClockFramesForFrameNumber( + d.filter, + next, + ) + if err != nil { + panic(err) } - nextF := d.head.FrameNumber + 1 - if f == nextF { - nextPending := d.pending[f] - // d.logger.Debug( - // "checking frame set", - // zap.Uint64("pending_frame_number", f), - // zap.Uint64("frame_number", frame.FrameNumber), - // ) - // Pull the next - for len(nextPending) != 0 { - d.logger.Debug("try process next") - next := nextPending[0] - d.pending[f] = d.pending[f][1:] - if f == lastReceived.frameNumber && next.selector.Cmp(lastSelector) == 0 { - d.pending[f] = append(d.pending[f], next) - if len(d.pending[f]) == 1 { - nextPending = nil - } - continue - } + for _, rawFrame := range rawFrames { + d.logger.Debug( + "processing frame", + zap.Uint64("frame_number", rawFrame.FrameNumber), + zap.String("output_tag", hex.EncodeToString(rawFrame.Output[:64])), + zap.Uint64("head_number", d.head.FrameNumber), + zap.String("head_output_tag", hex.EncodeToString(d.head.Output[:64])), + ) - //// todo: revise for prover rings - rawFrame, err := d.clockStore.GetStagedDataClockFrame( - d.filter, - next.frameNumber, - next.selector.FillBytes(make([]byte, 32)), - false, - ) - if err != nil { + distance, err := d.GetDistance(rawFrame) + if err != nil { + if !errors.Is(err, store.ErrNotFound) { panic(err) } - d.logger.Debug( - "processing frame", - zap.Uint64("frame_number", rawFrame.FrameNumber), - zap.String("output_tag", hex.EncodeToString(rawFrame.Output[:64])), - zap.Uint64("head_number", d.head.FrameNumber), - zap.String("head_output_tag", hex.EncodeToString(d.head.Output[:64])), - ) - distance, err := d.GetDistance(rawFrame) - if err != nil { - if !errors.Is(err, store.ErrNotFound) { - panic(err) - } - - continue - } - fmt.Println(f) - // Otherwise set it as the next and process all pending - d.setHead(rawFrame, distance) - break + continue } - delete(d.pending, f) - } else { + // Otherwise set it as the next and process all pending + d.setHead(rawFrame, distance) break } } @@ -735,7 +703,7 @@ func (d *DataTimeReel) forkChoice( zap.Uint64("head_number", d.head.FrameNumber), zap.String("head_output_tag", hex.EncodeToString(d.head.Output[:64])), ) - parentSelector, selector, err := frame.GetParentAndSelector() + _, selector, err := frame.GetParentAndSelector() if err != nil { panic(err) } @@ -768,7 +736,7 @@ func (d *DataTimeReel) forkChoice( if err != nil { // If lineage cannot be verified, set it for later if errors.Is(err, store.ErrNotFound) { - d.addPending(selector, parentSelector, frame.FrameNumber) + // d.addPending(selector, parentSelector, frame.FrameNumber) return } else { panic(err) @@ -831,7 +799,7 @@ func (d *DataTimeReel) forkChoice( if err != nil { // If lineage cannot be verified, set it for later if errors.Is(err, store.ErrNotFound) { - d.addPending(selector, parentSelector, frame.FrameNumber) + // d.addPending(selector, parentSelector, frame.FrameNumber) return } else { panic(err) @@ -865,7 +833,7 @@ func (d *DataTimeReel) forkChoice( zap.String("right_total", rightTotal.Text(16)), zap.String("left_total", overweight.Text(16)), ) - d.addPending(selector, parentSelector, frame.FrameNumber) + // d.addPending(selector, parentSelector, frame.FrameNumber) return } diff --git a/node/store/clock.go b/node/store/clock.go index 3ca41c3..03c9d71 100644 --- a/node/store/clock.go +++ b/node/store/clock.go @@ -63,6 +63,10 @@ type ClockStore interface { parentSelector []byte, truncate bool, ) (*protobufs.ClockFrame, error) + GetStagedDataClockFramesForFrameNumber( + filter []byte, + frameNumber uint64, + ) ([]*protobufs.ClockFrame, error) SetLatestDataClockFrameNumber( filter []byte, frameNumber uint64, @@ -816,6 +820,45 @@ func (p *PebbleClockStore) GetStagedDataClockFrame( return parent, nil } +func (p *PebbleClockStore) GetStagedDataClockFramesForFrameNumber( + filter []byte, + frameNumber uint64, +) ([]*protobufs.ClockFrame, error) { + iter, err := p.db.NewIter( + clockDataParentIndexKey(filter, frameNumber, bytes.Repeat([]byte{0x00}, 32)), + clockDataParentIndexKey(filter, frameNumber, bytes.Repeat([]byte{0xff}, 32)), + ) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, errors.Wrap(ErrNotFound, "get staged data clock frames") + } + return nil, errors.Wrap(err, "get staged data clock frames") + } + + frames := []*protobufs.ClockFrame{} + + for iter.First(); iter.Valid(); iter.Next() { + data := iter.Value() + frame := &protobufs.ClockFrame{} + if err := proto.Unmarshal(data, frame); err != nil { + return nil, errors.Wrap(err, "get staged data clock frames") + } + + if err := p.fillAggregateProofs(frame, false); err != nil { + return nil, errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "get staged data clock frames", + ) + } + + frames = append(frames, frame) + } + + iter.Close() + + return frames, nil +} + // StageDataClockFrame implements ClockStore. func (p *PebbleClockStore) StageDataClockFrame( selector []byte,