resolve timeout hanging

This commit is contained in:
Cassandra Heart 2025-11-02 04:33:51 -06:00
parent 045d07e924
commit 89f15bae36
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
10 changed files with 145 additions and 37 deletions

View File

@ -49,7 +49,7 @@ func NewTestPacemaker[
notifier consensus.Consumer[StateT, VoteT],
store consensus.ConsensusStore[VoteT],
) *TestPacemaker[StateT, VoteT, PeerIDT, CollectedT] {
p, err := pacemaker.NewPacemaker[StateT, VoteT](timeoutController, proposalDelayProvider, notifier, store)
p, err := pacemaker.NewPacemaker[StateT, VoteT](timeoutController, proposalDelayProvider, notifier, store, helper.Logger())
if err != nil {
panic(err)
}

View File

@ -404,3 +404,64 @@ func stringFromValue(param consensus.LogParam) string {
// Logger constructs a fresh FmtLog, a consensus.TraceLogger that
// writes formatted output directly (no buffering).
func Logger() *FmtLog {
	log := new(FmtLog)
	return log
}
// BufferLog is a consensus.TraceLogger that accumulates formatted log
// output in an in-memory buffer instead of emitting it immediately.
// Loggers derived via With share the same underlying builder, so output
// from all derived loggers is aggregated until Flush is called.
//
// NOTE(review): the shared strings.Builder is not safe for concurrent
// use; callers must ensure single-threaded logging or add locking.
type BufferLog struct {
	// params holds context parameters bound via With; they are written
	// before the call-site parameters of every log entry.
	params []consensus.LogParam
	// b is the shared output buffer (aliased by With-derived loggers).
	b *strings.Builder
}

// writeParams appends each parameter as an indented "key: value" line.
// Extracted to remove the four duplicated loops previously present in
// Error and Trace.
func (n *BufferLog) writeParams(params []consensus.LogParam) {
	for _, param := range params {
		n.b.WriteString(fmt.Sprintf(
			"\t%s: %s\n",
			param.GetKey(),
			stringFromValue(param),
		))
	}
}

// Error implements consensus.TraceLogger. It records the message and
// error, followed by the logger's bound parameters and then the
// call-site parameters.
func (n *BufferLog) Error(message string, err error, params ...consensus.LogParam) {
	n.b.WriteString(fmt.Sprintf("ERROR: %s: %v\n", message, err))
	n.writeParams(n.params)
	n.writeParams(params)
}

// Trace implements consensus.TraceLogger. It records the message with a
// wall-clock timestamp, followed by the logger's bound parameters and
// then the call-site parameters.
func (n *BufferLog) Trace(message string, params ...consensus.LogParam) {
	n.b.WriteString(fmt.Sprintf("TRACE: %s\n", message))
	n.b.WriteString(fmt.Sprintf("\t[%s]\n", time.Now().String()))
	n.writeParams(n.params)
	n.writeParams(params)
}

// Flush prints the entire accumulated buffer contents to stdout. The
// buffer is not reset, so a subsequent Flush repeats prior output.
func (n *BufferLog) Flush() {
	fmt.Println(n.b.String())
}

// With implements consensus.TraceLogger. It returns a derived logger
// carrying the additional bound parameters; the derived logger writes
// into the same shared buffer as its parent.
func (n *BufferLog) With(params ...consensus.LogParam) consensus.TraceLogger {
	return &BufferLog{
		params: slices.Concat(n.params, params),
		b:      n.b,
	}
}

// BufferLogger constructs a BufferLog backed by a fresh empty buffer.
func BufferLogger() *BufferLog {
	return &BufferLog{
		b: &strings.Builder{},
	}
}

View File

@ -384,7 +384,7 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
// initialize the pacemaker
controller := timeout.NewController(cfg.Timeouts)
in.pacemaker, err = pacemaker.NewPacemaker[*helper.TestState, *helper.TestVote](controller, pacemaker.NoProposalDelay(), notifier, in.persist)
in.pacemaker, err = pacemaker.NewPacemaker[*helper.TestState, *helper.TestVote](controller, pacemaker.NoProposalDelay(), notifier, in.persist, in.logger)
require.NoError(t, err)
// initialize the forks handler
@ -473,8 +473,8 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
Bitmask: bitmask,
PublicKey: make([]byte, 585),
}, nil
})
sigAgg.On("VerifySignatureRaw", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
}).Maybe()
sigAgg.On("VerifySignatureRaw", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(true, nil).Maybe()
createCollectorFactoryMethod := votecollector.NewStateMachineFactory(in.logger, voteAggregationDistributor, voteProcessorFactory.Create, []byte{}, sigAgg)
voteCollectors := voteaggregator.NewVoteCollectors(in.logger, livenessData.CurrentRank, workerpool.New(2), createCollectorFactoryMethod)
@ -600,7 +600,9 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
)
require.NoError(t, err)
timeoutAggregationDistributor.AddTimeoutCollectorConsumer(logConsumer)
timeoutAggregationDistributor.AddTimeoutCollectorConsumer(&in)
voteAggregationDistributor.AddVoteCollectorConsumer(logConsumer)
return &in
}
@ -624,7 +626,6 @@ func (in *Instance) Run(t *testing.T) error {
// run until an error or stop condition is reached
for {
// check on stop conditions
if in.stop(in) {
return errStopCondition
@ -635,7 +636,7 @@ func (in *Instance) Run(t *testing.T) error {
case <-in.handler.TimeoutChannel():
err := in.handler.OnLocalTimeout()
if err != nil {
return fmt.Errorf("could not process timeout: %w", err)
panic(fmt.Errorf("could not process timeout: %w", err))
}
default:
}

View File

@ -1,6 +1,7 @@
package integration
import (
"encoding/hex"
"errors"
"fmt"
"sync"
@ -46,8 +47,9 @@ func Test2TimeoutOutof7Instances(t *testing.T) {
in := NewInstance(t,
WithRoot(root),
WithParticipants(participants),
WithLocalID(participants[n].Identity()),
WithTimeouts(timeouts),
WithBufferLogger(),
WithLocalID(participants[n].Identity()),
WithLoggerParams(consensus.StringParam("status", "healthy")),
WithStopCondition(RankFinalized(finalRank)),
)
@ -59,8 +61,9 @@ func Test2TimeoutOutof7Instances(t *testing.T) {
in := NewInstance(t,
WithRoot(root),
WithParticipants(participants),
WithLocalID(participants[n].Identity()),
WithTimeouts(timeouts),
WithBufferLogger(),
WithLocalID(participants[n].Identity()),
WithLoggerParams(consensus.StringParam("status", "unhealthy")),
WithStopCondition(RankFinalized(finalRank)),
WithOutgoingVotes(DropAllVotes),
@ -82,7 +85,14 @@ func Test2TimeoutOutof7Instances(t *testing.T) {
wg.Done()
}(in)
}
unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second, "expect to finish before timeout")
unittest.AssertReturnsBefore(t, wg.Wait, 20*time.Second, "expect to finish before timeout")
for i, in := range instances {
fmt.Println("=============================================================================")
fmt.Println("INSTANCE", i, "-", hex.EncodeToString([]byte(in.localID)))
fmt.Println("=============================================================================")
in.logger.(*helper.BufferLog).Flush()
}
// check that all instances have the same finalized state
ref := instances[0]
@ -377,7 +387,7 @@ func TestAsyncClusterStartup(t *testing.T) {
WithOutgoingTimeoutStates(func(object *models.TimeoutState[*helper.TestVote]) bool {
lock.Lock()
defer lock.Unlock()
timeoutStateGenerated[fmt.Sprintf("%d", object.Rank)] = struct{}{}
timeoutStateGenerated[(*object.Vote).ID] = struct{}{}
// start allowing timeouts when every node has generated one
// when nodes will broadcast again, it will go through
return len(timeoutStateGenerated) != replicas
@ -399,7 +409,7 @@ func TestAsyncClusterStartup(t *testing.T) {
wg.Done()
}(in)
}
unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second, "expect to finish before timeout")
unittest.AssertReturnsBefore(t, wg.Wait, 20*time.Second, "expect to finish before timeout")
// check that all instances have the same finalized state
ref := instances[0]

View File

@ -54,6 +54,12 @@ func WithTimeouts(timeouts timeout.Config) Option {
}
}
// WithBufferLogger is an Option that replaces the instance's logger
// with a buffering logger (helper.BufferLogger), whose output is only
// emitted when Flush is called on it.
func WithBufferLogger() Option {
	return func(cfg *Config) {
		logger := helper.BufferLogger()
		cfg.Logger = logger
	}
}
func WithLoggerParams(params ...consensus.LogParam) Option {
return func(cfg *Config) {
cfg.Logger = cfg.Logger.With(params...)

View File

@ -234,19 +234,24 @@ func (lc *LogConsumer[StateT, VoteT]) OnVoteProcessed(vote *VoteT) {
func (lc *LogConsumer[StateT, VoteT]) OnTimeoutProcessed(
timeout *models.TimeoutState[VoteT],
) {
lc.log.With(
logger := lc.log.With(
consensus.Uint64Param("timeout_rank", timeout.Rank),
consensus.Uint64Param(
"timeout_newest_qc_rank",
timeout.LatestQuorumCertificate.GetRank(),
),
consensus.Uint64Param(
"timeout_last_tc_rank",
timeout.PriorRankTimeoutCertificate.GetRank(),
),
consensus.IdentityParam("timeout_vote_id", (*timeout.Vote).Identity()),
consensus.Uint64Param("timeout_tick", timeout.TimeoutTick),
).Trace("processed valid timeout object")
)
if timeout.PriorRankTimeoutCertificate != nil {
logger = logger.With(
consensus.Uint64Param(
"timeout_last_tc_rank",
timeout.PriorRankTimeoutCertificate.GetRank(),
),
)
}
logger.Trace("processed valid timeout object")
}
func (lc *LogConsumer[StateT, VoteT]) OnCurrentRankDetails(
@ -322,20 +327,25 @@ func (lc *LogConsumer[StateT, VoteT]) OnInvalidTimeoutDetected(
err models.InvalidTimeoutError[VoteT],
) {
timeout := err.Timeout
lc.log.With(
logger := lc.log.With(
consensus.StringParam("suspicious", "true"),
consensus.Uint64Param("timeout_rank", timeout.Rank),
consensus.Uint64Param(
"timeout_newest_qc_rank",
timeout.LatestQuorumCertificate.GetRank(),
),
consensus.Uint64Param(
"timeout_last_tc_rank",
timeout.PriorRankTimeoutCertificate.GetRank(),
),
consensus.IdentityParam("timeout_vote_id", (*timeout.Vote).Identity()),
consensus.Uint64Param("timeout_tick", timeout.TimeoutTick),
).Error("invalid timeout detected", err)
)
if timeout.PriorRankTimeoutCertificate != nil {
logger = logger.With(
consensus.Uint64Param(
"timeout_last_tc_rank",
timeout.PriorRankTimeoutCertificate.GetRank(),
),
)
}
logger.Error("invalid timeout detected", err)
}
func (lc *LogConsumer[StateT, VoteT]) logBasicStateData(

View File

@ -30,6 +30,7 @@ type Pacemaker[StateT models.Unique, VoteT models.Unique] struct {
consensus.ProposalDurationProvider
ctx context.Context
tracer consensus.TraceLogger
timeoutControl *timeout.Controller
notifier consensus.ParticipantConsumer[StateT, VoteT]
rankTracker rankTracker[StateT, VoteT]
@ -51,6 +52,7 @@ func NewPacemaker[StateT models.Unique, VoteT models.Unique](
proposalDurationProvider consensus.ProposalDurationProvider,
notifier consensus.Consumer[StateT, VoteT],
store consensus.ConsensusStore[VoteT],
tracer consensus.TraceLogger,
recovery ...recoveryInformation[StateT, VoteT],
) (*Pacemaker[StateT, VoteT], error) {
vt, err := newRankTracker[StateT, VoteT](store)
@ -63,6 +65,7 @@ func NewPacemaker[StateT models.Unique, VoteT models.Unique](
timeoutControl: timeoutController,
notifier: notifier,
rankTracker: vt,
tracer: tracer,
started: false,
}
for _, recoveryAction := range recovery {
@ -162,6 +165,11 @@ func (p *Pacemaker[StateT, VoteT]) ReceiveTimeoutCertificate(
err,
)
}
p.tracer.Trace(
"pacemaker receive tc",
consensus.Uint64Param("resulting_rank", resultingRank),
consensus.Uint64Param("initial_rank", initialRank),
)
if resultingRank <= initialRank {
return nil, nil
}

View File

@ -70,7 +70,7 @@ func (s *PacemakerTestSuite) SetupTest() {
s.store.On("GetLivenessState").Return(livenessState, nil)
// init Pacemaker and start
s.pacemaker, err = NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store)
s.pacemaker, err = NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger())
require.NoError(s.T(), err)
var ctx context.Context
@ -335,7 +335,7 @@ func (s *PacemakerTestSuite) Test_Initialization() {
// test that the constructor finds the newest QC and TC
s.Run("Random TCs and QCs combined", func() {
pm, err := NewPacemaker(
timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store,
timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(),
WithQCs[*helper.TestState, *helper.TestVote](qcs...), WithTCs[*helper.TestState, *helper.TestVote](tcs...),
)
require.NoError(s.T(), err)
@ -355,7 +355,7 @@ func (s *PacemakerTestSuite) Test_Initialization() {
tcs[45] = helper.MakeTC(helper.WithTCRank(highestRank+15), helper.WithTCNewestQC(QC(highestRank+12)))
pm, err := NewPacemaker(
timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store,
timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(),
WithTCs[*helper.TestState, *helper.TestVote](tcs...), WithQCs[*helper.TestState, *helper.TestVote](qcs...),
)
require.NoError(s.T(), err)
@ -375,7 +375,7 @@ func (s *PacemakerTestSuite) Test_Initialization() {
tcs[45] = helper.MakeTC(helper.WithTCRank(highestRank+15), helper.WithTCNewestQC(QC(highestRank+15)))
pm, err := NewPacemaker(
timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store,
timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(),
WithTCs[*helper.TestState, *helper.TestVote](tcs...), WithQCs[*helper.TestState, *helper.TestVote](qcs...),
)
require.NoError(s.T(), err)
@ -391,11 +391,11 @@ func (s *PacemakerTestSuite) Test_Initialization() {
// Verify that WithTCs still works correctly if no TCs are given:
// the list of TCs is empty or all contained TCs are nil
s.Run("Only nil TCs", func() {
pm, err := NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, WithTCs[*helper.TestState, *helper.TestVote]())
pm, err := NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithTCs[*helper.TestState, *helper.TestVote]())
require.NoError(s.T(), err)
require.Equal(s.T(), s.initialRank, pm.CurrentRank())
pm, err = NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, WithTCs[*helper.TestState, *helper.TestVote](nil, nil, nil))
pm, err = NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithTCs[*helper.TestState, *helper.TestVote](nil, nil, nil))
require.NoError(s.T(), err)
require.Equal(s.T(), s.initialRank, pm.CurrentRank())
})
@ -403,11 +403,11 @@ func (s *PacemakerTestSuite) Test_Initialization() {
// Verify that WithQCs still works correctly if no QCs are given:
// the list of QCs is empty or all contained QCs are nil
s.Run("Only nil QCs", func() {
pm, err := NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, WithQCs[*helper.TestState, *helper.TestVote]())
pm, err := NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithQCs[*helper.TestState, *helper.TestVote]())
require.NoError(s.T(), err)
require.Equal(s.T(), s.initialRank, pm.CurrentRank())
pm, err = NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, WithQCs[*helper.TestState, *helper.TestVote](nil, nil, nil))
pm, err = NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithQCs[*helper.TestState, *helper.TestVote](nil, nil, nil))
require.NoError(s.T(), err)
require.Equal(s.T(), s.initialRank, pm.CurrentRank())
})
@ -417,7 +417,7 @@ func (s *PacemakerTestSuite) Test_Initialization() {
// TestProposalDuration tests that the active pacemaker forwards proposal duration values from the provider.
func (s *PacemakerTestSuite) TestProposalDuration() {
proposalDurationProvider := NewStaticProposalDurationProvider(time.Millisecond * 500)
pm, err := NewPacemaker(timeout.NewController(s.timeoutConf), &proposalDurationProvider, s.notifier, s.store)
pm, err := NewPacemaker(timeout.NewController(s.timeoutConf), &proposalDurationProvider, s.notifier, s.store, helper.Logger())
require.NoError(s.T(), err)
now := time.Now().UTC()

View File

@ -87,7 +87,7 @@ func NewTimeoutAggregator[VoteT models.Unique](
// about pending work and as soon as there is some it triggers processing.
func (
t *TimeoutAggregator[VoteT],
) queuedTimeoutsProcessingLoop(ctx context.Context) {
) queuedTimeoutsProcessingLoop(ctx lifecycle.SignalerContext) {
defer t.wg.Done()
notifier := t.queuedTimeoutsNotifier
for {
@ -98,7 +98,10 @@ func (
t.tracer.Trace("notified for queued timeout state")
err := t.processQueuedTimeoutStates(ctx)
if err != nil {
t.tracer.Error("fatal error encountered", err)
ctx.Throw(fmt.Errorf(
"internal error processing queued timeout events: %w",
err,
))
return
}
}
@ -118,8 +121,6 @@ func (t *TimeoutAggregator[VoteT]) processQueuedTimeoutStates(
return nil
case timeoutState, ok := <-t.queuedTimeouts:
if !ok {
// when there is no more messages in the queue, back to the loop to wait
// for the next incoming message to arrive.
return nil
}
@ -133,6 +134,10 @@ func (t *TimeoutAggregator[VoteT]) processQueuedTimeoutStates(
}
t.tracer.Trace("TimeoutState processed successfully")
default:
// when there is no more messages in the queue, back to the loop to wait
// for the next incoming message to arrive.
return nil
}
}
}

View File

@ -206,6 +206,10 @@ func (p *TimeoutProcessor[StateT, VoteT, PeerIDT]) Process(
// true only once.
willBuildTC := p.tcTracker.Track(totalWeight)
if !willBuildTC {
p.tracer.Trace(
"insufficient weight to build tc",
consensus.Uint64Param("total_weight", totalWeight),
)
// either we do not have enough timeouts to build a TC, or another thread
// has already passed this gate and created a TC
return nil
@ -216,7 +220,10 @@ func (p *TimeoutProcessor[StateT, VoteT, PeerIDT]) Process(
return fmt.Errorf("internal error constructing TC: %w", err)
}
p.notifier.OnTimeoutCertificateConstructedFromTimeouts(*tc)
p.tracer.Trace(
"timeout constructed from timeouts",
consensus.Uint64Param("rank", (*tc).GetRank()),
)
return nil
}