From b14a57b25a59f9273bf308c673b5a24a6a43e73e Mon Sep 17 00:00:00 2001 From: Tyler Sturos <55340199+tjsturos@users.noreply.github.com> Date: Wed, 8 Jan 2025 23:31:13 -0900 Subject: [PATCH 1/2] speed up data worker connection speeds (#414) Co-authored-by: Tyler Sturos --- .../data/data_clock_consensus_engine.go | 337 +++++++----------- node/consensus/data/main_data_loop.go | 46 +-- 2 files changed, 137 insertions(+), 246 deletions(-) diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index 14e171e..0ca6f89 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -141,6 +141,7 @@ type DataClockConsensusEngine struct { infoMessageProcessorCh chan *pb.Message report *protobufs.SelfTestReport clients []protobufs.DataIPCServiceClient + clientsMx sync.Mutex grpcRateLimiter *RateLimiter previousFrameProven *protobufs.ClockFrame previousTree *mt.MerkleTree @@ -562,19 +563,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error { e.wg.Add(1) go func() { defer e.wg.Done() - if len(e.config.Engine.DataWorkerMultiaddrs) != 0 { - e.clients, err = e.createParallelDataClientsFromList() - if err != nil { - panic(err) - } - } else { - e.clients, err = e.createParallelDataClientsFromBaseMultiaddr( - e.config.Engine.DataWorkerCount, - ) - if err != nil { - panic(err) - } - } + e.createParallelDataWorkerClients() }() return errChan @@ -831,211 +820,155 @@ func (e *DataClockConsensusEngine) createCommunicationKeys() error { return nil } -func (e *DataClockConsensusEngine) createParallelDataClientsFromListAndIndex( - index uint32, +func (e *DataClockConsensusEngine) connectToClient( + index int, + useList bool, ) ( protobufs.DataIPCServiceClient, error, ) { - ma, err := multiaddr.NewMultiaddr(e.config.Engine.DataWorkerMultiaddrs[index]) - if err != nil { - return nil, errors.Wrap(err, "create parallel data client") - } - - _, addr, err := mn.DialArgs(ma) - if err != nil { - return nil, errors.Wrap(err, "create parallel data client") - } - - ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second) - defer cancel() - conn, err := qgrpc.DialContext( - ctx, - addr, - grpc.WithTransportCredentials( - insecure.NewCredentials(), - ), - grpc.WithDefaultCallOptions( - grpc.MaxCallSendMsgSize(10*1024*1024), - grpc.MaxCallRecvMsgSize(10*1024*1024), - ), - grpc.WithBlock(), - ) - if err != nil { - return nil, errors.Wrap(err, "create parallel data client") - } - - client := protobufs.NewDataIPCServiceClient(conn) - - e.logger.Info( - "connected to data worker process", - zap.Uint32("client", index), - ) - return client, nil -} - -func ( - e *DataClockConsensusEngine, -) createParallelDataClientsFromBaseMultiaddrAndIndex( - index uint32, -) ( - protobufs.DataIPCServiceClient, - error, -) { - e.logger.Info( - "re-connecting to data worker process", - zap.Uint32("client", index), - ) - - ma, err := multiaddr.NewMultiaddr( - fmt.Sprintf( - e.config.Engine.DataWorkerBaseListenMultiaddr, - int(e.config.Engine.DataWorkerBaseListenPort)+int(index), - ), - ) - if err != nil { - return nil, errors.Wrap(err, "create parallel data client") - } - - _, addr, err := mn.DialArgs(ma) - if err != nil { - return nil, errors.Wrap(err, "create parallel data client") - } - - ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second) - defer cancel() - conn, err := qgrpc.DialContext( - ctx, - addr, - grpc.WithTransportCredentials( - insecure.NewCredentials(), - ), - grpc.WithDefaultCallOptions( - grpc.MaxCallSendMsgSize(10*1024*1024), - grpc.MaxCallRecvMsgSize(10*1024*1024), - ), - grpc.WithBlock(), - ) - if err != nil { - return nil, errors.Wrap(err, "create parallel data client") - } - - client := protobufs.NewDataIPCServiceClient(conn) - - e.logger.Info( - "connected to data worker process", - zap.Uint32("client", index), - ) - return client, nil -} - -func (e *DataClockConsensusEngine) createParallelDataClientsFromList() ( - []protobufs.DataIPCServiceClient, - error, -) { - parallelism := len(e.config.Engine.DataWorkerMultiaddrs) - - e.logger.Info( - "connecting to data worker processes", - zap.Int("parallelism", parallelism), - ) - - clients := make([]protobufs.DataIPCServiceClient, parallelism) - - for i := 0; i < parallelism; i++ { - ma, err := multiaddr.NewMultiaddr(e.config.Engine.DataWorkerMultiaddrs[i]) - if err != nil { - panic(err) - } - - _, addr, err := mn.DialArgs(ma) - if err != nil { - e.logger.Error("could not get dial args", zap.Error(err)) - continue - } - - ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second) - defer cancel() - conn, err := qgrpc.DialContext( - ctx, - addr, - grpc.WithTransportCredentials( - insecure.NewCredentials(), - ), - grpc.WithDefaultCallOptions( - grpc.MaxCallSendMsgSize(10*1024*1024), - grpc.MaxCallRecvMsgSize(10*1024*1024), - ), - grpc.WithBlock(), - ) - if err != nil { - e.logger.Error("could not dial", zap.Error(err)) - continue - } - - clients[i] = protobufs.NewDataIPCServiceClient(conn) - } - - e.logger.Info( - "connected to data worker processes", - zap.Int("parallelism", parallelism), - ) - return clients, nil -} - -func (e *DataClockConsensusEngine) createParallelDataClientsFromBaseMultiaddr( - parallelism int, -) ([]protobufs.DataIPCServiceClient, error) { - e.logger.Info( - "connecting to data worker processes", - zap.Int("parallelism", parallelism), - ) - - clients := make([]protobufs.DataIPCServiceClient, parallelism) - - for i := 0; i < parallelism; i++ { - ma, err := multiaddr.NewMultiaddr( + var ma multiaddr.Multiaddr + var err error + if useList { + ma, err = multiaddr.NewMultiaddr(e.config.Engine.DataWorkerMultiaddrs[index]) + } else { + ma, err = multiaddr.NewMultiaddr( fmt.Sprintf( e.config.Engine.DataWorkerBaseListenMultiaddr, - int(e.config.Engine.DataWorkerBaseListenPort)+i, + int(e.config.Engine.DataWorkerBaseListenPort)+int(index), ), ) - if err != nil { - panic(err) - } + } + if err != nil { + e.logger.Error("failed to create multiaddr", zap.Error(err)) + return nil, err + } - _, addr, err := mn.DialArgs(ma) - if err != nil { - e.logger.Error("could not get dial args", zap.Error(err)) - continue - } - ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second) - defer cancel() - conn, err := qgrpc.DialContext( - ctx, - addr, - grpc.WithTransportCredentials( - insecure.NewCredentials(), - ), - grpc.WithDefaultCallOptions( - grpc.MaxCallSendMsgSize(10*1024*1024), - grpc.MaxCallRecvMsgSize(10*1024*1024), - ), - grpc.WithBlock(), + _, addr, err := mn.DialArgs(ma) + + if err != nil { + e.logger.Error("could not get dial args", + zap.Error(err), + zap.String("multiaddr", ma.String()), + zap.Int("index", index), ) - if err != nil { - e.logger.Error("could not dial", zap.Error(err)) - continue - } + return nil, err + } - clients[i] = protobufs.NewDataIPCServiceClient(conn) + ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second) + defer cancel() + conn, err := qgrpc.DialContext( + ctx, + addr, + grpc.WithTransportCredentials( + insecure.NewCredentials(), + ), + grpc.WithDefaultCallOptions( + grpc.MaxCallSendMsgSize(10*1024*1024), + grpc.MaxCallRecvMsgSize(10*1024*1024), + ), + grpc.WithBlock(), + ) + if err != nil { + e.logger.Error("could not dial", + zap.Error(err), + zap.String("multiaddr", ma.String()), + zap.Int("index", index), + ) + return nil, err } e.logger.Info( - "connected to data worker processes", + "connected to data worker process", + zap.String("multiaddr", ma.String()), + ) + + return protobufs.NewDataIPCServiceClient(conn), nil + +} + +func (e *DataClockConsensusEngine) createParallelDataWorkerClients() { + parallelism := len(e.config.Engine.DataWorkerMultiaddrs) + useList := true + if parallelism == 0 { + parallelism = e.config.Engine.DataWorkerCount + useList = false + } + + e.clientsMx.Lock() + e.clients = make([]protobufs.DataIPCServiceClient, parallelism) + e.clientsMx.Unlock() + + e.logger.Info( + "connecting to data worker processes", zap.Int("parallelism", parallelism), ) - return clients, nil + + wg := sync.WaitGroup{} + wg.Add(parallelism) + for i := 0; i < parallelism; i++ { + index := i + go func() { + defer wg.Done() + client, err := e.connectToClient(index, useList) + if err != nil { + e.clientsMx.Lock() + e.clients[index] = nil + e.clientsMx.Unlock() + e.logger.Error("failed to connect to data worker", zap.Error(err)) + return + } + e.clientsMx.Lock() + e.clients[index] = client + e.clientsMx.Unlock() + }() + } + wg.Wait() +} + +func (e *DataClockConsensusEngine) tryReconnectDataWorkerClients() { + // could reload worker list config here + parallelism := len(e.config.Engine.DataWorkerMultiaddrs) + useList := true + if parallelism == 0 { + parallelism = e.config.Engine.DataWorkerCount + useList = false + } + + wg := sync.WaitGroup{} + wg.Add(parallelism) + for i := 0; i < parallelism; i++ { + index := i + + go func() { + defer wg.Done() + if e.clients[index] != nil { + return + } + for j := 3; j >= 0; j-- { + client, err := e.connectToClient(index, useList) + if err != nil { + e.clientsMx.Lock() + e.clients[index] = nil + e.clientsMx.Unlock() + e.logger.Error("failed to connect to data worker", + zap.Error(err), + zap.Int("index", index), + ) + time.Sleep(50 * time.Millisecond) + continue + } + e.clientsMx.Lock() + e.logger.Info("reconnected to data worker", + zap.Int("index", index), + ) + e.clients[index] = client + e.clientsMx.Unlock() + break + } + }() + } + wg.Wait() } func (e *DataClockConsensusEngine) GetWorkerCount() uint32 { diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go index 36a4722..d5832e5 100644 --- a/node/consensus/data/main_data_loop.go +++ b/node/consensus/data/main_data_loop.go @@ -2,7 +2,6 @@ package data import ( "bytes" - "sync" "time" "github.com/iden3/go-iden3-crypto/poseidon" @@ -234,6 +233,7 @@ func (e *DataClockConsensusEngine) processFrame( latestFrame *protobufs.ClockFrame, dataFrame *protobufs.ClockFrame, ) *protobufs.ClockFrame { + e.logger.Info( "current frame head", zap.Uint64("frame_number", dataFrame.FrameNumber), @@ -311,49 +311,7 @@ func (e *DataClockConsensusEngine) processFrame( e.clientReconnectTest++ if e.clientReconnectTest >= 10 { - wg := sync.WaitGroup{} - wg.Add(len(e.clients)) - for i, client := range e.clients { - i := i - client := client - go func() { - for j := 3; j >= 0; j-- { - var err error - if client == nil { - if len(e.config.Engine.DataWorkerMultiaddrs) != 0 { - e.logger.Error( - "client failed, reconnecting after 50ms", - zap.Uint32("client", uint32(i)), - ) - time.Sleep(50 * time.Millisecond) - client, err = e.createParallelDataClientsFromListAndIndex(uint32(i)) - if err != nil { - e.logger.Error("failed to reconnect", zap.Error(err)) - } - } else if len(e.config.Engine.DataWorkerMultiaddrs) == 0 { - e.logger.Error( - "client failed, reconnecting after 50ms", - zap.Uint32("client", uint32(i)), - ) - time.Sleep(50 * time.Millisecond) - client, err = - e.createParallelDataClientsFromBaseMultiaddrAndIndex(uint32(i)) - if err != nil { - e.logger.Error( - "failed to reconnect", - zap.Uint32("client", uint32(i)), - zap.Error(err), - ) - } - } - e.clients[i] = client - continue - } - } - wg.Done() - }() - } - wg.Wait() + e.tryReconnectDataWorkerClients() e.clientReconnectTest = 0 } From 0b831deec15e57f976f3d9be558b3bc8a8791afd Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Thu, 9 Jan 2025 09:32:49 +0100 Subject: [PATCH 2/2] Wait frame processing on sync end (#412) --- node/consensus/data/consensus_frames.go | 41 ++++++++++++------- node/consensus/data/main_data_loop.go | 4 +- node/consensus/data/message_handler.go | 4 +- node/consensus/master/broadcast_messaging.go | 2 +- .../master/master_clock_consensus_engine.go | 2 +- node/consensus/time/data_time_reel.go | 22 +++++++--- node/consensus/time/data_time_reel_test.go | 10 ++--- node/consensus/time/master_time_reel.go | 5 +-- node/consensus/time/master_time_reel_test.go | 4 +- node/consensus/time/time_reel.go | 2 +- 10 files changed, 62 insertions(+), 34 deletions(-) diff --git a/node/consensus/data/consensus_frames.go b/node/consensus/data/consensus_frames.go index 99c659b..52509a4 100644 --- a/node/consensus/data/consensus_frames.go +++ b/node/consensus/data/consensus_frames.go @@ -27,15 +27,13 @@ func (e *DataClockConsensusEngine) syncWithMesh() error { if err != nil { return errors.Wrap(err, "sync") } + var doneChs []<-chan struct{} for { candidates := e.GetAheadPeers(max(latest.FrameNumber, e.latestFrameReceived)) if len(candidates) == 0 { break } for _, candidate := range candidates { - if candidate.MaxFrame <= max(latest.FrameNumber, e.latestFrameReceived) { - continue - } head, err := e.dataTimeReel.Head() if err != nil { return errors.Wrap(err, "sync") @@ -43,13 +41,24 @@ func (e *DataClockConsensusEngine) syncWithMesh() error { if latest.FrameNumber < head.FrameNumber { latest = head } - latest, err = e.syncWithPeer(latest, candidate.MaxFrame, candidate.PeerID) + if candidate.MaxFrame <= max(latest.FrameNumber, e.latestFrameReceived) { + continue + } + latest, doneChs, err = e.syncWithPeer(latest, doneChs, candidate.MaxFrame, candidate.PeerID) if err != nil { e.logger.Debug("error syncing frame", zap.Error(err)) } } } + for _, doneCh := range doneChs { + select { + case <-e.ctx.Done(): + return e.ctx.Err() + case <-doneCh: + } + } + e.logger.Info( "returning leader frame", zap.Uint64("frame_number", latest.FrameNumber), @@ -312,13 +321,13 @@ func (e *DataClockConsensusEngine) GetAheadPeers(frameNumber uint64) []internal. } func (e *DataClockConsensusEngine) syncWithPeer( - currentLatest *protobufs.ClockFrame, + latest *protobufs.ClockFrame, + doneChs []<-chan struct{}, maxFrame uint64, peerId []byte, -) (*protobufs.ClockFrame, error) { +) (*protobufs.ClockFrame, []<-chan struct{}, error) { e.syncingStatus = SyncStatusSynchronizing defer func() { e.syncingStatus = SyncStatusNotSyncing }() - latest := currentLatest e.logger.Info( "polling peer for new frames", zap.String("peer_id", peer.ID(peerId).String()), @@ -350,7 +359,7 @@ func (e *DataClockConsensusEngine) syncWithPeer( zap.Error(err), ) cooperative = false - return latest, errors.Wrap(err, "sync") + return latest, doneChs, errors.Wrap(err, "sync") } defer func() { if err := cc.Close(); err != nil { @@ -378,12 +387,12 @@ func (e *DataClockConsensusEngine) syncWithPeer( zap.Error(err), ) cooperative = false - return latest, errors.Wrap(err, "sync") + return latest, doneChs, errors.Wrap(err, "sync") } if response == nil { e.logger.Debug("received no response from peer") - return latest, nil + return latest, doneChs, nil } if response.ClockFrame == nil || @@ -391,7 +400,7 @@ func (e *DataClockConsensusEngine) syncWithPeer( response.ClockFrame.Timestamp < latest.Timestamp { e.logger.Debug("received invalid response from peer") cooperative = false - return latest, nil + return latest, doneChs, nil } e.logger.Info( "received new leading frame", @@ -406,12 +415,16 @@ func (e *DataClockConsensusEngine) syncWithPeer( if err := e.frameProver.VerifyDataClockFrame( response.ClockFrame, ); err != nil { - return nil, errors.Wrap(err, "sync") + return latest, doneChs, errors.Wrap(err, "sync") } - e.dataTimeReel.Insert(e.ctx, response.ClockFrame, true) + doneCh, err := e.dataTimeReel.Insert(e.ctx, response.ClockFrame) + if err != nil { + return latest, doneChs, errors.Wrap(err, "sync") + } + doneChs = append(doneChs, doneCh) latest = response.ClockFrame if latest.FrameNumber >= maxFrame { - return latest, nil + return latest, doneChs, nil } } } diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go index d5832e5..f3616d5 100644 --- a/node/consensus/data/main_data_loop.go +++ b/node/consensus/data/main_data_loop.go @@ -276,7 +276,9 @@ func (e *DataClockConsensusEngine) processFrame( return dataFrame } - e.dataTimeReel.Insert(e.ctx, nextFrame, true) + if _, err := e.dataTimeReel.Insert(e.ctx, nextFrame); err != nil { + e.logger.Debug("could not insert frame", zap.Error(err)) + } return nextFrame } else { diff --git a/node/consensus/data/message_handler.go b/node/consensus/data/message_handler.go index 23e856a..e4e2d0d 100644 --- a/node/consensus/data/message_handler.go +++ b/node/consensus/data/message_handler.go @@ -253,7 +253,9 @@ func (e *DataClockConsensusEngine) handleClockFrame( } if frame.FrameNumber > head.FrameNumber { - e.dataTimeReel.Insert(e.ctx, frame, false) + if _, err := e.dataTimeReel.Insert(e.ctx, frame); err != nil { + e.logger.Debug("could not insert frame", zap.Error(err)) + } } return nil diff --git a/node/consensus/master/broadcast_messaging.go b/node/consensus/master/broadcast_messaging.go index cde9b3f..9119d7d 100644 --- a/node/consensus/master/broadcast_messaging.go +++ b/node/consensus/master/broadcast_messaging.go @@ -155,7 +155,7 @@ func (e *MasterClockConsensusEngine) publishProof( zap.Uint64("frame_number", frame.FrameNumber), ) - e.masterTimeReel.Insert(context.TODO(), frame, false) + e.masterTimeReel.Insert(context.TODO(), frame) } e.state = consensus.EngineStateCollecting diff --git a/node/consensus/master/master_clock_consensus_engine.go b/node/consensus/master/master_clock_consensus_engine.go index 54fa8f7..e7ea32f 100644 --- a/node/consensus/master/master_clock_consensus_engine.go +++ b/node/consensus/master/master_clock_consensus_engine.go @@ -208,7 +208,7 @@ func (e *MasterClockConsensusEngine) Start() <-chan error { continue } - e.masterTimeReel.Insert(context.TODO(), newFrame, false) + e.masterTimeReel.Insert(context.TODO(), newFrame) } } }() diff --git a/node/consensus/time/data_time_reel.go b/node/consensus/time/data_time_reel.go index 25e08f7..f5f9ec4 100644 --- a/node/consensus/time/data_time_reel.go +++ b/node/consensus/time/data_time_reel.go @@ -29,6 +29,7 @@ type pendingFrame struct { selector *big.Int parentSelector *big.Int frameNumber uint64 + done chan struct{} } type DataTimeReel struct { @@ -190,12 +191,18 @@ func (d *DataTimeReel) Head() (*protobufs.ClockFrame, error) { return d.head, nil } +var alreadyDone chan struct{} = func() chan struct{} { + done := make(chan struct{}) + close(done) + return done +}() + // Insert enqueues a structurally valid frame into the time reel. If the frame // is the next one in sequence, it advances the reel head forward and emits a // new frame on the new frame channel. -func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error { +func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame) (<-chan struct{}, error) { if err := d.ctx.Err(); err != nil { - return err + return nil, err } d.logger.Debug( @@ -222,21 +229,24 @@ func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame, d.storePending(selector, parent, distance, frame) if d.head.FrameNumber+1 == frame.FrameNumber { + done := make(chan struct{}) select { case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() case <-d.ctx.Done(): - return d.ctx.Err() + return nil, d.ctx.Err() case d.frames <- &pendingFrame{ selector: selector, parentSelector: parent, frameNumber: frame.FrameNumber, + done: done, }: + return done, nil } } } - return nil + return alreadyDone, nil } func ( @@ -393,6 +403,7 @@ func (d *DataTimeReel) runLoop() { // Otherwise set it as the next and process all pending if err = d.setHead(rawFrame, distance); err != nil { + close(frame.done) continue } d.processPending(d.head, frame) @@ -559,6 +570,7 @@ func (d *DataTimeReel) processPending( frame *protobufs.ClockFrame, lastReceived *pendingFrame, ) { + defer close(lastReceived.done) // d.logger.Debug( // "process pending", // zap.Uint64("head_frame", frame.FrameNumber), diff --git a/node/consensus/time/data_time_reel_test.go b/node/consensus/time/data_time_reel_test.go index 43cc07d..3ebce28 100644 --- a/node/consensus/time/data_time_reel_test.go +++ b/node/consensus/time/data_time_reel_test.go @@ -233,7 +233,7 @@ func TestDataTimeReel(t *testing.T) { i+1, 10, ) - d.Insert(ctx, frame, false) + d.Insert(ctx, frame) prevBI, _ := frame.GetSelector() prev = prevBI.FillBytes(make([]byte, 32)) } @@ -264,7 +264,7 @@ func TestDataTimeReel(t *testing.T) { } for i := 99; i >= 0; i-- { - err := d.Insert(ctx, insertFrames[i], false) + _, err := d.Insert(ctx, insertFrames[i]) assert.NoError(t, err) } @@ -286,7 +286,7 @@ func TestDataTimeReel(t *testing.T) { i+1, 10, ) - d.Insert(ctx, frame, false) + d.Insert(ctx, frame) prevBI, _ := frame.GetSelector() prev = prevBI.FillBytes(make([]byte, 32)) @@ -334,7 +334,7 @@ func TestDataTimeReel(t *testing.T) { } for i := 99; i >= 0; i-- { - err := d.Insert(ctx, insertFrames[i], false) + _, err := d.Insert(ctx, insertFrames[i]) assert.NoError(t, err) } @@ -397,7 +397,7 @@ func TestDataTimeReel(t *testing.T) { // Someone is honest, but running backwards: for i := 99; i >= 0; i-- { - err := d.Insert(ctx, insertFrames[i], false) + _, err := d.Insert(ctx, insertFrames[i]) gotime.Sleep(1 * gotime.Second) assert.NoError(t, err) } diff --git a/node/consensus/time/master_time_reel.go b/node/consensus/time/master_time_reel.go index 70ef4d3..fcfe0d6 100644 --- a/node/consensus/time/master_time_reel.go +++ b/node/consensus/time/master_time_reel.go @@ -123,13 +123,12 @@ func (m *MasterTimeReel) Head() (*protobufs.ClockFrame, error) { func (m *MasterTimeReel) Insert( ctx context.Context, frame *protobufs.ClockFrame, - isSync bool, -) error { +) (<-chan struct{}, error) { go func() { m.frames <- frame }() - return nil + return alreadyDone, nil } // NewFrameCh implements TimeReel. diff --git a/node/consensus/time/master_time_reel_test.go b/node/consensus/time/master_time_reel_test.go index 6332d3f..4184598 100644 --- a/node/consensus/time/master_time_reel_test.go +++ b/node/consensus/time/master_time_reel_test.go @@ -61,7 +61,7 @@ func TestMasterTimeReel(t *testing.T) { ) assert.NoError(t, err) - err := m.Insert(ctx, frame, false) + _, err := m.Insert(ctx, frame) assert.NoError(t, err) } @@ -81,7 +81,7 @@ func TestMasterTimeReel(t *testing.T) { } for i := 99; i >= 0; i-- { - err := m.Insert(ctx, insertFrames[i], false) + _, err := m.Insert(ctx, insertFrames[i]) assert.NoError(t, err) } diff --git a/node/consensus/time/time_reel.go b/node/consensus/time/time_reel.go index 489b892..26719d2 100644 --- a/node/consensus/time/time_reel.go +++ b/node/consensus/time/time_reel.go @@ -9,7 +9,7 @@ import ( type TimeReel interface { Start() error Stop() - Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error + Insert(ctx context.Context, frame *protobufs.ClockFrame) (<-chan struct{}, error) Head() (*protobufs.ClockFrame, error) NewFrameCh() <-chan *protobufs.ClockFrame BadFrameCh() <-chan *protobufs.ClockFrame