diff --git a/consensus/eventhandler/event_handler_test.go b/consensus/eventhandler/event_handler_test.go index 48269fb..8b4e7cf 100644 --- a/consensus/eventhandler/event_handler_test.go +++ b/consensus/eventhandler/event_handler_test.go @@ -49,7 +49,7 @@ func NewTestPacemaker[ notifier consensus.Consumer[StateT, VoteT], store consensus.ConsensusStore[VoteT], ) *TestPacemaker[StateT, VoteT, PeerIDT, CollectedT] { - p, err := pacemaker.NewPacemaker[StateT, VoteT](timeoutController, proposalDelayProvider, notifier, store) + p, err := pacemaker.NewPacemaker[StateT, VoteT](timeoutController, proposalDelayProvider, notifier, store, helper.Logger()) if err != nil { panic(err) } diff --git a/consensus/helper/state.go b/consensus/helper/state.go index 93b6871..f644287 100644 --- a/consensus/helper/state.go +++ b/consensus/helper/state.go @@ -404,3 +404,64 @@ func stringFromValue(param consensus.LogParam) string { func Logger() *FmtLog { return &FmtLog{} } + +type BufferLog struct { + params []consensus.LogParam + b *strings.Builder +} + +// Error implements consensus.TraceLogger. +func (n *BufferLog) Error(message string, err error, params ...consensus.LogParam) { + n.b.WriteString(fmt.Sprintf("ERROR: %s: %v\n", message, err)) + for _, param := range n.params { + n.b.WriteString(fmt.Sprintf( + "\t%s: %s\n", + param.GetKey(), + stringFromValue(param), + )) + } + for _, param := range params { + n.b.WriteString(fmt.Sprintf( + "\t%s: %s\n", + param.GetKey(), + stringFromValue(param), + )) + } +} + +// Trace implements consensus.TraceLogger. +func (n *BufferLog) Trace(message string, params ...consensus.LogParam) { + n.b.WriteString(fmt.Sprintf("TRACE: %s\n", message)) + n.b.WriteString(fmt.Sprintf("\t[%s]\n", time.Now().String())) + for _, param := range n.params { + n.b.WriteString(fmt.Sprintf( + "\t%s: %s\n", + param.GetKey(), + stringFromValue(param), + )) + } + for _, param := range params { + n.b.WriteString(fmt.Sprintf( + "\t%s: %s\n", + param.GetKey(), + stringFromValue(param), + )) + } +} + +func (n *BufferLog) Flush() { + fmt.Println(n.b.String()) +} + +func (n *BufferLog) With(params ...consensus.LogParam) consensus.TraceLogger { + return &BufferLog{ + params: slices.Concat(n.params, params), + b: n.b, + } +} + +func BufferLogger() *BufferLog { + return &BufferLog{ + b: &strings.Builder{}, + } +} diff --git a/consensus/integration/instance_test.go b/consensus/integration/instance_test.go index 9fd20ae..a179b6a 100644 --- a/consensus/integration/instance_test.go +++ b/consensus/integration/instance_test.go @@ -384,7 +384,7 @@ func NewInstance(t *testing.T, options ...Option) *Instance { // initialize the pacemaker controller := timeout.NewController(cfg.Timeouts) - in.pacemaker, err = pacemaker.NewPacemaker[*helper.TestState, *helper.TestVote](controller, pacemaker.NoProposalDelay(), notifier, in.persist) + in.pacemaker, err = pacemaker.NewPacemaker[*helper.TestState, *helper.TestVote](controller, pacemaker.NoProposalDelay(), notifier, in.persist, in.logger) require.NoError(t, err) // initialize the forks handler @@ -473,8 +473,8 @@ func NewInstance(t *testing.T, options ...Option) *Instance { Bitmask: bitmask, PublicKey: make([]byte, 585), }, nil - }) - sigAgg.On("VerifySignatureRaw", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(true, nil) + }).Maybe() + sigAgg.On("VerifySignatureRaw", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(true, nil).Maybe() createCollectorFactoryMethod := votecollector.NewStateMachineFactory(in.logger, voteAggregationDistributor, voteProcessorFactory.Create, []byte{}, sigAgg) voteCollectors := voteaggregator.NewVoteCollectors(in.logger, livenessData.CurrentRank, workerpool.New(2), createCollectorFactoryMethod) @@ -600,7 +600,9 @@ func NewInstance(t *testing.T, options ...Option) *Instance { ) require.NoError(t, err) + timeoutAggregationDistributor.AddTimeoutCollectorConsumer(logConsumer) timeoutAggregationDistributor.AddTimeoutCollectorConsumer(&in) + voteAggregationDistributor.AddVoteCollectorConsumer(logConsumer) return &in } @@ -624,7 +626,6 @@ func (in *Instance) Run(t *testing.T) error { // run until an error or stop condition is reached for { - // check on stop conditions if in.stop(in) { return errStopCondition @@ -635,7 +636,7 @@ func (in *Instance) Run(t *testing.T) error { case <-in.handler.TimeoutChannel(): err := in.handler.OnLocalTimeout() if err != nil { - return fmt.Errorf("could not process timeout: %w", err) + panic(fmt.Errorf("could not process timeout: %w", err)) } default: } diff --git a/consensus/integration/liveness_test.go b/consensus/integration/liveness_test.go index 7acda71..a196ccb 100644 --- a/consensus/integration/liveness_test.go +++ b/consensus/integration/liveness_test.go @@ -1,6 +1,7 @@ package integration import ( + "encoding/hex" "errors" "fmt" "sync" @@ -46,8 +47,9 @@ func Test2TimeoutOutof7Instances(t *testing.T) { in := NewInstance(t, WithRoot(root), WithParticipants(participants), - WithLocalID(participants[n].Identity()), WithTimeouts(timeouts), + WithBufferLogger(), + WithLocalID(participants[n].Identity()), WithLoggerParams(consensus.StringParam("status", "healthy")), WithStopCondition(RankFinalized(finalRank)), ) @@ -59,8 +61,9 @@ func Test2TimeoutOutof7Instances(t *testing.T) { in := NewInstance(t, WithRoot(root), WithParticipants(participants), - WithLocalID(participants[n].Identity()), WithTimeouts(timeouts), + WithBufferLogger(), + WithLocalID(participants[n].Identity()), WithLoggerParams(consensus.StringParam("status", "unhealthy")), WithStopCondition(RankFinalized(finalRank)), WithOutgoingVotes(DropAllVotes), @@ -82,7 +85,14 @@ func Test2TimeoutOutof7Instances(t *testing.T) { wg.Done() }(in) } - unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second, "expect to finish before timeout") + unittest.AssertReturnsBefore(t, wg.Wait, 20*time.Second, "expect to finish before timeout") + + for i, in := range instances { + fmt.Println("=============================================================================") + fmt.Println("INSTANCE", i, "-", hex.EncodeToString([]byte(in.localID))) + fmt.Println("=============================================================================") + in.logger.(*helper.BufferLog).Flush() + } // check that all instances have the same finalized state ref := instances[0] @@ -377,7 +387,7 @@ func TestAsyncClusterStartup(t *testing.T) { WithOutgoingTimeoutStates(func(object *models.TimeoutState[*helper.TestVote]) bool { lock.Lock() defer lock.Unlock() - timeoutStateGenerated[fmt.Sprintf("%d", object.Rank)] = struct{}{} + timeoutStateGenerated[(*object.Vote).ID] = struct{}{} // start allowing timeouts when every node has generated one // when nodes will broadcast again, it will go through return len(timeoutStateGenerated) != replicas @@ -399,7 +409,7 @@ func TestAsyncClusterStartup(t *testing.T) { wg.Done() }(in) } - unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second, "expect to finish before timeout") + unittest.AssertReturnsBefore(t, wg.Wait, 20*time.Second, "expect to finish before timeout") // check that all instances have the same finalized state ref := instances[0] diff --git a/consensus/integration/options_test.go b/consensus/integration/options_test.go index 0f178fe..db994ce 100644 --- a/consensus/integration/options_test.go +++ b/consensus/integration/options_test.go @@ -54,6 +54,12 @@ func WithTimeouts(timeouts timeout.Config) Option { } } +func WithBufferLogger() Option { + return func(cfg *Config) { + cfg.Logger = helper.BufferLogger() + } +} + func WithLoggerParams(params ...consensus.LogParam) Option { return func(cfg *Config) { cfg.Logger = cfg.Logger.With(params...) diff --git a/consensus/notifications/log_consumer.go b/consensus/notifications/log_consumer.go index 1117037..778eaa7 100644 --- a/consensus/notifications/log_consumer.go +++ b/consensus/notifications/log_consumer.go @@ -234,19 +234,24 @@ func (lc *LogConsumer[StateT, VoteT]) OnVoteProcessed(vote *VoteT) { func (lc *LogConsumer[StateT, VoteT]) OnTimeoutProcessed( timeout *models.TimeoutState[VoteT], ) { - lc.log.With( + logger := lc.log.With( consensus.Uint64Param("timeout_rank", timeout.Rank), consensus.Uint64Param( "timeout_newest_qc_rank", timeout.LatestQuorumCertificate.GetRank(), ), - consensus.Uint64Param( - "timeout_last_tc_rank", - timeout.PriorRankTimeoutCertificate.GetRank(), - ), consensus.IdentityParam("timeout_vote_id", (*timeout.Vote).Identity()), consensus.Uint64Param("timeout_tick", timeout.TimeoutTick), - ).Trace("processed valid timeout object") + ) + if timeout.PriorRankTimeoutCertificate != nil { + logger = logger.With( + consensus.Uint64Param( + "timeout_last_tc_rank", + timeout.PriorRankTimeoutCertificate.GetRank(), + ), + ) + } + logger.Trace("processed valid timeout object") } func (lc *LogConsumer[StateT, VoteT]) OnCurrentRankDetails( @@ -322,20 +327,25 @@ func (lc *LogConsumer[StateT, VoteT]) OnInvalidTimeoutDetected( err models.InvalidTimeoutError[VoteT], ) { timeout := err.Timeout - lc.log.With( + logger := lc.log.With( consensus.StringParam("suspicious", "true"), consensus.Uint64Param("timeout_rank", timeout.Rank), consensus.Uint64Param( "timeout_newest_qc_rank", timeout.LatestQuorumCertificate.GetRank(), ), - consensus.Uint64Param( - "timeout_last_tc_rank", - timeout.PriorRankTimeoutCertificate.GetRank(), - ), consensus.IdentityParam("timeout_vote_id", (*timeout.Vote).Identity()), consensus.Uint64Param("timeout_tick", timeout.TimeoutTick), - ).Error("invalid timeout detected", err) + ) + if timeout.PriorRankTimeoutCertificate != nil { + logger = logger.With( + consensus.Uint64Param( + "timeout_last_tc_rank", + timeout.PriorRankTimeoutCertificate.GetRank(), + ), + ) + } + logger.Error("invalid timeout detected", err) } func (lc *LogConsumer[StateT, VoteT]) logBasicStateData( diff --git a/consensus/pacemaker/pacemaker.go b/consensus/pacemaker/pacemaker.go index f766bf5..189bcfe 100644 --- a/consensus/pacemaker/pacemaker.go +++ b/consensus/pacemaker/pacemaker.go @@ -30,6 +30,7 @@ type Pacemaker[StateT models.Unique, VoteT models.Unique] struct { consensus.ProposalDurationProvider ctx context.Context + tracer consensus.TraceLogger timeoutControl *timeout.Controller notifier consensus.ParticipantConsumer[StateT, VoteT] rankTracker rankTracker[StateT, VoteT] @@ -51,6 +52,7 @@ func NewPacemaker[StateT models.Unique, VoteT models.Unique]( proposalDurationProvider consensus.ProposalDurationProvider, notifier consensus.Consumer[StateT, VoteT], store consensus.ConsensusStore[VoteT], + tracer consensus.TraceLogger, recovery ...recoveryInformation[StateT, VoteT], ) (*Pacemaker[StateT, VoteT], error) { vt, err := newRankTracker[StateT, VoteT](store) @@ -63,6 +65,7 @@ func NewPacemaker[StateT models.Unique, VoteT models.Unique]( timeoutControl: timeoutController, notifier: notifier, rankTracker: vt, + tracer: tracer, started: false, } for _, recoveryAction := range recovery { @@ -162,6 +165,11 @@ func (p *Pacemaker[StateT, VoteT]) ReceiveTimeoutCertificate( err, ) } + p.tracer.Trace( + "pacemaker receive tc", + consensus.Uint64Param("resulting_rank", resultingRank), + consensus.Uint64Param("initial_rank", initialRank), + ) if resultingRank <= initialRank { return nil, nil } diff --git a/consensus/pacemaker/pacemaker_test.go b/consensus/pacemaker/pacemaker_test.go index bccbd39..114ca6f 100644 --- a/consensus/pacemaker/pacemaker_test.go +++ b/consensus/pacemaker/pacemaker_test.go @@ -70,7 +70,7 @@ func (s *PacemakerTestSuite) SetupTest() { s.store.On("GetLivenessState").Return(livenessState, nil) // init Pacemaker and start - s.pacemaker, err = NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store) + s.pacemaker, err = NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger()) require.NoError(s.T(), err) var ctx context.Context @@ -335,7 +335,7 @@ func (s *PacemakerTestSuite) Test_Initialization() { // test that the constructor finds the newest QC and TC s.Run("Random TCs and QCs combined", func() { pm, err := NewPacemaker( - timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, + timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithQCs[*helper.TestState, *helper.TestVote](qcs...), WithTCs[*helper.TestState, *helper.TestVote](tcs...), ) require.NoError(s.T(), err) @@ -355,7 +355,7 @@ func (s *PacemakerTestSuite) Test_Initialization() { tcs[45] = helper.MakeTC(helper.WithTCRank(highestRank+15), helper.WithTCNewestQC(QC(highestRank+12))) pm, err := NewPacemaker( - timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, + timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithTCs[*helper.TestState, *helper.TestVote](tcs...), WithQCs[*helper.TestState, *helper.TestVote](qcs...), ) require.NoError(s.T(), err) @@ -375,7 +375,7 @@ func (s *PacemakerTestSuite) Test_Initialization() { tcs[45] = helper.MakeTC(helper.WithTCRank(highestRank+15), helper.WithTCNewestQC(QC(highestRank+15))) pm, err := NewPacemaker( - timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, + timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithTCs[*helper.TestState, *helper.TestVote](tcs...), WithQCs[*helper.TestState, *helper.TestVote](qcs...), ) require.NoError(s.T(), err) @@ -391,11 +391,11 @@ func (s *PacemakerTestSuite) Test_Initialization() { // Verify that WithTCs still works correctly if no TCs are given: // the list of TCs is empty or all contained TCs are nil s.Run("Only nil TCs", func() { - pm, err := NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, WithTCs[*helper.TestState, *helper.TestVote]()) + pm, err := NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithTCs[*helper.TestState, *helper.TestVote]()) require.NoError(s.T(), err) require.Equal(s.T(), s.initialRank, pm.CurrentRank()) - pm, err = NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, WithTCs[*helper.TestState, *helper.TestVote](nil, nil, nil)) + pm, err = NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithTCs[*helper.TestState, *helper.TestVote](nil, nil, nil)) require.NoError(s.T(), err) require.Equal(s.T(), s.initialRank, pm.CurrentRank()) }) @@ -403,11 +403,11 @@ func (s *PacemakerTestSuite) Test_Initialization() { // Verify that WithQCs still works correctly if no QCs are given: // the list of QCs is empty or all contained QCs are nil s.Run("Only nil QCs", func() { - pm, err := NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, WithQCs[*helper.TestState, *helper.TestVote]()) + pm, err := NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithQCs[*helper.TestState, *helper.TestVote]()) require.NoError(s.T(), err) require.Equal(s.T(), s.initialRank, pm.CurrentRank()) - pm, err = NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, WithQCs[*helper.TestState, *helper.TestVote](nil, nil, nil)) + pm, err = NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithQCs[*helper.TestState, *helper.TestVote](nil, nil, nil)) require.NoError(s.T(), err) require.Equal(s.T(), s.initialRank, pm.CurrentRank()) }) @@ -417,7 +417,7 @@ func (s *PacemakerTestSuite) Test_Initialization() { // TestProposalDuration tests that the active pacemaker forwards proposal duration values from the provider. func (s *PacemakerTestSuite) TestProposalDuration() { proposalDurationProvider := NewStaticProposalDurationProvider(time.Millisecond * 500) - pm, err := NewPacemaker(timeout.NewController(s.timeoutConf), &proposalDurationProvider, s.notifier, s.store) + pm, err := NewPacemaker(timeout.NewController(s.timeoutConf), &proposalDurationProvider, s.notifier, s.store, helper.Logger()) require.NoError(s.T(), err) now := time.Now().UTC() diff --git a/consensus/timeoutaggregator/timeout_aggregator.go b/consensus/timeoutaggregator/timeout_aggregator.go index db3909e..d4edfbb 100644 --- a/consensus/timeoutaggregator/timeout_aggregator.go +++ b/consensus/timeoutaggregator/timeout_aggregator.go @@ -87,7 +87,7 @@ func NewTimeoutAggregator[VoteT models.Unique]( // about pending work and as soon as there is some it triggers processing. func ( t *TimeoutAggregator[VoteT], -) queuedTimeoutsProcessingLoop(ctx context.Context) { +) queuedTimeoutsProcessingLoop(ctx lifecycle.SignalerContext) { defer t.wg.Done() notifier := t.queuedTimeoutsNotifier for { @@ -98,7 +98,10 @@ func ( t.tracer.Trace("notified for queued timeout state") err := t.processQueuedTimeoutStates(ctx) if err != nil { - t.tracer.Error("fatal error encountered", err) + ctx.Throw(fmt.Errorf( + "internal error processing queued timeout events: %w", + err, + )) return } } @@ -118,8 +121,6 @@ func (t *TimeoutAggregator[VoteT]) processQueuedTimeoutStates( return nil case timeoutState, ok := <-t.queuedTimeouts: if !ok { - // when there is no more messages in the queue, back to the loop to wait - // for the next incoming message to arrive. return nil } @@ -133,6 +134,10 @@ func (t *TimeoutAggregator[VoteT]) processQueuedTimeoutStates( } t.tracer.Trace("TimeoutState processed successfully") + default: + // when there is no more messages in the queue, back to the loop to wait + // for the next incoming message to arrive. + return nil } } } diff --git a/consensus/timeoutcollector/timeout_processor.go b/consensus/timeoutcollector/timeout_processor.go index 438880c..c20466b 100644 --- a/consensus/timeoutcollector/timeout_processor.go +++ b/consensus/timeoutcollector/timeout_processor.go @@ -206,6 +206,10 @@ func (p *TimeoutProcessor[StateT, VoteT, PeerIDT]) Process( // true only once. willBuildTC := p.tcTracker.Track(totalWeight) if !willBuildTC { + p.tracer.Trace( + "insufficient weight to build tc", + consensus.Uint64Param("total_weight", totalWeight), + ) // either we do not have enough timeouts to build a TC, or another thread // has already passed this gate and created a TC return nil @@ -216,7 +220,10 @@ func (p *TimeoutProcessor[StateT, VoteT, PeerIDT]) Process( return fmt.Errorf("internal error constructing TC: %w", err) } p.notifier.OnTimeoutCertificateConstructedFromTimeouts(*tc) - + p.tracer.Trace( + "timeout constructed from timeouts", + consensus.Uint64Param("rank", (*tc).GetRank()), + ) return nil }