Wait frame processing on sync end (#412)

This commit is contained in:
petricadaipegsp 2025-01-09 09:32:49 +01:00 committed by GitHub
parent b14a57b25a
commit 0b831deec1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 62 additions and 34 deletions

View File

@ -27,15 +27,13 @@ func (e *DataClockConsensusEngine) syncWithMesh() error {
if err != nil {
return errors.Wrap(err, "sync")
}
var doneChs []<-chan struct{}
for {
candidates := e.GetAheadPeers(max(latest.FrameNumber, e.latestFrameReceived))
if len(candidates) == 0 {
break
}
for _, candidate := range candidates {
if candidate.MaxFrame <= max(latest.FrameNumber, e.latestFrameReceived) {
continue
}
head, err := e.dataTimeReel.Head()
if err != nil {
return errors.Wrap(err, "sync")
@ -43,13 +41,24 @@ func (e *DataClockConsensusEngine) syncWithMesh() error {
if latest.FrameNumber < head.FrameNumber {
latest = head
}
latest, err = e.syncWithPeer(latest, candidate.MaxFrame, candidate.PeerID)
if candidate.MaxFrame <= max(latest.FrameNumber, e.latestFrameReceived) {
continue
}
latest, doneChs, err = e.syncWithPeer(latest, doneChs, candidate.MaxFrame, candidate.PeerID)
if err != nil {
e.logger.Debug("error syncing frame", zap.Error(err))
}
}
}
for _, doneCh := range doneChs {
select {
case <-e.ctx.Done():
return e.ctx.Err()
case <-doneCh:
}
}
e.logger.Info(
"returning leader frame",
zap.Uint64("frame_number", latest.FrameNumber),
@ -312,13 +321,13 @@ func (e *DataClockConsensusEngine) GetAheadPeers(frameNumber uint64) []internal.
}
func (e *DataClockConsensusEngine) syncWithPeer(
currentLatest *protobufs.ClockFrame,
latest *protobufs.ClockFrame,
doneChs []<-chan struct{},
maxFrame uint64,
peerId []byte,
) (*protobufs.ClockFrame, error) {
) (*protobufs.ClockFrame, []<-chan struct{}, error) {
e.syncingStatus = SyncStatusSynchronizing
defer func() { e.syncingStatus = SyncStatusNotSyncing }()
latest := currentLatest
e.logger.Info(
"polling peer for new frames",
zap.String("peer_id", peer.ID(peerId).String()),
@ -350,7 +359,7 @@ func (e *DataClockConsensusEngine) syncWithPeer(
zap.Error(err),
)
cooperative = false
return latest, errors.Wrap(err, "sync")
return latest, doneChs, errors.Wrap(err, "sync")
}
defer func() {
if err := cc.Close(); err != nil {
@ -378,12 +387,12 @@ func (e *DataClockConsensusEngine) syncWithPeer(
zap.Error(err),
)
cooperative = false
return latest, errors.Wrap(err, "sync")
return latest, doneChs, errors.Wrap(err, "sync")
}
if response == nil {
e.logger.Debug("received no response from peer")
return latest, nil
return latest, doneChs, nil
}
if response.ClockFrame == nil ||
@ -391,7 +400,7 @@ func (e *DataClockConsensusEngine) syncWithPeer(
response.ClockFrame.Timestamp < latest.Timestamp {
e.logger.Debug("received invalid response from peer")
cooperative = false
return latest, nil
return latest, doneChs, nil
}
e.logger.Info(
"received new leading frame",
@ -406,12 +415,16 @@ func (e *DataClockConsensusEngine) syncWithPeer(
if err := e.frameProver.VerifyDataClockFrame(
response.ClockFrame,
); err != nil {
return nil, errors.Wrap(err, "sync")
return latest, doneChs, errors.Wrap(err, "sync")
}
e.dataTimeReel.Insert(e.ctx, response.ClockFrame, true)
doneCh, err := e.dataTimeReel.Insert(e.ctx, response.ClockFrame)
if err != nil {
return latest, doneChs, errors.Wrap(err, "sync")
}
doneChs = append(doneChs, doneCh)
latest = response.ClockFrame
if latest.FrameNumber >= maxFrame {
return latest, nil
return latest, doneChs, nil
}
}
}

View File

@ -276,7 +276,9 @@ func (e *DataClockConsensusEngine) processFrame(
return dataFrame
}
e.dataTimeReel.Insert(e.ctx, nextFrame, true)
if _, err := e.dataTimeReel.Insert(e.ctx, nextFrame); err != nil {
e.logger.Debug("could not insert frame", zap.Error(err))
}
return nextFrame
} else {

View File

@ -253,7 +253,9 @@ func (e *DataClockConsensusEngine) handleClockFrame(
}
if frame.FrameNumber > head.FrameNumber {
e.dataTimeReel.Insert(e.ctx, frame, false)
if _, err := e.dataTimeReel.Insert(e.ctx, frame); err != nil {
e.logger.Debug("could not insert frame", zap.Error(err))
}
}
return nil

View File

@ -155,7 +155,7 @@ func (e *MasterClockConsensusEngine) publishProof(
zap.Uint64("frame_number", frame.FrameNumber),
)
e.masterTimeReel.Insert(context.TODO(), frame, false)
e.masterTimeReel.Insert(context.TODO(), frame)
}
e.state = consensus.EngineStateCollecting

View File

@ -208,7 +208,7 @@ func (e *MasterClockConsensusEngine) Start() <-chan error {
continue
}
e.masterTimeReel.Insert(context.TODO(), newFrame, false)
e.masterTimeReel.Insert(context.TODO(), newFrame)
}
}
}()

View File

@ -29,6 +29,7 @@ type pendingFrame struct {
selector *big.Int
parentSelector *big.Int
frameNumber uint64
done chan struct{}
}
type DataTimeReel struct {
@ -190,12 +191,18 @@ func (d *DataTimeReel) Head() (*protobufs.ClockFrame, error) {
return d.head, nil
}
var alreadyDone chan struct{} = func() chan struct{} {
done := make(chan struct{})
close(done)
return done
}()
// Insert enqueues a structurally valid frame into the time reel. If the frame
// is the next one in sequence, it advances the reel head forward and emits a
// new frame on the new frame channel.
func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error {
func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame) (<-chan struct{}, error) {
if err := d.ctx.Err(); err != nil {
return err
return nil, err
}
d.logger.Debug(
@ -222,21 +229,24 @@ func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame,
d.storePending(selector, parent, distance, frame)
if d.head.FrameNumber+1 == frame.FrameNumber {
done := make(chan struct{})
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
case <-d.ctx.Done():
return d.ctx.Err()
return nil, d.ctx.Err()
case d.frames <- &pendingFrame{
selector: selector,
parentSelector: parent,
frameNumber: frame.FrameNumber,
done: done,
}:
return done, nil
}
}
}
return nil
return alreadyDone, nil
}
func (
@ -393,6 +403,7 @@ func (d *DataTimeReel) runLoop() {
// Otherwise set it as the next and process all pending
if err = d.setHead(rawFrame, distance); err != nil {
close(frame.done)
continue
}
d.processPending(d.head, frame)
@ -559,6 +570,7 @@ func (d *DataTimeReel) processPending(
frame *protobufs.ClockFrame,
lastReceived *pendingFrame,
) {
defer close(lastReceived.done)
// d.logger.Debug(
// "process pending",
// zap.Uint64("head_frame", frame.FrameNumber),

View File

@ -233,7 +233,7 @@ func TestDataTimeReel(t *testing.T) {
i+1,
10,
)
d.Insert(ctx, frame, false)
d.Insert(ctx, frame)
prevBI, _ := frame.GetSelector()
prev = prevBI.FillBytes(make([]byte, 32))
}
@ -264,7 +264,7 @@ func TestDataTimeReel(t *testing.T) {
}
for i := 99; i >= 0; i-- {
err := d.Insert(ctx, insertFrames[i], false)
_, err := d.Insert(ctx, insertFrames[i])
assert.NoError(t, err)
}
@ -286,7 +286,7 @@ func TestDataTimeReel(t *testing.T) {
i+1,
10,
)
d.Insert(ctx, frame, false)
d.Insert(ctx, frame)
prevBI, _ := frame.GetSelector()
prev = prevBI.FillBytes(make([]byte, 32))
@ -334,7 +334,7 @@ func TestDataTimeReel(t *testing.T) {
}
for i := 99; i >= 0; i-- {
err := d.Insert(ctx, insertFrames[i], false)
_, err := d.Insert(ctx, insertFrames[i])
assert.NoError(t, err)
}
@ -397,7 +397,7 @@ func TestDataTimeReel(t *testing.T) {
// Someone is honest, but running backwards:
for i := 99; i >= 0; i-- {
err := d.Insert(ctx, insertFrames[i], false)
_, err := d.Insert(ctx, insertFrames[i])
gotime.Sleep(1 * gotime.Second)
assert.NoError(t, err)
}

View File

@ -123,13 +123,12 @@ func (m *MasterTimeReel) Head() (*protobufs.ClockFrame, error) {
func (m *MasterTimeReel) Insert(
ctx context.Context,
frame *protobufs.ClockFrame,
isSync bool,
) error {
) (<-chan struct{}, error) {
go func() {
m.frames <- frame
}()
return nil
return alreadyDone, nil
}
// NewFrameCh implements TimeReel.

View File

@ -61,7 +61,7 @@ func TestMasterTimeReel(t *testing.T) {
)
assert.NoError(t, err)
err := m.Insert(ctx, frame, false)
_, err := m.Insert(ctx, frame)
assert.NoError(t, err)
}
@ -81,7 +81,7 @@ func TestMasterTimeReel(t *testing.T) {
}
for i := 99; i >= 0; i-- {
err := m.Insert(ctx, insertFrames[i], false)
_, err := m.Insert(ctx, insertFrames[i])
assert.NoError(t, err)
}

View File

@ -9,7 +9,7 @@ import (
type TimeReel interface {
Start() error
Stop()
Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error
Insert(ctx context.Context, frame *protobufs.ClockFrame) (<-chan struct{}, error)
Head() (*protobufs.ClockFrame, error)
NewFrameCh() <-chan *protobufs.ClockFrame
BadFrameCh() <-chan *protobufs.ClockFrame