mostly finalized state for consensus

Cassandra Heart 2025-11-03 03:05:10 -06:00
parent 89f15bae36
commit 7d1320c226
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
35 changed files with 2204 additions and 5544 deletions

View File

@ -13,8 +13,6 @@ type VotingProvider[
VoteT models.Unique,
PeerIDT models.Unique,
] interface {
// Sends a proposal for voting.
SendProposal(ctx context.Context, proposal *StateT) error
// SignVote signs a proposal, produces an output vote for aggregation and
// broadcasting.
SignVote(
@ -42,22 +40,4 @@ type VotingProvider[
latestQuorumCertificateRanks []uint64,
aggregatedSignature models.AggregatedSignature,
) (models.TimeoutCertificate, error)
// Re-publishes a vote message, used to help lagging peers catch up.
SendVote(ctx context.Context, vote *VoteT) (PeerIDT, error)
// IsQuorum returns a response indicating whether or not quorum has been
// reached.
IsQuorum(
ctx context.Context,
proposalVotes map[models.Identity]*VoteT,
) (bool, error)
// FinalizeVotes performs any folding of proposed state required from VoteT
// onto StateT, proposed states and votes matched by PeerIDT, returns
// finalized state, chosen proposer PeerIDT.
FinalizeVotes(
ctx context.Context,
proposals map[models.Identity]*StateT,
proposalVotes map[models.Identity]*VoteT,
) (*StateT, PeerIDT, error)
// SendConfirmation sends confirmation of the finalized state.
SendConfirmation(ctx context.Context, finalized *StateT) error
}
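
The commit strips transport and tallying (SendProposal, SendVote, IsQuorum, FinalizeVotes, SendConfirmation) out of VotingProvider, leaving it as a pure signing hook. A minimal sketch of the surviving shape, assuming a context-first parameter convention; the name CreateTimeoutCertificate is a guess, since the hunk only shows that method's trailing parameters and return type:

type VotingProvider[
StateT models.Unique,
VoteT models.Unique,
PeerIDT models.Unique,
] interface {
// SignVote signs a proposal, produces an output vote for aggregation and
// broadcasting (parameters partially elided; return type assumed).
SignVote(ctx context.Context /* remaining parameters elided */) (*VoteT, error)
// Hypothetical name: builds a timeout certificate from the latest QC ranks
// and an aggregated signature (the tail visible in the hunk above).
CreateTimeoutCertificate(
ctx context.Context,
/* leading parameters elided */
latestQuorumCertificateRanks []uint64,
aggregatedSignature models.AggregatedSignature,
) (models.TimeoutCertificate, error)
}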

View File

@ -108,7 +108,7 @@ func (e *EventHandler[
"received QC",
consensus.Uint64Param("current_rank", curRank),
consensus.Uint64Param("qc_rank", qc.GetRank()),
consensus.IdentityParam("state_id", qc.GetSelector()),
consensus.IdentityParam("state_id", qc.Identity()),
)
e.notifier.OnReceiveQuorumCertificate(curRank, qc)
defer e.notifier.OnEventProcessed()
@ -148,7 +148,7 @@ func (e *EventHandler[
),
consensus.IdentityParam(
"tc_newest_qc_state_id",
tc.GetLatestQuorumCert().GetSelector(),
tc.GetLatestQuorumCert().Identity(),
),
)
e.notifier.OnReceiveTimeoutCertificate(curRank, tc)
@ -168,7 +168,7 @@ func (e *EventHandler[
),
consensus.IdentityParam(
"tc_newest_qc_state_id",
tc.GetLatestQuorumCert().GetSelector(),
tc.GetLatestQuorumCert().Identity(),
))
return nil
}
@ -183,7 +183,7 @@ func (e *EventHandler[
),
consensus.IdentityParam(
"tc_newest_qc_state_id",
tc.GetLatestQuorumCert().GetSelector(),
tc.GetLatestQuorumCert().Identity(),
))
return e.proposeForNewRankIfPrimary()
}
@ -518,7 +518,7 @@ func (e *EventHandler[
newestQC := e.paceMaker.LatestQuorumCertificate()
previousRankTimeoutCert := e.paceMaker.PriorRankTimeoutCertificate()
_, found := e.forks.GetState(newestQC.GetSelector())
_, found := e.forks.GetState(newestQC.Identity())
if !found {
// we don't know anything about the state referenced by our newest QC; in this
// case we can't create a valid proposal since we can't guarantee validity
@ -649,7 +649,7 @@ func (e *EventHandler[
targetPublicationTime := e.paceMaker.TargetPublicationTime(
stateProposal.State.Rank,
start,
stateProposal.State.ParentQuorumCertificate.GetSelector(),
stateProposal.State.ParentQuorumCertificate.Identity(),
) // determine target publication time
e.tracer.Trace(
"forwarding proposal to communicator for broadcasting",
@ -657,7 +657,7 @@ func (e *EventHandler[
consensus.TimeParam("target_publication", targetPublicationTime),
consensus.IdentityParam("state_id", stateProposal.State.Identifier),
consensus.Uint64Param("parent_rank", newestQC.GetRank()),
consensus.IdentityParam("parent_id", newestQC.GetSelector()),
consensus.IdentityParam("parent_id", newestQC.Identity()),
consensus.IdentityParam("signer", stateProposal.State.ProposerID),
)
@ -729,7 +729,7 @@ func (e *EventHandler[
nextLeader models.Identity,
) error {
_, found := e.forks.GetState(
proposal.State.ParentQuorumCertificate.GetSelector(),
proposal.State.ParentQuorumCertificate.Identity(),
)
if !found {
// we don't have the parent for this proposal; we can't vote since we can't
@ -759,7 +759,7 @@ func (e *EventHandler[
),
consensus.IdentityParam(
"parent_id",
proposal.State.ParentQuorumCertificate.GetSelector(),
proposal.State.ParentQuorumCertificate.Identity(),
),
consensus.IdentityParam("signer", proposal.State.ProposerID[:]),
)
@ -776,7 +776,7 @@ func (e *EventHandler[
),
consensus.IdentityParam(
"parent_id",
proposal.State.ParentQuorumCertificate.GetSelector(),
proposal.State.ParentQuorumCertificate.Identity(),
),
consensus.IdentityParam("signer", proposal.State.ProposerID[:]),
)

View File

@ -391,7 +391,7 @@ func (es *EventHandlerSuite) TestStartNewRank_ParentProposalNotFound() {
require.NoError(es.T(), err)
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
es.forks.AssertCalled(es.T(), "GetState", newestQC.GetSelector())
es.forks.AssertCalled(es.T(), "GetState", newestQC.Identity())
es.notifier.AssertNotCalled(es.T(), "OnOwnProposal", mock.Anything, mock.Anything)
}
@ -449,7 +449,7 @@ func (es *EventHandlerSuite) TestOnReceiveProposal_NoVote_ParentProposalNotFound
proposal := createProposal(es.initRank, es.initRank-1)
// remove parent from known proposals
delete(es.forks.proposals, proposal.State.ParentQuorumCertificate.GetSelector())
delete(es.forks.proposals, proposal.State.ParentQuorumCertificate.Identity())
// no vote for this proposal, no parent found
err := es.eventhandler.OnReceiveProposal(proposal)
@ -842,7 +842,7 @@ func (es *EventHandlerSuite) TestLeaderBuild100States() {
// for the first proposal we need to store the parent, otherwise it won't be voted for
if i == 0 {
parentState := helper.MakeState(func(state *models.State[*helper.TestState]) {
state.Identifier = proposal.State.ParentQuorumCertificate.GetSelector()
state.Identifier = proposal.State.ParentQuorumCertificate.Identity()
state.Rank = proposal.State.ParentQuorumCertificate.GetRank()
})
es.forks.proposals[parentState.Identifier] = parentState

File diff suppressed because it is too large

View File

@ -34,7 +34,7 @@ func NewForks[StateT models.Unique, VoteT models.Unique](
finalizationCallback consensus.Finalizer,
notifier consensus.FollowerConsumer[StateT, VoteT],
) (*Forks[StateT, VoteT], error) {
if (trustedRoot.State.Identifier != trustedRoot.CertifyingQuorumCertificate.GetSelector()) ||
if (trustedRoot.State.Identifier != trustedRoot.CertifyingQuorumCertificate.Identity()) ||
(trustedRoot.State.Rank != trustedRoot.CertifyingQuorumCertificate.GetRank()) {
return nil,
models.NewConfigurationErrorf(
@ -209,12 +209,12 @@ func (f *Forks[StateT, VoteT]) EnsureStateIsValidExtension(
// For a state whose parent is _not_ below the pruning height, we expect the
// parent to be known.
_, isParentKnown := f.forest.GetVertex(
state.ParentQuorumCertificate.GetSelector(),
state.ParentQuorumCertificate.Identity(),
)
if !isParentKnown { // missing parent
return models.MissingStateError{
Rank: state.ParentQuorumCertificate.GetRank(),
Identifier: state.ParentQuorumCertificate.GetSelector(),
Identifier: state.ParentQuorumCertificate.Identity(),
}
}
return nil
@ -338,7 +338,7 @@ func (f *Forks[StateT, VoteT]) AddValidatedState(
// based on certified states.
// The certified parent essentially combines the parent, with the QC contained
// in state, to drive finalization.
parent, found := f.GetState(proposal.ParentQuorumCertificate.GetSelector())
parent, found := f.GetState(proposal.ParentQuorumCertificate.Identity())
if !found {
// Not finding the parent means it is already pruned; hence this state does
// not change the finalization state.
@ -418,7 +418,7 @@ func (f *Forks[StateT, VoteT]) checkForConflictingQCs(
it := f.forest.GetVerticesAtLevel((*qc).GetRank())
for it.HasNext() {
otherState := it.NextVertex() // by construction, must have same rank as qc.Rank
if (*qc).GetSelector() != otherState.VertexID() {
if (*qc).Identity() != otherState.VertexID() {
// * we have just found another state at the same rank number as qc.Rank
// but with different hash
// * if this state has a child c, this child will have
@ -431,7 +431,7 @@ func (f *Forks[StateT, VoteT]) checkForConflictingQCs(
conflictingQC := otherChild.ParentQuorumCertificate
return models.ByzantineThresholdExceededError{Evidence: fmt.Sprintf(
"conflicting QCs at rank %d: %x and %x",
(*qc).GetRank(), (*qc).GetSelector(), conflictingQC.GetSelector(),
(*qc).GetRank(), (*qc).Identity(), conflictingQC.Identity(),
)}
}
}
@ -506,12 +506,12 @@ func (f *Forks[StateT, VoteT]) checkForAdvancingFinalization(
// above
qcForParent := certifiedState.State.ParentQuorumCertificate
parentVertex, parentStateKnown := f.forest.GetVertex(
qcForParent.GetSelector(),
qcForParent.Identity(),
)
if !parentStateKnown {
return models.MissingStateError{
Rank: qcForParent.GetRank(),
Identifier: qcForParent.GetSelector(),
Identifier: qcForParent.Identity(),
}
}
parentState := parentVertex.(*StateContainer[StateT]).GetState()
@ -550,7 +550,7 @@ func (f *Forks[StateT, VoteT]) checkForAdvancingFinalization(
if err != nil {
return fmt.Errorf(
"advancing finalization to state %x from rank %d failed: %w",
qcForParent.GetSelector(),
qcForParent.Identity(),
qcForParent.GetRank(),
err,
)
@ -614,12 +614,12 @@ func (f *Forks[StateT, VoteT]) collectStatesForFinalization(
l := (*qc).GetRank() - lastFinalized.Rank // l is an upper limit to the number of states that can be maximally finalized
statesToBeFinalized := make([]*models.State[StateT], l)
for (*qc).GetRank() > lastFinalized.Rank {
b, ok := f.GetState((*qc).GetSelector())
b, ok := f.GetState((*qc).Identity())
if !ok {
return nil, fmt.Errorf(
"failed to get state (rank=%d, stateID=%x) for finalization",
(*qc).GetRank(),
(*qc).GetSelector(),
(*qc).Identity(),
)
}
l--
@ -641,10 +641,10 @@ func (f *Forks[StateT, VoteT]) collectStatesForFinalization(
)}
}
if (*qc).GetRank() == lastFinalized.Rank &&
lastFinalized.Identifier != (*qc).GetSelector() {
lastFinalized.Identifier != (*qc).Identity() {
return nil, models.ByzantineThresholdExceededError{Evidence: fmt.Sprintf(
"finalizing states with rank %d at conflicting forks: %x and %x",
(*qc).GetRank(), (*qc).GetSelector(), lastFinalized.Identifier,
(*qc).GetRank(), (*qc).Identity(), lastFinalized.Identifier,
)}
}

View File

@ -857,7 +857,7 @@ func addCertifiedStatesToForks(forks *Forks[*helper.TestState, *helper.TestVote]
uncertifiedStates := make(map[models.Identity]*models.State[*helper.TestState])
for _, b := range states {
uncertifiedStates[b.Identifier] = b
parentID := b.ParentQuorumCertificate.GetSelector()
parentID := b.ParentQuorumCertificate.Identity()
parent, found := uncertifiedStates[parentID]
if !found {
continue

View File

@ -37,7 +37,7 @@ func (b *StateContainer[StateT]) Parent() (models.Identity, uint64) {
if b.ParentQuorumCertificate == nil {
return "", 0
}
return b.ParentQuorumCertificate.GetSelector(),
return b.ParentQuorumCertificate.Identity(),
b.ParentQuorumCertificate.GetRank()
}

View File

@ -20,7 +20,7 @@ func (t *TestAggregatedSignature) GetSignature() []byte {
return t.Signature
}
func (t *TestAggregatedSignature) GetPublicKey() []byte {
func (t *TestAggregatedSignature) GetPubKey() []byte {
return t.PublicKey
}
@ -33,7 +33,7 @@ type TestQuorumCertificate struct {
Rank uint64
FrameNumber uint64
Selector models.Identity
Timestamp int64
Timestamp uint64
AggregatedSignature models.AggregatedSignature
}
@ -49,11 +49,11 @@ func (t *TestQuorumCertificate) GetFrameNumber() uint64 {
return t.FrameNumber
}
func (t *TestQuorumCertificate) GetSelector() models.Identity {
func (t *TestQuorumCertificate) Identity() models.Identity {
return t.Selector
}
func (t *TestQuorumCertificate) GetTimestamp() int64 {
func (t *TestQuorumCertificate) GetTimestamp() uint64 {
return t.Timestamp
}
@ -65,15 +65,15 @@ func (t *TestQuorumCertificate) Equals(other models.QuorumCertificate) bool {
return bytes.Equal(t.Filter, other.GetFilter()) &&
t.Rank == other.GetRank() &&
t.FrameNumber == other.GetFrameNumber() &&
t.Selector == other.GetSelector() &&
t.Selector == other.Identity() &&
t.Timestamp == other.GetTimestamp() &&
bytes.Equal(
t.AggregatedSignature.GetBitmask(),
other.GetAggregatedSignature().GetBitmask(),
) &&
bytes.Equal(
t.AggregatedSignature.GetPublicKey(),
other.GetAggregatedSignature().GetPublicKey(),
t.AggregatedSignature.GetPubKey(),
other.GetAggregatedSignature().GetPubKey(),
) &&
bytes.Equal(
t.AggregatedSignature.GetSignature(),
@ -88,7 +88,7 @@ func MakeQC(options ...func(*TestQuorumCertificate)) models.QuorumCertificate {
Rank: rand.Uint64(),
FrameNumber: rand.Uint64() + 1,
Selector: string(s),
Timestamp: time.Now().UnixMilli(),
Timestamp: uint64(time.Now().UnixMilli()),
AggregatedSignature: &TestAggregatedSignature{
PublicKey: make([]byte, 585),
Signature: make([]byte, 74),

View File

@ -47,8 +47,8 @@ func (t *TestTimeoutCertificate) Equals(other models.TimeoutCertificate) bool {
other.GetAggregatedSignature().GetBitmask(),
) &&
bytes.Equal(
t.AggregatedSignature.GetPublicKey(),
other.GetAggregatedSignature().GetPublicKey(),
t.AggregatedSignature.GetPubKey(),
other.GetAggregatedSignature().GetPubKey(),
) &&
bytes.Equal(
t.AggregatedSignature.GetSignature(),

View File

@ -22,7 +22,7 @@ func FinalizedStates(in *Instance) []*models.State[*helper.TestState] {
break
}
finalizedState, found =
in.headers[finalizedState.ParentQuorumCertificate.GetSelector()]
in.headers[finalizedState.ParentQuorumCertificate.Identity()]
if !found {
break
}

View File

@ -28,10 +28,10 @@ func Connect(t *testing.T, instances []*Instance) {
require.True(t, ok)
// sender should always have the parent
sender.updatingStates.RLock()
_, exists := sender.headers[proposal.State.ParentQuorumCertificate.GetSelector()]
_, exists := sender.headers[proposal.State.ParentQuorumCertificate.Identity()]
sender.updatingStates.RUnlock()
if !exists {
t.Fatalf("parent for proposal not found (sender: %x, parent: %x)", sender.localID, proposal.State.ParentQuorumCertificate.GetSelector())
t.Fatalf("parent for proposal not found (sender: %x, parent: %x)", sender.localID, proposal.State.ParentQuorumCertificate.Identity())
}
// store locally and loop back to engine for processing

View File

@ -270,7 +270,7 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
Rank: votes[0].Rank,
FrameNumber: votes[0].Rank,
Selector: votes[0].StateID,
Timestamp: time.Now().UnixMilli(),
Timestamp: uint64(time.Now().UnixMilli()),
AggregatedSignature: &helper.TestAggregatedSignature{
Signature: make([]byte, 74),
Bitmask: bitmask,
@ -295,11 +295,11 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
// sender should always have the parent
in.updatingStates.RLock()
_, exists := in.headers[proposal.State.ParentQuorumCertificate.GetSelector()]
_, exists := in.headers[proposal.State.ParentQuorumCertificate.Identity()]
in.updatingStates.RUnlock()
if !exists {
t.Fatalf("parent for proposal not found parent: %x", proposal.State.ParentQuorumCertificate.GetSelector())
t.Fatalf("parent for proposal not found parent: %x", proposal.State.ParentQuorumCertificate.Identity())
}
// store locally and loop back to engine for processing
@ -365,7 +365,7 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
Rank: rootState.Rank,
FrameNumber: rootState.Rank,
Selector: rootState.Identifier,
Timestamp: time.Now().UnixMilli(),
Timestamp: uint64(time.Now().UnixMilli()),
AggregatedSignature: &helper.TestAggregatedSignature{
Signature: make([]byte, 74),
Bitmask: []byte{0b11111111, 0b00000000},
@ -410,10 +410,10 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
in.queue <- qc
}
voteProcessorFactory := mocks.NewVoteProcessorFactory[*helper.TestState, *helper.TestVote](t)
voteProcessorFactory.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
func(tracer consensus.TraceLogger, proposal *models.SignedProposal[*helper.TestState, *helper.TestVote], dsTag []byte, aggregator consensus.SignatureAggregator) consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote] {
processor, err := votecollector.NewBootstrapVoteProcessor(
voteProcessorFactory := mocks.NewVoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)
voteProcessorFactory.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
func(tracer consensus.TraceLogger, proposal *models.SignedProposal[*helper.TestState, *helper.TestVote], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote] {
processor, err := votecollector.NewBootstrapVoteProcessor[*helper.TestState, *helper.TestVote, *helper.TestPeer](
in.logger,
in.committee,
proposal.State,
@ -441,7 +441,7 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
) (models.QuorumCertificate, error) {
return &helper.TestQuorumCertificate{
Rank: state.Rank,
Timestamp: int64(state.Timestamp),
Timestamp: state.Timestamp,
FrameNumber: state.Rank,
Selector: state.Identifier,
AggregatedSignature: aggregatedSignature,
@ -475,11 +475,11 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
}, nil
}).Maybe()
sigAgg.On("VerifySignatureRaw", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(true, nil).Maybe()
createCollectorFactoryMethod := votecollector.NewStateMachineFactory(in.logger, voteAggregationDistributor, voteProcessorFactory.Create, []byte{}, sigAgg)
voteCollectors := voteaggregator.NewVoteCollectors(in.logger, livenessData.CurrentRank, workerpool.New(2), createCollectorFactoryMethod)
createCollectorFactoryMethod := votecollector.NewStateMachineFactory(in.logger, voteAggregationDistributor, voteProcessorFactory.Create, []byte{}, sigAgg, in.voting)
voteCollectors := voteaggregator.NewVoteCollectors[*helper.TestState, *helper.TestVote](in.logger, livenessData.CurrentRank, workerpool.New(2), createCollectorFactoryMethod)
// initialize the vote aggregator
in.voteAggregator, err = voteaggregator.NewVoteAggregator(
in.voteAggregator, err = voteaggregator.NewVoteAggregator[*helper.TestState, *helper.TestVote](
in.logger,
voteAggregationDistributor,
livenessData.CurrentRank,
@ -543,7 +543,7 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
nil,
).Maybe()
p, err := timeoutcollector.NewTimeoutProcessor(
p, err := timeoutcollector.NewTimeoutProcessor[*helper.TestState, *helper.TestVote, *helper.TestPeer](
in.logger,
in.committee,
in.validator,
@ -692,7 +692,7 @@ func (in *Instance) Run(t *testing.T) error {
func (in *Instance) ProcessState(proposal *models.SignedProposal[*helper.TestState, *helper.TestVote]) {
in.updatingStates.Lock()
defer in.updatingStates.Unlock()
_, parentExists := in.headers[proposal.State.ParentQuorumCertificate.GetSelector()]
_, parentExists := in.headers[proposal.State.ParentQuorumCertificate.Identity()]
if parentExists {
next := proposal
@ -701,11 +701,11 @@ func (in *Instance) ProcessState(proposal *models.SignedProposal[*helper.TestSta
in.queue <- next
// keep processing the pending states
next = in.pendings[next.State.ParentQuorumCertificate.GetSelector()]
next = in.pendings[next.State.ParentQuorumCertificate.Identity()]
}
} else {
// cache it in pendings by ParentID
in.pendings[proposal.State.ParentQuorumCertificate.GetSelector()] = proposal
in.pendings[proposal.State.ParentQuorumCertificate.Identity()] = proposal
}
}

View File

@ -10,13 +10,13 @@ import (
)
// VoteProcessorFactory is an autogenerated mock type for the VoteProcessorFactory type
type VoteProcessorFactory[StateT models.Unique, VoteT models.Unique] struct {
type VoteProcessorFactory[StateT models.Unique, VoteT models.Unique, PeerIDT models.Unique] struct {
mock.Mock
}
// Create provides a mock function with given fields: tracer, proposal, dsTag, aggregator, voter
func (_m *VoteProcessorFactory[StateT, VoteT]) Create(tracer consensus.TraceLogger, proposal *models.SignedProposal[StateT, VoteT], dsTag []byte, aggregator consensus.SignatureAggregator) (consensus.VerifyingVoteProcessor[StateT, VoteT], error) {
ret := _m.Called(tracer, proposal, dsTag, aggregator)
func (_m *VoteProcessorFactory[StateT, VoteT, PeerIDT]) Create(tracer consensus.TraceLogger, proposal *models.SignedProposal[StateT, VoteT], dsTag []byte, aggregator consensus.SignatureAggregator, voter consensus.VotingProvider[StateT, VoteT, PeerIDT]) (consensus.VerifyingVoteProcessor[StateT, VoteT], error) {
ret := _m.Called(tracer, proposal, dsTag, aggregator, voter)
if len(ret) == 0 {
panic("no return value specified for Create")
@ -24,19 +24,19 @@ func (_m *VoteProcessorFactory[StateT, VoteT]) Create(tracer consensus.TraceLogg
var r0 consensus.VerifyingVoteProcessor[StateT, VoteT]
var r1 error
if rf, ok := ret.Get(0).(func(consensus.TraceLogger, *models.SignedProposal[StateT, VoteT], []byte, consensus.SignatureAggregator) (consensus.VerifyingVoteProcessor[StateT, VoteT], error)); ok {
return rf(tracer, proposal, dsTag, aggregator)
if rf, ok := ret.Get(0).(func(consensus.TraceLogger, *models.SignedProposal[StateT, VoteT], []byte, consensus.SignatureAggregator, consensus.VotingProvider[StateT, VoteT, PeerIDT]) (consensus.VerifyingVoteProcessor[StateT, VoteT], error)); ok {
return rf(tracer, proposal, dsTag, aggregator, voter)
}
if rf, ok := ret.Get(0).(func(consensus.TraceLogger, *models.SignedProposal[StateT, VoteT], []byte, consensus.SignatureAggregator) consensus.VerifyingVoteProcessor[StateT, VoteT]); ok {
r0 = rf(tracer, proposal, dsTag, aggregator)
if rf, ok := ret.Get(0).(func(consensus.TraceLogger, *models.SignedProposal[StateT, VoteT], []byte, consensus.SignatureAggregator, consensus.VotingProvider[StateT, VoteT, PeerIDT]) consensus.VerifyingVoteProcessor[StateT, VoteT]); ok {
r0 = rf(tracer, proposal, dsTag, aggregator, voter)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(consensus.VerifyingVoteProcessor[StateT, VoteT])
}
}
if rf, ok := ret.Get(1).(func(consensus.TraceLogger, *models.SignedProposal[StateT, VoteT], []byte, consensus.SignatureAggregator) error); ok {
r1 = rf(tracer, proposal, dsTag, aggregator)
if rf, ok := ret.Get(1).(func(consensus.TraceLogger, *models.SignedProposal[StateT, VoteT], []byte, consensus.SignatureAggregator, consensus.VotingProvider[StateT, VoteT, PeerIDT]) error); ok {
r1 = rf(tracer, proposal, dsTag, aggregator, voter)
} else {
r1 = ret.Error(1)
}
@ -46,11 +46,11 @@ func (_m *VoteProcessorFactory[StateT, VoteT]) Create(tracer consensus.TraceLogg
// NewVoteProcessorFactory creates a new instance of VoteProcessorFactory. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewVoteProcessorFactory[StateT models.Unique, VoteT models.Unique](t interface {
func NewVoteProcessorFactory[StateT models.Unique, VoteT models.Unique, PeerIDT models.Unique](t interface {
mock.TestingT
Cleanup(func())
}) *VoteProcessorFactory[StateT, VoteT] {
mock := &VoteProcessorFactory[StateT, VoteT]{}
}) *VoteProcessorFactory[StateT, VoteT, PeerIDT] {
mock := &VoteProcessorFactory[StateT, VoteT, PeerIDT]{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
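
Since the regenerated mock's Create takes a fifth argument (the VotingProvider), expectations need one more matcher. A usage sketch mirroring the integration test earlier in this diff; processor stands for any prepared VerifyingVoteProcessor mock:

factory := mocks.NewVoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)
factory.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(processor, nil)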

View File

@ -5,8 +5,8 @@ package models
type AggregatedSignature interface {
// GetSignature returns the aggregated signature in raw canonical bytes
GetSignature() []byte
// GetPublicKey returns the public key in raw canonical bytes
GetPublicKey() []byte
// GetPubKey returns the public key in raw canonical bytes
GetPubKey() []byte
// GetBitmask returns the bitmask of the signers in the signature, in matching
// order to the clique's prover set (in ascending ring order).
GetBitmask() []byte
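
Implementations only need the GetPublicKey → GetPubKey rename. A minimal sketch modeled on the TestAggregatedSignature helper earlier in this diff (struct and field names are illustrative):

type BasicAggregatedSignature struct {
Signature []byte
PublicKey []byte
Bitmask   []byte
}

func (s *BasicAggregatedSignature) GetSignature() []byte { return s.Signature }

// GetPubKey replaces the old GetPublicKey accessor.
func (s *BasicAggregatedSignature) GetPubKey() []byte { return s.PublicKey }

func (s *BasicAggregatedSignature) GetBitmask() []byte { return s.Bitmask }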

View File

@ -9,10 +9,10 @@ type QuorumCertificate interface {
GetRank() uint64
// GetFrameNumber returns the frame number applied to the round.
GetFrameNumber() uint64
// GetSelector returns the selector of the frame.
GetSelector() Identity
// Identity returns the selector of the frame.
Identity() Identity
// GetTimestamp returns the timestamp of the certificate.
GetTimestamp() int64
GetTimestamp() uint64
// GetAggregatedSignature returns the set of signers who voted on the round.
GetAggregatedSignature() AggregatedSignature
// Equals compares inner equality with another quorum certificate.
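
Custom certificate types migrate mechanically: rename GetSelector to Identity and widen GetTimestamp to uint64. A sketch modeled on the TestQuorumCertificate helper earlier in this diff (GetFilter and Equals elided; field names are illustrative):

type BasicQuorumCertificate struct {
Rank        uint64
FrameNumber uint64
Selector    models.Identity
Timestamp   uint64 // milliseconds since epoch, now unsigned
Signature   models.AggregatedSignature
}

func (q *BasicQuorumCertificate) GetRank() uint64        { return q.Rank }
func (q *BasicQuorumCertificate) GetFrameNumber() uint64 { return q.FrameNumber }

// Identity replaces the old GetSelector accessor.
func (q *BasicQuorumCertificate) Identity() models.Identity { return q.Selector }

func (q *BasicQuorumCertificate) GetTimestamp() uint64 { return q.Timestamp }
func (q *BasicQuorumCertificate) GetAggregatedSignature() models.AggregatedSignature {
return q.Signature
}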

View File

@ -74,12 +74,12 @@ func NewCertifiedState[StateT Unique](
quorumCertificate.GetRank(),
)
}
if state.Identifier != quorumCertificate.GetSelector() {
if state.Identifier != quorumCertificate.Identity() {
return &CertifiedState[StateT]{},
fmt.Errorf(
"state's ID (%x) should equal the state referenced by the qc (%x)",
state.Identifier,
quorumCertificate.GetSelector(),
quorumCertificate.Identity(),
)
}
return &CertifiedState[StateT]{
@ -92,7 +92,7 @@ func NewCertifiedState[StateT Unique](
// produce a state vote). To avoid repeated computation, we use value from the
// QuorumCertificate.
func (b *CertifiedState[StateT]) Identifier() Identity {
return b.CertifyingQuorumCertificate.GetSelector()
return b.CertifyingQuorumCertificate.Identity()
}
// Rank returns rank where the state was proposed.

View File

@ -68,7 +68,7 @@ func (lc *LogConsumer[StateT, VoteT]) OnInvalidStateDetected(
),
consensus.IdentityParam(
"qc_state_id",
invalidState.ParentQuorumCertificate.GetSelector(),
invalidState.ParentQuorumCertificate.Identity(),
),
).Error("invalid state detected", err)
}
@ -103,7 +103,7 @@ func (lc *LogConsumer[StateT, VoteT]) OnReceiveProposal(
),
consensus.IdentityParam(
"last_rank_tc_newest_qc_state_id",
lastRankTC.GetLatestQuorumCert().GetSelector(),
lastRankTC.GetLatestQuorumCert().Identity(),
),
)
}
@ -118,7 +118,7 @@ func (lc *LogConsumer[StateT, VoteT]) OnReceiveQuorumCertificate(
lc.log.With(
consensus.Uint64Param("cur_rank", currentRank),
consensus.Uint64Param("qc_rank", qc.GetRank()),
consensus.IdentityParam("qc_state_id", qc.GetSelector()),
consensus.IdentityParam("qc_state_id", qc.Identity()),
).Trace("processing QC")
}
@ -132,7 +132,7 @@ func (lc *LogConsumer[StateT, VoteT]) OnReceiveTimeoutCertificate(
consensus.Uint64Param("newest_qc_rank", tc.GetLatestQuorumCert().GetRank()),
consensus.IdentityParam(
"newest_qc_state_id",
tc.GetLatestQuorumCert().GetSelector(),
tc.GetLatestQuorumCert().Identity(),
),
).Trace("processing TC")
}
@ -150,7 +150,7 @@ func (lc *LogConsumer[StateT, VoteT]) OnPartialTimeoutCertificate(
),
consensus.IdentityParam(
"qc_state_id",
partialTc.NewestQuorumCertificate.GetSelector(),
partialTc.NewestQuorumCertificate.Identity(),
),
)
lastRankTC := partialTc.PriorRankTimeoutCertificate
@ -163,7 +163,7 @@ func (lc *LogConsumer[StateT, VoteT]) OnPartialTimeoutCertificate(
),
consensus.IdentityParam(
"last_rank_tc_newest_qc_state_id",
lastRankTC.GetLatestQuorumCert().GetSelector(),
lastRankTC.GetLatestQuorumCert().Identity(),
),
)
}
@ -191,7 +191,7 @@ func (lc *LogConsumer[StateT, VoteT]) OnQuorumCertificateTriggeredRankChange(
) {
lc.log.With(
consensus.Uint64Param("qc_rank", qc.GetRank()),
consensus.IdentityParam("qc_state_id", qc.GetSelector()),
consensus.IdentityParam("qc_state_id", qc.Identity()),
consensus.Uint64Param("old_rank", oldRank),
consensus.Uint64Param("new_rank", newRank),
).Trace("QC triggered rank change")
@ -359,7 +359,7 @@ func (lc *LogConsumer[StateT, VoteT]) logBasicStateData(
consensus.Uint64Param("qc_rank", state.ParentQuorumCertificate.GetRank()),
consensus.IdentityParam(
"qc_state_id",
state.ParentQuorumCertificate.GetSelector(),
state.ParentQuorumCertificate.Identity(),
),
)
}
@ -374,7 +374,7 @@ func (
consensus.Uint64Param("newest_qc_rank", tc.GetLatestQuorumCert().GetRank()),
consensus.IdentityParam(
"newest_qc_state_id",
tc.GetLatestQuorumCert().GetSelector(),
tc.GetLatestQuorumCert().Identity(),
),
).Trace("TC constructed")
}
@ -391,7 +391,7 @@ func (lc *LogConsumer[StateT, VoteT]) OnPartialTimeoutCertificateCreated(
lc.log.With(
consensus.Uint64Param("rank", rank),
consensus.Uint64Param("newest_qc_rank", newestQC.GetRank()),
consensus.IdentityParam("newest_qc_state_id", newestQC.GetSelector()),
consensus.IdentityParam("newest_qc_state_id", newestQC.Identity()),
consensus.StringParam("has_last_rank_tc", has),
).Trace("partial TC constructed")
}
@ -401,7 +401,7 @@ func (lc *LogConsumer[StateT, VoteT]) OnNewQuorumCertificateDiscovered(
) {
lc.log.With(
consensus.Uint64Param("qc_rank", qc.GetRank()),
consensus.IdentityParam("qc_state_id", qc.GetSelector()),
consensus.IdentityParam("qc_state_id", qc.Identity()),
).Trace("new QC discovered")
}
@ -413,7 +413,7 @@ func (lc *LogConsumer[StateT, VoteT]) OnNewTimeoutCertificateDiscovered(
consensus.Uint64Param("newest_qc_rank", tc.GetLatestQuorumCert().GetRank()),
consensus.IdentityParam(
"newest_qc_state_id",
tc.GetLatestQuorumCert().GetSelector(),
tc.GetLatestQuorumCert().Identity(),
),
).Trace("new TC discovered")
}
@ -466,7 +466,7 @@ func (lc *LogConsumer[StateT, VoteT]) OnOwnProposal(
consensus.IdentityParam("state_id", header.State.Identifier),
consensus.IdentityParam(
"parent_qc_id",
header.State.ParentQuorumCertificate.GetSelector(),
header.State.ParentQuorumCertificate.Identity(),
),
consensus.TimeParam(
"timestamp",
@ -481,6 +481,6 @@ func (lc *LogConsumer[StateT, VoteT]) OnQuorumCertificateConstructedFromVotes(
) {
lc.log.With(
consensus.Uint64Param("rank", qc.GetRank()),
consensus.IdentityParam("state_id", qc.GetSelector()),
consensus.IdentityParam("state_id", qc.Identity()),
).Trace("QC constructed from votes")
}

View File

@ -0,0 +1,240 @@
package participant
import (
"fmt"
"time"
"github.com/gammazero/workerpool"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/consensus/eventhandler"
"source.quilibrium.com/quilibrium/monorepo/consensus/eventloop"
"source.quilibrium.com/quilibrium/monorepo/consensus/forks"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/consensus/notifications/pubsub"
"source.quilibrium.com/quilibrium/monorepo/consensus/pacemaker"
"source.quilibrium.com/quilibrium/monorepo/consensus/pacemaker/timeout"
"source.quilibrium.com/quilibrium/monorepo/consensus/safetyrules"
"source.quilibrium.com/quilibrium/monorepo/consensus/stateproducer"
"source.quilibrium.com/quilibrium/monorepo/consensus/timeoutaggregator"
"source.quilibrium.com/quilibrium/monorepo/consensus/timeoutcollector"
"source.quilibrium.com/quilibrium/monorepo/consensus/validator"
"source.quilibrium.com/quilibrium/monorepo/consensus/verification"
"source.quilibrium.com/quilibrium/monorepo/consensus/voteaggregator"
"source.quilibrium.com/quilibrium/monorepo/consensus/votecollector"
)
// NewParticipant initializes the EventLoop instance with needed dependencies
func NewParticipant[
StateT models.Unique,
VoteT models.Unique,
PeerIDT models.Unique,
CollectedT models.Unique,
](
logger consensus.TraceLogger,
committee consensus.DynamicCommittee,
prover consensus.LeaderProvider[StateT, PeerIDT, CollectedT],
voter consensus.VotingProvider[StateT, VoteT, PeerIDT],
voteProcessorFactory consensus.VoteProcessorFactory[StateT, VoteT, PeerIDT],
consensusStore consensus.ConsensusStore[VoteT],
signatureAggregator consensus.SignatureAggregator,
consensusVerifier consensus.Verifier[VoteT],
voteNotifier pubsub.VoteAggregationDistributor[StateT, VoteT],
timeoutNotifier *pubsub.TimeoutCollectorDistributor[VoteT],
consumer consensus.Consumer[StateT, VoteT],
finalizer consensus.Finalizer,
filter []byte,
trustedRoot *models.CertifiedState[StateT],
proposalDomain []byte,
timeoutDomain []byte,
) (*eventloop.EventLoop[StateT, VoteT], error) {
cfg, err := timeout.NewConfig(
1*time.Second,
3*time.Second,
1.2,
6,
10*time.Second,
)
if err != nil {
return nil, err
}
livenessState, err := consensusStore.GetLivenessState()
if err != nil {
livenessState = &models.LivenessState{
Filter: filter,
CurrentRank: 0,
LatestQuorumCertificate: trustedRoot.CertifyingQuorumCertificate,
PriorRankTimeoutCertificate: nil,
}
err = consensusStore.PutLivenessState(livenessState)
if err != nil {
return nil, err
}
}
voteAggregationDistributor := pubsub.NewVoteAggregationDistributor[
StateT,
VoteT,
]()
createCollectorFactoryMethod := votecollector.NewStateMachineFactory(
logger,
voteAggregationDistributor,
votecollector.VerifyingVoteProcessorFactory[StateT, VoteT, PeerIDT](
voteProcessorFactory.Create,
),
proposalDomain,
signatureAggregator,
voter,
)
voteCollectors := voteaggregator.NewVoteCollectors(
logger,
livenessState.CurrentRank,
workerpool.New(2),
createCollectorFactoryMethod,
)
// initialize the vote aggregator
voteAggregator, err := voteaggregator.NewVoteAggregator(
logger,
voteAggregationDistributor,
livenessState.CurrentRank,
voteCollectors,
)
if err != nil {
return nil, fmt.Errorf("could not initialize vote aggregator: %w", err)
}
// initialize the Validator
validator := validator.NewValidator[StateT, VoteT](committee, consensusVerifier)
// initialize factories for timeout collector and timeout processor
timeoutAggregationDistributor := pubsub.NewTimeoutAggregationDistributor[VoteT]()
timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory[StateT, VoteT, PeerIDT](
logger,
signatureAggregator,
timeoutNotifier,
committee,
validator,
timeoutDomain,
)
timeoutCollectorFactory := timeoutcollector.NewTimeoutCollectorFactory(
logger,
timeoutAggregationDistributor,
timeoutProcessorFactory,
)
timeoutCollectors := timeoutaggregator.NewTimeoutCollectors(
logger,
livenessState.CurrentRank,
timeoutCollectorFactory,
)
// initialize the timeout aggregator
timeoutAggregator, err := timeoutaggregator.NewTimeoutAggregator(
logger,
livenessState.CurrentRank,
timeoutCollectors,
)
if err != nil {
return nil, fmt.Errorf("could not initialize timeout aggregator: %w", err)
}
consensusState, err := consensusStore.GetConsensusState()
if err != nil {
consensusState = &models.ConsensusState[VoteT]{
FinalizedRank: trustedRoot.Rank(),
LatestAcknowledgedRank: trustedRoot.Rank(),
}
err = consensusStore.PutConsensusState(consensusState)
if err != nil {
return nil, err
}
}
// prune vote and timeout aggregators to the initial rank
voteAggregator.PruneUpToRank(trustedRoot.Rank())
timeoutAggregator.PruneUpToRank(trustedRoot.Rank())
// initialize dynamically updatable timeout config
timeoutConfig, err := timeout.NewConfig(
time.Duration(cfg.MinReplicaTimeout),
time.Duration(cfg.MaxReplicaTimeout),
cfg.TimeoutAdjustmentFactor,
cfg.HappyPathMaxRoundFailures,
time.Duration(cfg.MaxTimeoutStateRebroadcastInterval),
)
if err != nil {
return nil, fmt.Errorf("could not initialize timeout config: %w", err)
}
// initialize the pacemaker
controller := timeout.NewController(timeoutConfig)
pacemaker, err := pacemaker.NewPacemaker(
controller,
pacemaker.NoProposalDelay(),
consumer,
consensusStore,
logger,
)
if err != nil {
return nil, fmt.Errorf("could not initialize flow pacemaker: %w", err)
}
signer := verification.NewSigner[StateT, VoteT, PeerIDT](voter)
// initialize the safetyRules
safetyRules, err := safetyrules.NewSafetyRules[StateT, VoteT](
signer,
consensusStore,
committee,
)
if err != nil {
return nil, fmt.Errorf("could not initialize safety rules: %w", err)
}
// initialize state producer
producer, err := stateproducer.NewStateProducer[
StateT,
VoteT,
PeerIDT,
CollectedT,
](safetyRules, committee, prover)
if err != nil {
return nil, fmt.Errorf("could not initialize state producer: %w", err)
}
forks, err := forks.NewForks[StateT, VoteT](trustedRoot, finalizer, consumer)
if err != nil {
return nil, fmt.Errorf("could not initialize forks: %w", err)
}
// initialize the event handler
eventHandler, err := eventhandler.NewEventHandler[
StateT,
VoteT,
PeerIDT,
CollectedT,
](
pacemaker,
producer,
forks,
consensusStore,
committee,
safetyRules,
consumer,
logger,
)
if err != nil {
return nil, fmt.Errorf("could not initialize event handler: %w", err)
}
// initialize and return the event loop
loop, err := eventloop.NewEventLoop(
logger,
eventHandler,
time.Now().Add(10*time.Second),
)
if err != nil {
return nil, fmt.Errorf("could not initialize event loop: %w", err)
}
// add observers; the event loop needs to receive events from the distributors
voteNotifier.AddVoteCollectorConsumer(loop)
timeoutNotifier.AddTimeoutCollectorConsumer(loop)
return loop, nil
}
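
A hypothetical wiring sketch for the new constructor; every value below is assumed to exist and satisfy the corresponding parameter's interface, and the type arguments are placeholders:

loop, err := participant.NewParticipant[*MyState, *MyVote, *MyPeerID, *MyCollected](
logger,               // consensus.TraceLogger
committee,            // consensus.DynamicCommittee
leaderProvider,       // consensus.LeaderProvider
votingProvider,       // consensus.VotingProvider
voteProcessorFactory, // consensus.VoteProcessorFactory
consensusStore,       // consensus.ConsensusStore
signatureAggregator,  // consensus.SignatureAggregator
consensusVerifier,    // consensus.Verifier
voteNotifier,         // pubsub.VoteAggregationDistributor
timeoutNotifier,      // *pubsub.TimeoutCollectorDistributor
consumer,             // consensus.Consumer
finalizer,            // consensus.Finalizer
filter,               // []byte clique filter
trustedRoot,          // *models.CertifiedState
proposalDomain,       // []byte domain-separation tag for proposals
timeoutDomain,        // []byte domain-separation tag for timeouts
)
if err != nil {
return nil, err
}
// NewParticipant already registers loop with both distributors, so the
// caller can hand it off to its runtime directly.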

View File

@ -51,14 +51,14 @@ func (b *StateSignerDecoder[StateT]) DecodeSignerIDs(
// possibly we requested a rank that is far in the past; in that case we
// won't have it in the cache, so try asking by parent ID
byStateMembers, err := b.IdentitiesByState(
state.ParentQuorumCertificate.GetSelector(),
state.ParentQuorumCertificate.Identity(),
)
if err != nil {
return nil, fmt.Errorf(
"could not retrieve identities for state %x with QC rank %d for parent %x: %w",
state.Identifier,
state.ParentQuorumCertificate.GetRank(),
state.ParentQuorumCertificate.GetSelector(),
state.ParentQuorumCertificate.Identity(),
err,
) // state.ErrUnknownSnapshotReference or exception
}

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@ -1,360 +0,0 @@
package consensus
// import (
// "fmt"
// "strings"
// "time"
// )
// // StateMachineViz provides visualization utilities for the generic state machine
// type StateMachineViz[
// StateT Unique,
// VoteT Unique,
// PeerIDT Unique,
// CollectedT Unique,
// ] struct {
// sm *StateMachine[StateT, VoteT, PeerIDT, CollectedT]
// }
// // NewStateMachineViz creates a new visualizer for the generic state machine
// func NewStateMachineViz[
// StateT Unique,
// VoteT Unique,
// PeerIDT Unique,
// CollectedT Unique,
// ](
// sm *StateMachine[StateT, VoteT, PeerIDT, CollectedT],
// ) *StateMachineViz[StateT, VoteT, PeerIDT, CollectedT] {
// return &StateMachineViz[StateT, VoteT, PeerIDT, CollectedT]{sm: sm}
// }
// // GenerateMermaidDiagram generates a Mermaid diagram of the state machine
// func (
// v *StateMachineViz[StateT, VoteT, PeerIDT, CollectedT],
// ) GenerateMermaidDiagram() string {
// var sb strings.Builder
// sb.WriteString("```mermaid\n")
// sb.WriteString("stateDiagram-v2\n")
// sb.WriteString(" [*] --> Stopped\n")
// // Define states with descriptions
// // Use CamelCase for state IDs to avoid underscore issues
// stateMap := map[State]string{
// StateStopped: "Stopped",
// StateStarting: "Starting",
// StateLoading: "Loading",
// StateCollecting: "Collecting",
// StateLivenessCheck: "LivenessCheck",
// StateProving: "Proving",
// StatePublishing: "Publishing",
// StateVoting: "Voting",
// StateFinalizing: "Finalizing",
// StateVerifying: "Verifying",
// StateStopping: "Stopping",
// }
// stateDescriptions := map[State]string{
// StateStopped: "Engine not running",
// StateStarting: "Initializing components",
// StateLoading: "Syncing with network",
// StateCollecting: "Gathering consensus data",
// StateLivenessCheck: "Checking prover availability",
// StateProving: "Generating cryptographic proof",
// StatePublishing: "Broadcasting proposal",
// StateVoting: "Participating in consensus",
// StateFinalizing: "Aggregating votes",
// StateVerifying: "Publishing confirmation",
// StateStopping: "Cleaning up resources",
// }
// // Add state descriptions
// for state, id := range stateMap {
// desc := stateDescriptions[state]
// sb.WriteString(fmt.Sprintf(" %s : %s\n", id, desc))
// }
// sb.WriteString("\n")
// // Add transitions using mapped state names
// transitions := v.getTransitionList()
// for _, t := range transitions {
// fromID := stateMap[t.From]
// toID := stateMap[t.To]
// if t.Guard != nil {
// sb.WriteString(fmt.Sprintf(
// " %s --> %s : %s [guarded]\n",
// fromID, toID, t.Event))
// } else {
// sb.WriteString(fmt.Sprintf(
// " %s --> %s : %s\n",
// fromID, toID, t.Event))
// }
// }
// // Add special annotations using mapped names
// sb.WriteString("\n")
// sb.WriteString(" note right of Proving : Leader only\n")
// sb.WriteString(
// " note right of LivenessCheck : Divergence point\\nfor leader/non-leader\n",
// )
// sb.WriteString(" note right of Voting : Convergence point\n")
// sb.WriteString("```\n")
// return sb.String()
// }
// // GenerateDotDiagram generates a Graphviz DOT diagram
// func (
// v *StateMachineViz[StateT, VoteT, PeerIDT, CollectedT],
// ) GenerateDotDiagram() string {
// var sb strings.Builder
// sb.WriteString("digraph ConsensusStateMachine {\n")
// sb.WriteString(" rankdir=TB;\n")
// sb.WriteString(" node [shape=box, style=rounded];\n")
// sb.WriteString(" edge [fontsize=10];\n\n")
// // Define node styles
// sb.WriteString(" // State styles\n")
// sb.WriteString(
// " Stopped [style=\"rounded,filled\", fillcolor=lightgray];\n",
// )
// sb.WriteString(
// " Starting [style=\"rounded,filled\", fillcolor=lightyellow];\n",
// )
// sb.WriteString(
// " Loading [style=\"rounded,filled\", fillcolor=lightyellow];\n",
// )
// sb.WriteString(
// " Collecting [style=\"rounded,filled\", fillcolor=lightblue];\n",
// )
// sb.WriteString(
// " LivenessCheck [style=\"rounded,filled\", fillcolor=orange];\n",
// )
// sb.WriteString(
// " Proving [style=\"rounded,filled\", fillcolor=lightgreen];\n",
// )
// sb.WriteString(
// " Publishing [style=\"rounded,filled\", fillcolor=lightgreen];\n",
// )
// sb.WriteString(
// " Voting [style=\"rounded,filled\", fillcolor=lightblue];\n",
// )
// sb.WriteString(
// " Finalizing [style=\"rounded,filled\", fillcolor=lightblue];\n",
// )
// sb.WriteString(
// " Verifying [style=\"rounded,filled\", fillcolor=lightblue];\n",
// )
// sb.WriteString(
// " Stopping [style=\"rounded,filled\", fillcolor=lightcoral];\n\n",
// )
// // Add transitions
// sb.WriteString(" // Transitions\n")
// transitions := v.getTransitionList()
// for _, t := range transitions {
// label := string(t.Event)
// if t.Guard != nil {
// label += " [G]"
// }
// sb.WriteString(fmt.Sprintf(
// " %s -> %s [label=\"%s\"];\n",
// t.From, t.To, label))
// }
// // Add legend
// sb.WriteString("\n // Legend\n")
// sb.WriteString(" subgraph cluster_legend {\n")
// sb.WriteString(" label=\"Legend\";\n")
// sb.WriteString(" style=dotted;\n")
// sb.WriteString(" \"[G] = Guarded transition\" [shape=none];\n")
// sb.WriteString(" \"Yellow = Initialization\" [shape=none];\n")
// sb.WriteString(" \"Blue = Consensus flow\" [shape=none];\n")
// sb.WriteString(" \"Green = Leader specific\" [shape=none];\n")
// sb.WriteString(" \"Orange = Decision point\" [shape=none];\n")
// sb.WriteString(" }\n")
// sb.WriteString("}\n")
// return sb.String()
// }
// // GenerateTransitionTable generates a markdown table of all transitions
// func (
// v *StateMachineViz[StateT, VoteT, PeerIDT, CollectedT],
// ) GenerateTransitionTable() string {
// var sb strings.Builder
// sb.WriteString("| From State | Event | To State | Condition |\n")
// sb.WriteString("|------------|-------|----------|----------|\n")
// transitions := v.getTransitionList()
// for _, t := range transitions {
// condition := "None"
// if t.Guard != nil {
// condition = "Has guard"
// }
// sb.WriteString(fmt.Sprintf(
// "| %s | %s | %s | %s |\n",
// t.From, t.Event, t.To, condition))
// }
// return sb.String()
// }
// // getTransitionList extracts all transitions from the state machine
// func (
// v *StateMachineViz[StateT, VoteT, PeerIDT, CollectedT],
// ) getTransitionList() []*Transition[StateT, VoteT, PeerIDT, CollectedT] {
// var transitions []*Transition[StateT, VoteT, PeerIDT, CollectedT]
// v.sm.mu.RLock()
// defer v.sm.mu.RUnlock()
// for _, eventMap := range v.sm.transitions {
// for _, transition := range eventMap {
// transitions = append(transitions, transition)
// }
// }
// return transitions
// }
// // GetStateStats returns statistics about the state machine
// func (
// v *StateMachineViz[StateT, VoteT, PeerIDT, CollectedT],
// ) GetStateStats() string {
// var sb strings.Builder
// sb.WriteString("State Machine Statistics:\n")
// sb.WriteString("========================\n\n")
// v.sm.mu.RLock()
// defer v.sm.mu.RUnlock()
// // Count states and transitions
// stateCount := 0
// transitionCount := 0
// eventCount := make(map[Event]int)
// for _, eventMap := range v.sm.transitions {
// // Only count if we have transitions for this state
// if len(eventMap) > 0 {
// stateCount++
// }
// for event := range eventMap {
// transitionCount++
// eventCount[event]++
// }
// }
// sb.WriteString(fmt.Sprintf("Total States: %d\n", stateCount))
// sb.WriteString(fmt.Sprintf("Total Transitions: %d\n", transitionCount))
// sb.WriteString(fmt.Sprintf("Current State: %s\n", v.sm.machineState))
// sb.WriteString(fmt.Sprintf("Transitions Made: %d\n", v.sm.transitionCount))
// sb.WriteString(
// fmt.Sprintf("Time in Current State: %v\n", v.sm.GetStateTime()),
// )
// // Display current leader info if available
// if len(v.sm.nextProvers) > 0 {
// sb.WriteString("\nNext Leaders:\n")
// for i, leader := range v.sm.nextProvers {
// sb.WriteString(fmt.Sprintf(" %d. %v\n", i+1, leader))
// }
// }
// // Display active state info
// if v.sm.activeState != nil {
// sb.WriteString(fmt.Sprintf("\nActive State: %+v\n", v.sm.activeState))
// }
// // Display liveness info
// sb.WriteString(fmt.Sprintf("\nLiveness Checks: %d\n", len(v.sm.liveness)))
// // Display voting info
// sb.WriteString(fmt.Sprintf("Proposals: %d\n", len(v.sm.proposals)))
// sb.WriteString(fmt.Sprintf("Votes: %d\n", len(v.sm.votes)))
// sb.WriteString(fmt.Sprintf("Confirmations: %d\n", len(v.sm.confirmations)))
// sb.WriteString("\nEvent Usage:\n")
// for event, count := range eventCount {
// sb.WriteString(fmt.Sprintf(" %s: %d transitions\n", event, count))
// }
// return sb.String()
// }
// // GetCurrentStateInfo returns detailed information about the current state
// func (
// v *StateMachineViz[StateT, VoteT, PeerIDT, CollectedT]) GetCurrentStateInfo() string {
// v.sm.mu.RLock()
// defer v.sm.mu.RUnlock()
// var sb strings.Builder
// sb.WriteString("Current State Information:\n")
// sb.WriteString("=========================\n\n")
// sb.WriteString(fmt.Sprintf("State: %s\n", v.sm.machineState))
// sb.WriteString(
// fmt.Sprintf("Time in State: %v\n", time.Since(v.sm.stateStartTime)),
// )
// sb.WriteString(fmt.Sprintf("Total Transitions: %d\n", v.sm.transitionCount))
// // State configuration info
// if config, exists := v.sm.stateConfigs[v.sm.machineState]; exists {
// sb.WriteString("\nState Configuration:\n")
// if config.Timeout > 0 {
// sb.WriteString(fmt.Sprintf(" Timeout: %v\n", config.Timeout))
// sb.WriteString(fmt.Sprintf(" Timeout Event: %s\n", config.OnTimeout))
// }
// if config.Behavior != nil {
// sb.WriteString(" Has Behavior: Yes\n")
// }
// if config.OnEnter != nil {
// sb.WriteString(" Has OnEnter Callback: Yes\n")
// }
// if config.OnExit != nil {
// sb.WriteString(" Has OnExit Callback: Yes\n")
// }
// }
// // Available transitions from current state
// sb.WriteString("\nAvailable Transitions:\n")
// if transitions, exists := v.sm.transitions[v.sm.machineState]; exists {
// for event, transition := range transitions {
// guardStr := ""
// if transition.Guard != nil {
// guardStr = " [guarded]"
// }
// sb.WriteString(
// fmt.Sprintf(" %s -> %s%s\n", event, transition.To, guardStr),
// )
// }
// }
// return sb.String()
// }
// // GenerateEventFlow generates a flow of events that occurred
// func (
// v *StateMachineViz[StateT, VoteT, PeerIDT, CollectedT],
// ) GenerateEventFlow() string {
// var sb strings.Builder
// sb.WriteString("Event Flow:\n")
// sb.WriteString("===========\n\n")
// transitions := v.getTransitionList()
// for i, tr := range transitions {
// sb.WriteString(fmt.Sprintf(
// "%d. %s -> %s [%s]\n",
// i+1, tr.From, tr.To, tr.Event,
// ))
// }
// return sb.String()
// }

View File

@ -65,19 +65,19 @@ func (bp *StateProducer[StateT, VoteT, PeerIDT, CollectedT]) MakeStateProposal(
context.TODO(),
rank,
qc.GetFilter(),
qc.GetSelector(),
qc.Identity(),
)
if err != nil {
if models.IsNoVoteError(err) {
return nil, fmt.Errorf(
"unsafe to vote for own proposal on top of %x: %w",
qc.GetSelector(),
qc.Identity(),
err,
)
}
return nil, fmt.Errorf(
"could not build state proposal on top of %x: %w",
qc.GetSelector(),
qc.Identity(),
err,
)
}
@ -92,7 +92,7 @@ func (bp *StateProducer[StateT, VoteT, PeerIDT, CollectedT]) MakeStateProposal(
if err != nil {
return nil, fmt.Errorf(
"could not vote on state proposal on top of %x: %w",
qc.GetSelector(),
qc.Identity(),
err,
)
}

View File

@ -507,7 +507,7 @@ func TestTimeoutProcessor_BuildVerifyTC(t *testing.T) {
StateID: state.Identifier,
}
v.On("SignVote", mock.Anything, mock.Anything).Return(&vote, nil).Once()
signers[s.Identity()] = verification.NewSigner(v)
signers[s.Identity()] = verification.NewSigner[*helper.TestState, *helper.TestVote, *helper.TestPeer](v)
}
// utility function which generates a valid timeout for every signer
@ -660,7 +660,7 @@ func createRealQC(
Rank: state.Rank,
FrameNumber: state.Rank,
Selector: state.Identifier,
Timestamp: time.Now().UnixMilli(),
Timestamp: uint64(time.Now().UnixMilli()),
AggregatedSignature: &helper.TestAggregatedSignature{PublicKey: make([]byte, 585), Signature: make([]byte, 74), Bitmask: []byte{0b11111111, 0b00000111}},
}, nil)
voteProcessor, err := voteProcessorFactory.Create(helper.Logger(), proposal, []byte{}, sigagg, votingProvider)

View File

@ -289,7 +289,7 @@ func (v *Validator[StateT, VoteT]) ValidateQuorumCertificate(
default:
return fmt.Errorf(
"cannot verify qc's aggregated signature (qc.Identifier: %x): %w",
qc.GetSelector(),
qc.Identity(),
err,
)
}
@ -508,7 +508,7 @@ func newInvalidQuorumCertificateError(
err error,
) error {
return models.InvalidQuorumCertificateError{
Identifier: qc.GetSelector(),
Identifier: qc.Identity(),
Rank: qc.GetRank(),
Err: err,
}

View File

@ -49,7 +49,7 @@ func verifyAggregatedSignatureOneMessage(
msg []byte, // message to verify against
) error {
valid := validator.VerifySignatureRaw(
aggregatedSig.GetPublicKey(),
aggregatedSig.GetPubKey(),
aggregatedSig.GetSignature(),
msg,
dsTag,

View File

@ -16,18 +16,18 @@ import (
// TestVoteProcessorFactory_CreateWithValidProposal checks that
// VoteProcessorFactory validates the proposer vote from the submitted proposal
func TestVoteProcessorFactory_CreateWithValidProposal(t *testing.T) {
mockedFactory := mocks.VoteProcessorFactory[*helper.TestState, *helper.TestVote]{}
mockedFactory := mocks.VoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]{}
proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote]()
mockedProcessor := &mocks.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote]{}
vote, err := proposal.ProposerVote()
require.NoError(t, err)
mockedProcessor.On("Process", vote).Return(nil).Once()
mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything).Return(mockedProcessor, nil).Once()
mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything, mock.Anything).Return(mockedProcessor, nil).Once()
voteProcessorFactory := &VoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]{
baseFactory: func(log consensus.TraceLogger, state *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) {
return mockedFactory.Create(log, proposal, dsTag, aggregator)
return mockedFactory.Create(log, proposal, dsTag, aggregator, votingProvider)
},
}
@ -42,7 +42,7 @@ func TestVoteProcessorFactory_CreateWithValidProposal(t *testing.T) {
// TestVoteProcessorFactory_CreateWithInvalidVote tests that processing a proposal with an invalid vote doesn't return
// a vote processor and returns the correct error (sentinel or exception).
func TestVoteProcessorFactory_CreateWithInvalidVote(t *testing.T) {
mockedFactory := mocks.VoteProcessorFactory[*helper.TestState, *helper.TestVote]{}
mockedFactory := mocks.VoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]{}
t.Run("invalid-vote", func(t *testing.T) {
proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote]()
@ -50,11 +50,11 @@ func TestVoteProcessorFactory_CreateWithInvalidVote(t *testing.T) {
vote, err := proposal.ProposerVote()
require.NoError(t, err)
mockedProcessor.On("Process", vote).Return(models.NewInvalidVoteErrorf(vote, "")).Once()
mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything).Return(mockedProcessor, nil).Once()
mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything, mock.Anything).Return(mockedProcessor, nil).Once()
voteProcessorFactory := &VoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]{
baseFactory: func(log consensus.TraceLogger, state *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) {
return mockedFactory.Create(log, proposal, dsTag, aggregator)
return mockedFactory.Create(log, proposal, dsTag, aggregator, votingProvider)
},
}
@ -73,11 +73,11 @@ func TestVoteProcessorFactory_CreateWithInvalidVote(t *testing.T) {
require.NoError(t, err)
mockedProcessor.On("Process", vote).Return(exception).Once()
mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything).Return(mockedProcessor, nil).Once()
mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything, mock.Anything).Return(mockedProcessor, nil).Once()
voteProcessorFactory := &VoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]{
baseFactory: func(log consensus.TraceLogger, state *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) {
return mockedFactory.Create(log, proposal, dsTag, aggregator)
return mockedFactory.Create(log, proposal, dsTag, aggregator, votingProvider)
},
}
@ -96,15 +96,15 @@ func TestVoteProcessorFactory_CreateWithInvalidVote(t *testing.T) {
// TestVoteProcessorFactory_CreateProcessException tests that VoteProcessorFactory correctly handles an exception
// while creating a processor for the requested proposal.
func TestVoteProcessorFactory_CreateProcessException(t *testing.T) {
mockedFactory := mocks.VoteProcessorFactory[*helper.TestState, *helper.TestVote]{}
mockedFactory := mocks.VoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]{}
proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote]()
exception := errors.New("create-exception")
mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything).Return(nil, exception).Once()
mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything, mock.Anything).Return(nil, exception).Once()
voteProcessorFactory := &VoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]{
baseFactory: func(log consensus.TraceLogger, state *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) {
return mockedFactory.Create(log, proposal, dsTag, aggregator)
return mockedFactory.Create(log, proposal, dsTag, aggregator, votingProvider)
},
}

View File

@ -21,32 +21,39 @@ var (
type VerifyingVoteProcessorFactory[
StateT models.Unique,
VoteT models.Unique,
PeerIDT models.Unique,
] = func(
tracer consensus.TraceLogger,
proposal *models.SignedProposal[StateT, VoteT],
dsTag []byte,
aggregator consensus.SignatureAggregator,
votingProvider consensus.VotingProvider[StateT, VoteT, PeerIDT],
) (consensus.VerifyingVoteProcessor[StateT, VoteT], error)
// VoteCollector implements a state machine for transitions between the
// different states of the vote collector
type VoteCollector[StateT models.Unique, VoteT models.Unique] struct {
type VoteCollector[
StateT models.Unique,
VoteT models.Unique,
PeerIDT models.Unique,
] struct {
sync.Mutex
tracer consensus.TraceLogger
workers consensus.Workers
notifier consensus.VoteAggregationConsumer[StateT, VoteT]
createVerifyingProcessor VerifyingVoteProcessorFactory[StateT, VoteT]
createVerifyingProcessor VerifyingVoteProcessorFactory[StateT, VoteT, PeerIDT]
dsTag []byte
aggregator consensus.SignatureAggregator
voter consensus.VotingProvider[StateT, VoteT, PeerIDT]
votesCache VotesCache[VoteT]
votesProcessor atomic.Value
}
var _ consensus.VoteCollector[*nilUnique, *nilUnique] = (*VoteCollector[*nilUnique, *nilUnique])(nil)
var _ consensus.VoteCollector[*nilUnique, *nilUnique] = (*VoteCollector[*nilUnique, *nilUnique, *nilUnique])(nil)
func (
m *VoteCollector[StateT, VoteT],
m *VoteCollector[StateT, VoteT, PeerIDT],
) atomicLoadProcessor() consensus.VoteProcessor[VoteT] {
return m.votesProcessor.Load().(*atomicValueWrapper[VoteT]).processor
}
@ -59,12 +66,21 @@ type atomicValueWrapper[VoteT models.Unique] struct {
processor consensus.VoteProcessor[VoteT]
}
func NewStateMachineFactory[StateT models.Unique, VoteT models.Unique](
func NewStateMachineFactory[
StateT models.Unique,
VoteT models.Unique,
PeerIDT models.Unique,
](
tracer consensus.TraceLogger,
notifier consensus.VoteAggregationConsumer[StateT, VoteT],
verifyingVoteProcessorFactory VerifyingVoteProcessorFactory[StateT, VoteT],
verifyingVoteProcessorFactory VerifyingVoteProcessorFactory[
StateT,
VoteT,
PeerIDT,
],
dsTag []byte,
aggregator consensus.SignatureAggregator,
voter consensus.VotingProvider[StateT, VoteT, PeerIDT],
) voteaggregator.NewCollectorFactoryMethod[StateT, VoteT] {
return func(rank uint64, workers consensus.Workers) (
consensus.VoteCollector[StateT, VoteT],
@ -78,20 +94,30 @@ func NewStateMachineFactory[StateT models.Unique, VoteT models.Unique](
verifyingVoteProcessorFactory,
dsTag,
aggregator,
voter,
), nil
}
}
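A hedged usage sketch of the factory: instantiated with the test types used elsewhere in this change, it yields one collector per rank. `tracer`, `notifier`, `verifyingFactory`, `voter`, `rank`, and `workers` are assumed to be in scope:
newCollector := NewStateMachineFactory[
	*helper.TestState, *helper.TestVote, *helper.TestPeer,
](tracer, notifier, verifyingFactory, dsTag, aggregator, voter)
collector, err := newCollector(rank, workers)
if err != nil {
	return err
}
// The collector starts out caching votes until ProcessState installs a
// verifying processor for the rank's proposal.
_ = collector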
func NewStateMachine[StateT models.Unique, VoteT models.Unique](
func NewStateMachine[
StateT models.Unique,
VoteT models.Unique,
PeerIDT models.Unique,
](
rank uint64,
tracer consensus.TraceLogger,
workers consensus.Workers,
notifier consensus.VoteAggregationConsumer[StateT, VoteT],
verifyingVoteProcessorFactory VerifyingVoteProcessorFactory[StateT, VoteT],
verifyingVoteProcessorFactory VerifyingVoteProcessorFactory[
StateT,
VoteT,
PeerIDT,
],
dsTag []byte,
aggregator consensus.SignatureAggregator,
) *VoteCollector[StateT, VoteT] {
sm := &VoteCollector[StateT, VoteT]{
voter consensus.VotingProvider[StateT, VoteT, PeerIDT],
) *VoteCollector[StateT, VoteT, PeerIDT] {
sm := &VoteCollector[StateT, VoteT, PeerIDT]{
tracer: tracer,
workers: workers,
notifier: notifier,
@ -99,6 +125,7 @@ func NewStateMachine[StateT models.Unique, VoteT models.Unique](
votesCache: *NewVotesCache[VoteT](rank),
dsTag: dsTag,
aggregator: aggregator,
voter: voter,
}
// without a state, we don't process votes (only cache them)
@ -111,7 +138,7 @@ func NewStateMachine[StateT models.Unique, VoteT models.Unique](
// AddVote adds a vote to the current vote collector.
// All expected errors are handled via callbacks to the notifier.
// Under normal execution only exceptions are propagated to the caller.
func (m *VoteCollector[StateT, VoteT]) AddVote(vote *VoteT) error {
func (m *VoteCollector[StateT, VoteT, PeerIDT]) AddVote(vote *VoteT) error {
// Cache vote
err := m.votesCache.AddVote(vote)
if err != nil {
@ -166,7 +193,7 @@ func (m *VoteCollector[StateT, VoteT]) AddVote(vote *VoteT) error {
// processVote uses a compare-and-repeat pattern to process a vote with the
// underlying vote processor
func (m *VoteCollector[StateT, VoteT]) processVote(vote *VoteT) error {
func (m *VoteCollector[StateT, VoteT, PeerIDT]) processVote(vote *VoteT) error {
for {
processor := m.atomicLoadProcessor()
currentState := processor.Status()
@ -197,12 +224,12 @@ func (m *VoteCollector[StateT, VoteT]) processVote(vote *VoteT) error {
}
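The elided loop body follows the compare-and-repeat shape sketched below; `Process` is an assumed method name on consensus.VoteProcessor, shown only to illustrate the pattern:
for {
	processor := m.atomicLoadProcessor()
	currentState := processor.Status()
	if err := processor.Process(vote); err != nil {
		return err
	}
	// If the collector transitioned (e.g. CachingVotes -> VerifyingVotes)
	// while the vote was in flight, repeat against the newly installed
	// processor so the vote is not lost in the handoff.
	if m.atomicLoadProcessor().Status() == currentState {
		return nil
	}
}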
// Status returns the status of the underlying vote processor
func (m *VoteCollector[StateT, VoteT]) Status() consensus.VoteCollectorStatus {
func (m *VoteCollector[StateT, VoteT, PeerIDT]) Status() consensus.VoteCollectorStatus {
return m.atomicLoadProcessor().Status()
}
// Rank returns the rank associated with this collector
func (m *VoteCollector[StateT, VoteT]) Rank() uint64 {
func (m *VoteCollector[StateT, VoteT, PeerIDT]) Rank() uint64 {
return m.votesCache.Rank()
}
@ -220,7 +247,7 @@ func (m *VoteCollector[StateT, VoteT]) Rank() uint64 {
// CachingVotes -> VerifyingVotes
// CachingVotes -> Invalid
// VerifyingVotes -> Invalid
func (m *VoteCollector[StateT, VoteT]) ProcessState(
func (m *VoteCollector[StateT, VoteT, PeerIDT]) ProcessState(
proposal *models.SignedProposal[StateT, VoteT],
) error {
@ -299,7 +326,7 @@ func (m *VoteCollector[StateT, VoteT]) ProcessState(
// CAUTION, VoteConsumer implementations must be
// - NON-BLOCKING and consume the votes without noteworthy delay, and
// - CONCURRENCY SAFE
func (m *VoteCollector[StateT, VoteT]) RegisterVoteConsumer(
func (m *VoteCollector[StateT, VoteT, PeerIDT]) RegisterVoteConsumer(
consumer consensus.VoteConsumer[VoteT],
) {
m.votesCache.RegisterVoteConsumer(consumer)
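For example, assuming consensus.VoteConsumer[VoteT] is a plain callback, a caller-side consumer can satisfy both requirements by handing votes to a buffered channel instead of doing work inline; the channel size and drop policy here are illustrative:
voteCh := make(chan *helper.TestVote, 1024)
collector.RegisterVoteConsumer(func(vote *helper.TestVote) {
	select {
	case voteCh <- vote:
	default:
		// Never block the collector; a slow consumer drops here and can
		// recover missed votes from the cache later.
	}
})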
@ -313,7 +340,7 @@ func (m *VoteCollector[StateT, VoteT]) RegisterVoteConsumer(
// `CachingVotes`
// - all other errors are unexpected and potential symptoms of internal bugs
// or state corruption (fatal)
func (m *VoteCollector[StateT, VoteT]) caching2Verifying(
func (m *VoteCollector[StateT, VoteT, PeerIDT]) caching2Verifying(
proposal *models.SignedProposal[StateT, VoteT],
) error {
stateID := proposal.State.Identifier
@ -322,6 +349,7 @@ func (m *VoteCollector[StateT, VoteT]) caching2Verifying(
proposal,
m.dsTag,
m.aggregator,
m.voter,
)
if err != nil {
return fmt.Errorf(
@ -346,7 +374,7 @@ func (m *VoteCollector[StateT, VoteT]) caching2Verifying(
return nil
}
func (m *VoteCollector[StateT, VoteT]) terminateVoteProcessing() {
func (m *VoteCollector[StateT, VoteT, PeerIDT]) terminateVoteProcessing() {
if m.Status() == consensus.VoteCollectorStatusInvalid {
return
}
@ -360,7 +388,7 @@ func (m *VoteCollector[StateT, VoteT]) terminateVoteProcessing() {
}
// processCachedVotes feeds all cached votes into the VoteProcessor
func (m *VoteCollector[StateT, VoteT]) processCachedVotes(
func (m *VoteCollector[StateT, VoteT, PeerIDT]) processCachedVotes(
state *models.State[StateT],
) {
cachedVotes := m.votesCache.All()

View File

@ -31,9 +31,9 @@ type StateMachineTestSuite struct {
rank uint64
notifier *mocks.VoteAggregationConsumer[*helper.TestState, *helper.TestVote]
workerPool *workerpool.WorkerPool
factoryMethod VerifyingVoteProcessorFactory[*helper.TestState, *helper.TestVote]
factoryMethod VerifyingVoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]
mockedProcessors map[models.Identity]*mocks.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote]
collector *VoteCollector[*helper.TestState, *helper.TestVote]
collector *VoteCollector[*helper.TestState, *helper.TestVote, *helper.TestPeer]
}
func (s *StateMachineTestSuite) TearDownTest() {
@ -48,7 +48,7 @@ func (s *StateMachineTestSuite) SetupTest() {
s.mockedProcessors = make(map[models.Identity]*mocks.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote])
s.notifier = mocks.NewVoteAggregationConsumer[*helper.TestState, *helper.TestVote](s.T())
s.factoryMethod = func(log consensus.TraceLogger, state *models.SignedProposal[*helper.TestState, *helper.TestVote], dsTag []byte, aggregator consensus.SignatureAggregator) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) {
s.factoryMethod = func(log consensus.TraceLogger, state *models.SignedProposal[*helper.TestState, *helper.TestVote], dsTag []byte, aggregator consensus.SignatureAggregator, voter consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) {
if processor, found := s.mockedProcessors[state.State.Identifier]; found {
return processor, nil
}
@ -56,7 +56,7 @@ func (s *StateMachineTestSuite) SetupTest() {
}
s.workerPool = workerpool.New(4)
s.collector = NewStateMachine(s.rank, helper.Logger(), s.workerPool, s.notifier, s.factoryMethod, []byte{}, consensus.SignatureAggregator(mocks.NewSignatureAggregator(s.T())))
s.collector = NewStateMachine(s.rank, helper.Logger(), s.workerPool, s.notifier, s.factoryMethod, []byte{}, consensus.SignatureAggregator(mocks.NewSignatureAggregator(s.T())), mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](s.T()))
}
// prepareMockedProcessor prepares a mocked processor and stores it in map, later it will be used
@ -96,7 +96,7 @@ func (s *StateMachineTestSuite) TestStatus_StateTransitions() {
// factory are handed through (potentially wrapped), but are not replaced.
func (s *StateMachineTestSuite) Test_FactoryErrorPropagation() {
factoryError := errors.New("factory error")
factory := func(log consensus.TraceLogger, state *models.SignedProposal[*helper.TestState, *helper.TestVote], dsTag []byte, aggregator consensus.SignatureAggregator) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) {
factory := func(log consensus.TraceLogger, state *models.SignedProposal[*helper.TestState, *helper.TestVote], dsTag []byte, aggregator consensus.SignatureAggregator, voter consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) {
return nil, factoryError
}
s.collector.createVerifyingProcessor = factory

View File

@ -242,7 +242,7 @@ func (s *VoteProcessorTestSuite) TestProcess_ConcurrentCreatingQC() {
Rank: s.proposal.State.Rank,
Selector: s.proposal.State.Identifier,
FrameNumber: s.proposal.State.Rank,
Timestamp: int64(s.proposal.State.Timestamp),
Timestamp: uint64(s.proposal.State.Timestamp),
AggregatedSignature: expectedSig,
}, nil)
var startupWg, shutdownWg sync.WaitGroup

View File

@ -60,6 +60,7 @@ const (
PathType uint32 = 0x0314
TraversalSubProofType uint32 = 0x0315
TraversalProofType uint32 = 0x0316
TimeoutStateType uint32 = 0x031C
TimeoutCertificateType uint32 = 0x031D
// Hypergraph types (0x0400 - 0x04FF)

View File

@ -6,37 +6,177 @@ import (
"slices"
"time"
"github.com/iden3/go-iden3-crypto/poseidon"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
)
// Equals implements models.QuorumCertificate.
func (g *QuorumCertificate) Equals(other models.QuorumCertificate) bool {
return bytes.Equal(g.Filter, other.GetFilter()) &&
g.Rank == other.GetRank() &&
g.FrameNumber == other.GetFrameNumber() &&
g.Identity() == other.Identity()
}
func (
g *QuorumCertificate,
) GetAggregatedSignature() models.AggregatedSignature {
return g.AggregateSignature
}
// Clone implements models.Unique.
func (g *QuorumCertificate) Clone() models.Unique {
return proto.Clone(g).(*QuorumCertificate)
}
func (g *QuorumCertificate) Identity() consensus.Identity {
return consensus.Identity(g.Selector)
// GetSignature implements models.Unique.
func (g *QuorumCertificate) GetSignature() []byte {
return g.AggregateSignature.Signature
}
// Source implements models.Unique.
func (g *QuorumCertificate) Source() models.Identity {
return g.AggregateSignature.Identity()
}
// Identity implements models.Unique.
func (g *QuorumCertificate) Identity() models.Identity {
return models.Identity(g.Selector)
}
// Equals implements models.TimeoutCertificate.
func (g *TimeoutCertificate) Equals(other models.TimeoutCertificate) bool {
return bytes.Equal(g.Filter, other.GetFilter()) &&
g.Rank == other.GetRank() &&
slices.Equal(g.LatestRanks, other.GetLatestRanks()) &&
g.LatestQuorumCertificate.Equals(other.GetLatestQuorumCert())
}
func (
g *TimeoutCertificate,
) GetAggregatedSignature() models.AggregatedSignature {
return g.AggregateSignature
}
func (
g *TimeoutCertificate,
) GetLatestQuorumCert() models.QuorumCertificate {
return g.LatestQuorumCertificate
}
// Clone implements models.Unique.
func (g *TimeoutCertificate) Clone() models.Unique {
return proto.Clone(g).(*TimeoutCertificate)
}
func (g *TimeoutCertificate) Identity() consensus.Identity {
return consensus.Identity(
// GetSignature implements models.Unique.
func (g *TimeoutCertificate) GetSignature() []byte {
return g.AggregateSignature.Signature
}
// Source implements models.Unique.
func (g *TimeoutCertificate) Source() models.Identity {
return models.Identity(
binary.BigEndian.AppendUint64(slices.Clone(g.Filter), g.Rank),
)
}
// Identity implements models.Unique.
func (g *TimeoutCertificate) Identity() models.Identity {
return models.Identity(
binary.BigEndian.AppendUint64(slices.Clone(g.Filter), g.Rank),
)
}
// Clone implements models.Unique.
func (f *ProposalVote) Clone() models.Unique {
return proto.Clone(f).(*ProposalVote)
}
func (f *ProposalVote) Identity() consensus.Identity {
return consensus.Identity(f.PublicKeySignatureBls48581.Signature)
// GetSignature implements models.Unique.
func (f *ProposalVote) GetSignature() []byte {
return f.PublicKeySignatureBls48581.Signature
}
// Source implements models.Unique.
func (f *ProposalVote) Source() models.Identity {
return models.Identity(f.Selector)
}
// Identity implements models.Unique.
func (f *ProposalVote) Identity() models.Identity {
return models.Identity(f.PublicKeySignatureBls48581.Address)
}
func (g *GlobalFrame) Clone() models.Unique {
return proto.Clone(g).(*GlobalFrame)
}
// GetRank implements models.Unique.
func (g *GlobalFrame) GetRank() uint64 {
return g.Header.Rank
}
// GetSignature implements models.Unique.
func (g *GlobalFrame) GetSignature() []byte {
return g.Header.PublicKeySignatureBls48581.Signature
}
// GetTimestamp implements models.Unique.
func (g *GlobalFrame) GetTimestamp() uint64 {
return uint64(g.Header.Timestamp)
}
// Identity implements models.Unique.
func (g *GlobalFrame) Identity() models.Identity {
selectorBI, err := poseidon.HashBytes(g.Header.Output)
if err != nil {
return ""
}
return models.Identity(selectorBI.FillBytes(make([]byte, 32)))
}
// Source implements models.Unique.
func (g *GlobalFrame) Source() models.Identity {
return g.Header.PublicKeySignatureBls48581.Identity()
}
func (a *AppShardFrame) Clone() models.Unique {
return proto.Clone(a).(*AppShardFrame)
}
// GetRank implements models.Unique.
func (a *AppShardFrame) GetRank() uint64 {
return a.Header.Rank
}
// GetSignature implements models.Unique.
func (a *AppShardFrame) GetSignature() []byte {
return a.Header.PublicKeySignatureBls48581.Signature
}
// GetTimestamp implements models.Unique.
func (a *AppShardFrame) GetTimestamp() uint64 {
return uint64(a.Header.Timestamp)
}
// Identity implements models.Unique.
func (a *AppShardFrame) Identity() models.Identity {
selectorBI, err := poseidon.HashBytes(a.Header.Output)
if err != nil {
return ""
}
return models.Identity(selectorBI.FillBytes(make([]byte, 32)))
}
// Source implements models.Unique.
func (a *AppShardFrame) Source() models.Identity {
return a.Header.PublicKeySignatureBls48581.Identity()
}
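Both frame types derive Identity the same way: the Poseidon hash of the VDF output, left-padded to a fixed 32-byte big-endian selector via FillBytes, which keeps selectors a uniform width and directly usable as models.Identity map keys.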
func (s *SeniorityMerge) ToCanonicalBytes() ([]byte, error) {
@ -1477,6 +1617,11 @@ func (g *GlobalFrameHeader) ToCanonicalBytes() ([]byte, error) {
return nil, errors.Wrap(err, "to canonical bytes")
}
// Write rank
if err := binary.Write(buf, binary.BigEndian, g.Rank); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
// Write timestamp
if err := binary.Write(buf, binary.BigEndian, g.Timestamp); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
@ -1589,6 +1734,11 @@ func (g *GlobalFrameHeader) FromCanonicalBytes(data []byte) error {
return errors.Wrap(err, "from canonical bytes")
}
// Read rank
if err := binary.Read(buf, binary.BigEndian, &g.Rank); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
// Read timestamp
if err := binary.Read(buf, binary.BigEndian, &g.Timestamp); err != nil {
return errors.Wrap(err, "from canonical bytes")
@ -2219,6 +2369,175 @@ func (f *ProposalVote) FromCanonicalBytes(data []byte) error {
return nil
}
func (f *TimeoutState) ToCanonicalBytes() ([]byte, error) {
buf := new(bytes.Buffer)
// Write type prefix
if err := binary.Write(buf, binary.BigEndian, TimeoutStateType); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
// Write latest_quorum_certificate
latestQCBytes, err := f.LatestQuorumCertificate.ToCanonicalBytes()
if err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(latestQCBytes)),
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if _, err := buf.Write(latestQCBytes); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
// Write prior_rank_timeout_certificate
if f.PriorRankTimeoutCertificate != nil {
priorTCBytes, err := f.PriorRankTimeoutCertificate.ToCanonicalBytes()
if err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(priorTCBytes)),
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if _, err := buf.Write(priorTCBytes); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
} else {
if err := binary.Write(buf, binary.BigEndian, uint32(0)); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
}
// Write vote
if f.Vote != nil {
voteBytes, err := f.Vote.ToCanonicalBytes()
if err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(voteBytes)),
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if _, err := buf.Write(voteBytes); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
} else {
if err := binary.Write(buf, binary.BigEndian, uint32(0)); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
}
// Write timeout_tick
if err := binary.Write(buf, binary.BigEndian, f.TimeoutTick); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
// Write timestamp
if err := binary.Write(buf, binary.BigEndian, f.Timestamp); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
return buf.Bytes(), nil
}
func (f *TimeoutState) FromCanonicalBytes(data []byte) error {
buf := bytes.NewBuffer(data)
// Read and verify type prefix
var typePrefix uint32
if err := binary.Read(buf, binary.BigEndian, &typePrefix); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
if typePrefix != TimeoutStateType {
return errors.Wrap(
errors.New("invalid type prefix"),
"from canonical bytes",
)
}
// Read latest_quorum_certificate
var latestQuorumCertLen uint32
if err := binary.Read(
buf,
binary.BigEndian,
&latestQuorumCertLen,
); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
if latestQuorumCertLen > 0 {
latestQuorumCertBytes := make([]byte, latestQuorumCertLen)
if _, err := buf.Read(latestQuorumCertBytes); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
f.LatestQuorumCertificate = &QuorumCertificate{}
if err := f.LatestQuorumCertificate.FromCanonicalBytes(
latestQuorumCertBytes,
); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
}
// Read prior_rank_timeout_certificate
var priorRankTimeoutCertLen uint32
if err := binary.Read(
buf,
binary.BigEndian,
&priorRankTimeoutCertLen,
); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
if priorRankTimeoutCertLen > 0 {
priorRankTimeoutBytes := make([]byte, priorRankTimeoutCertLen)
if _, err := buf.Read(priorRankTimeoutBytes); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
f.PriorRankTimeoutCertificate = &TimeoutCertificate{}
if err := f.PriorRankTimeoutCertificate.FromCanonicalBytes(
priorRankTimeoutBytes,
); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
}
// Read vote
var voteLen uint32
if err := binary.Read(buf, binary.BigEndian, &voteLen); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
if voteLen > 0 {
voteBytes := make([]byte, voteLen)
if _, err := buf.Read(voteBytes); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
f.Vote = &ProposalVote{}
if err := f.Vote.FromCanonicalBytes(voteBytes); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
}
// Read timeout_tick
if err := binary.Read(buf, binary.BigEndian, &f.TimeoutTick); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
// Read timestamp
if err := binary.Read(buf, binary.BigEndian, &f.Timestamp); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
return nil
}
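A hedged round-trip sketch of the encoding above. `qc` is an assumed, fully populated *QuorumCertificate; note that ToCanonicalBytes dereferences LatestQuorumCertificate unconditionally, so it must be non-nil:
ts := &TimeoutState{
	LatestQuorumCertificate: qc,
	TimeoutTick:             0,
	Timestamp:               uint64(time.Now().UnixMilli()),
}
data, err := ts.ToCanonicalBytes()
if err != nil {
	return err
}
decoded := &TimeoutState{}
if err := decoded.FromCanonicalBytes(data); err != nil {
	return err
}
// decoded mirrors ts; the nil optional fields round-tripped via their
// zero length prefixes.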
func (f *QuorumCertificate) ToCanonicalBytes() ([]byte, error) {
buf := new(bytes.Buffer)
@ -2363,6 +2682,187 @@ func (f *QuorumCertificate) FromCanonicalBytes(data []byte) error {
return nil
}
func (t *TimeoutCertificate) ToCanonicalBytes() ([]byte, error) {
buf := new(bytes.Buffer)
// Write type prefix
if err := binary.Write(
buf,
binary.BigEndian,
TimeoutCertificateType,
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
// Write filter
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(t.Filter)),
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if _, err := buf.Write(t.Filter); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
// Write rank
if err := binary.Write(buf, binary.BigEndian, t.Rank); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
// Write latest_ranks
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(t.LatestRanks)),
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
for _, r := range t.LatestRanks {
if err := binary.Write(buf, binary.BigEndian, r); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
}
// Write latest_quorum_certificate
if t.LatestQuorumCertificate != nil {
latestQCBytes, err := t.LatestQuorumCertificate.ToCanonicalBytes()
if err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(latestQCBytes)),
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if _, err := buf.Write(latestQCBytes); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
} else {
if err := binary.Write(buf, binary.BigEndian, uint32(0)); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
}
// Write timestamp
if err := binary.Write(buf, binary.BigEndian, t.Timestamp); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
// Write aggregate_signature
if t.AggregateSignature != nil {
sigBytes, err := t.AggregateSignature.ToCanonicalBytes()
if err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(sigBytes)),
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if _, err := buf.Write(sigBytes); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
} else {
if err := binary.Write(buf, binary.BigEndian, uint32(0)); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
}
return buf.Bytes(), nil
}
func (t *TimeoutCertificate) FromCanonicalBytes(data []byte) error {
buf := bytes.NewBuffer(data)
// Read and verify type prefix
var typePrefix uint32
if err := binary.Read(buf, binary.BigEndian, &typePrefix); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
if typePrefix != TimeoutCertificateType {
return errors.Wrap(
errors.New("invalid type prefix"),
"from canonical bytes",
)
}
// Read filter
var filterLen uint32
if err := binary.Read(buf, binary.BigEndian, &filterLen); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
t.Filter = make([]byte, filterLen)
if _, err := buf.Read(t.Filter); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
// Read rank
if err := binary.Read(buf, binary.BigEndian, &t.Rank); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
// Read latest_ranks
var latestRanksCount uint32
if err := binary.Read(buf, binary.BigEndian, &latestRanksCount); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
t.LatestRanks = make([]uint64, latestRanksCount)
if err := binary.Read(buf, binary.BigEndian, &t.LatestRanks); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
// Read latest_quorum_certificate
var latestQuorumCertLen uint32
if err := binary.Read(
buf,
binary.BigEndian,
&latestQuorumCertLen,
); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
if latestQuorumCertLen > 0 {
latestQuorumCertBytes := make([]byte, latestQuorumCertLen)
if _, err := buf.Read(latestQuorumCertBytes); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
t.LatestQuorumCertificate = &QuorumCertificate{}
if err := t.LatestQuorumCertificate.FromCanonicalBytes(
latestQuorumCertBytes,
); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
}
// Read timestamp
if err := binary.Read(buf, binary.BigEndian, &t.Timestamp); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
// Read aggregate_signature
var sigLen uint32
if err := binary.Read(buf, binary.BigEndian, &sigLen); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
if sigLen > 0 {
sigBytes := make([]byte, sigLen)
if _, err := buf.Read(sigBytes); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
t.AggregateSignature = &BLS48581AggregateSignature{}
if err := t.AggregateSignature.FromCanonicalBytes(sigBytes); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
}
return nil
}
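Throughout these encoders the zero-length prefix doubles as the nil sentinel for optional sub-messages: writers emit uint32(0) for a nil field, and readers leave the field nil when the prefix reads back as zero, so optionality round-trips without a separate presence flag.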
func (g *GlobalFrame) ToCanonicalBytes() ([]byte, error) {
buf := new(bytes.Buffer)

File diff suppressed because it is too large

View File

@ -117,44 +117,10 @@ message GlobalFrameHeader {
// A strictly monotonically-increasing frame number. Used for culling old
// frames past a configurable cutoff point.
uint64 frame_number = 1;
// The self-reported timestamp from the proof publisher, encoded as an int64
// of the Unix epoch in milliseconds. Should be good until
// 292278994-08-17 07:12:55.807, at which point, this is someone else's
// problem. Timestamps are imperfect, but smoothed in a rolling window to
// ensure a network and quorum-stable difficulty adjustment. Anomalies are
// bounded such that a timestamp beyond ten times the average issuance rate
// is discarded in preference to the runner up electees, unless there is
// simply no alternative available (for example, if a network outage occurred
// from an upgrade or bug).
int64 timestamp = 2;
// The difficulty level used for the frame. Difficulty is calculated based on
// the previous 60 timestamps correlated with difficulties, such that the
// interval smooths out to align to the type-defined rate. This is expected to
// increase subtly with clock speed and future hardware implementations, but
// due to incentive alignment associated with global proofs, not fastest clock
// in the west, should be gradual.
uint32 difficulty = 3;
// The output data from the VDF, serialized as bytes. For Wesolowski, this is
// an encoding of the 258 byte Y value concatenated with the 258 byte proof
// value.
bytes output = 4;
// The selector value of the previous frame's output, produced as a Poseidon
// hash of the output.
bytes parent_selector = 5;
// The 256 global commitment values
repeated bytes global_commitments = 6;
// The prover tree root commitment
bytes prover_tree_commitment = 7;
// The confirmation signatures of the frame
quilibrium.node.keys.pb.BLS48581AggregateSignature public_key_signature_bls48581 = 8;
}
message FrameHeader {
// The address dictates the depth of the shard, at a minimum, the app domain.
bytes address = 1;
// A strictly monotonically-increasing frame number. Used for culling old
// frames past a configurable cutoff point.
uint64 frame_number = 2;
// A strictly monotonically-increasing rank number. Disambiguates timeouts
// and allows for consistent determination of the leader, without having to
// rely on parsing internal state.
uint64 rank = 2;
// The self-reported timestamp from the proof publisher, encoded as an int64
// of the Unix epoch in milliseconds. Should be good until
// 292278994-08-17 07:12:55.807, at which point, this is someone else's
@ -179,17 +145,59 @@ message FrameHeader {
// The selector value of the previous frame's output, produced as a Poseidon
// hash of the output.
bytes parent_selector = 6;
// The 256 global commitment values
repeated bytes global_commitments = 7;
// The prover tree root commitment
bytes prover_tree_commitment = 8;
// The confirmation signatures of the frame
quilibrium.node.keys.pb.BLS48581AggregateSignature public_key_signature_bls48581 = 9;
}
message FrameHeader {
// The address dictates the depth of the shard, at a minimum, the app domain.
bytes address = 1;
// A strictly monotonically-increasing frame number. Used for culling old
// frames past a configurable cutoff point.
uint64 frame_number = 2;
// A strictly monotonically-increasing rank number. Disambiguates timeouts
// and allows for consistent determination of the leader, without having to
// rely on parsing internal state.
uint64 rank = 3;
// The self-reported timestamp from the proof publisher, encoded as an int64
// of the Unix epoch in milliseconds. Should be good until
// 292278994-08-17 07:12:55.807, at which point, this is someone else's
// problem. Timestamps are imperfect, but smoothed in a rolling window to
// ensure a network and quorum-stable difficulty adjustment. Anomalies are
// bounded such that a timestamp beyond ten times the average issuance rate
// is discarded in preference to the runner up electees, unless there is
// simply no alternative available (for example, if a network outage occurred
// from an upgrade or bug).
int64 timestamp = 4;
// The difficulty level used for the frame. Difficulty is calculated based on
// the previous 60 timestamps correlated with difficulties, such that the
// interval smooths out to align to the type-defined rate. This is expected to
// increase subtly with clock speed and future hardware implementations, but
// due to incentive alignment associated with global proofs, not fastest clock
// in the west, should be gradual.
uint32 difficulty = 5;
// The output data from the VDF, serialized as bytes. For Wesolowski, this is
// an encoding of the 258 byte Y value concatenated with the 258 byte proof
// value.
bytes output = 6;
// The selector value of the previous frame's output, produced as a Poseidon
// hash of the output.
bytes parent_selector = 7;
// The root commitment to the set of requests for the frame.
bytes requests_root = 7;
bytes requests_root = 8;
// The root commitments to the hypergraph state at the address.
repeated bytes state_roots = 8;
repeated bytes state_roots = 9;
// The prover of the frame, incorporated into the input to the VDF.
bytes prover = 9;
bytes prover = 10;
// The prover's proposed fee multiplier, incorporated into sliding window
// averaging.
uint64 fee_multiplier_vote = 10;
uint64 fee_multiplier_vote = 11;
// The confirmation signatures of the frame
quilibrium.node.keys.pb.BLS48581AggregateSignature public_key_signature_bls48581 = 11;
quilibrium.node.keys.pb.BLS48581AggregateSignature public_key_signature_bls48581 = 12;
}
message ProverLivenessCheck {
@ -217,11 +225,32 @@ message ProposalVote {
// The selector being voted for
bytes selector = 4;
// The timestamp when the vote was created
int64 timestamp = 5;
uint64 timestamp = 5;
// The BLS signature with the voter's address
quilibrium.node.keys.pb.BLS48581AddressedSignature public_key_signature_bls48581 = 6;
}
message TimeoutState {
// The latest quorum certificate seen by the pacemaker.
QuorumCertificate latest_quorum_certificate = 1;
// The previous rank's timeout certificate, if applicable.
TimeoutCertificate prior_rank_timeout_certificate = 2;
// The signed payload which will become part of the new timeout certificate.
ProposalVote vote = 3;
// TimeoutTick is the number of times the `timeout.Controller` has
// (re-)emitted the timeout for this rank. When the timer for the rank's
// original duration expires, a `TimeoutState` with `TimeoutTick = 0` is
// broadcast. Subsequently, `timeout.Controller` re-broadcasts the
// `TimeoutState` periodically based on some internal heuristic. Each time
// we attempt a re-broadcast, the `TimeoutTick` is incremented. Incrementing
// the field prevents de-duplication within the network layer, which in turn
// guarantees quick delivery of the `TimeoutState` after GST and facilitates
// recovery.
uint64 timeout_tick = 4;
// The timestamp of the message (not the timeout state)
uint64 timestamp = 5;
}
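A hedged Go sketch of the re-broadcast behavior this comment describes; broadcast, retryInterval, and rankDone are assumptions standing in for the controller's internals:
tick := uint64(0)
for {
	state.TimeoutTick = tick
	state.Timestamp = uint64(time.Now().UnixMilli())
	broadcast(state) // a fresh TimeoutTick defeats network-layer de-duplication
	tick++
	select {
	case <-time.After(retryInterval):
		// re-emit on the controller's internal schedule
	case <-rankDone:
		return
	}
}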
message QuorumCertificate {
// The filter for the prover's commitment in the trie
bytes filter = 1;
@ -231,8 +260,8 @@ message QuorumCertificate {
uint64 frame_number = 3;
// The selector (hash) of the confirmed frame
bytes selector = 4;
// The timestamp when the vote was created
int64 timestamp = 5;
// The timestamp of the message (not the certificate)
uint64 timestamp = 5;
// The aggregated BLS signature from all voters
quilibrium.node.keys.pb.BLS48581AggregateSignature aggregate_signature = 6;
}
@ -246,8 +275,10 @@ message TimeoutCertificate {
repeated uint64 latest_ranks = 3;
// The latest quorum certificate from all timeouts
QuorumCertificate latest_quorum_certificate = 4;
// The timestamp of the message (not the certificate)
uint64 timestamp = 5;
// The aggregated BLS signature from all voters
quilibrium.node.keys.pb.BLS48581AggregateSignature aggregate_signature = 5;
quilibrium.node.keys.pb.BLS48581AggregateSignature aggregate_signature = 6;
}
message GlobalFrame {

View File

@ -319,6 +319,10 @@ func (s *BLS48581AggregateSignature) Identity() string {
return string(s.GetPublicKey().GetKeyValue())
}
func (s *BLS48581AggregateSignature) GetPubKey() []byte {
return s.PublicKey.KeyValue
}
func (s *BLS48581Signature) Verify(
msg, context []byte,
blsVerifier BlsVerifier,