package consensus

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/pkg/errors"
)

// State represents a consensus engine state
type State string

const (
	// StateStopped - Initial state, engine is not running
	StateStopped State = "stopped"
	// StateStarting - Engine is initializing
	StateStarting State = "starting"
	// StateLoading - Loading data and syncing with network
	StateLoading State = "loading"
	// StateCollecting - Collecting data for consensus round, prepares proposal
	StateCollecting State = "collecting"
	// StateLivenessCheck - Announces and awaits prover liveness
	StateLivenessCheck State = "liveness_check"
	// StateProving - Generating proof (prover only)
	StateProving State = "proving"
	// StatePublishing - Publishing relevant state
	StatePublishing State = "publishing"
	// StateVoting - Voting on proposals
	StateVoting State = "voting"
	// StateFinalizing - Finalizing consensus round
	StateFinalizing State = "finalizing"
	// StateVerifying - Verifying and publishing results
	StateVerifying State = "verifying"
	// StateStopping - Engine is shutting down
	StateStopping State = "stopping"
)

// Event represents an event that can trigger state transitions
type Event string

const (
	EventStart                 Event = "start"
	EventStop                  Event = "stop"
	EventSyncTimeout           Event = "sync_timeout"
	EventInduceSync            Event = "induce_sync"
	EventSyncComplete          Event = "sync_complete"
	EventInitComplete          Event = "init_complete"
	EventCollectionDone        Event = "collection_done"
	EventLivenessCheckReceived Event = "liveness_check_received"
	EventLivenessTimeout       Event = "liveness_timeout"
	EventProverSignal          Event = "prover_signal"
	EventProofComplete         Event = "proof_complete"
	EventPublishComplete       Event = "publish_complete"
	EventPublishTimeout        Event = "publish_timeout"
	EventProposalReceived      Event = "proposal_received"
	EventVoteReceived          Event = "vote_received"
	EventQuorumReached         Event = "quorum_reached"
	EventVotingTimeout         Event = "voting_timeout"
	EventAggregationDone       Event = "aggregation_done"
	EventAggregationTimeout    Event = "aggregation_timeout"
	EventConfirmationReceived  Event = "confirmation_received"
	EventVerificationDone      Event = "verification_done"
	EventVerificationTimeout   Event = "verification_timeout"
	EventCleanupComplete       Event = "cleanup_complete"
)

type Identity = string

// Unique defines the attributes needed to distinguish items and order them
// relative to one another.
type Unique interface {
	// Identity returns the relevant identity of the given Unique.
	Identity() Identity
	// Clone should provide a shallow clone of the Unique.
	Clone() Unique
	// Rank indicates the ordinal basis of comparison, e.g. a frame number or
	// a height.
	Rank() uint64
}
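
// A minimal sketch of a Unique implementation (hypothetical, for
// illustration only): a frame identified by its hash and ordered by its
// number.
//
//	type Frame struct {
//		Hash   string
//		Number uint64
//	}
//
//	func (f Frame) Identity() Identity { return f.Hash }
//	func (f Frame) Clone() Unique      { c := f; return c }
//	func (f Frame) Rank() uint64       { return f.Number }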

// TransitionGuard is a function that determines if a transition should occur
type TransitionGuard[
	StateT Unique,
	VoteT Unique,
	PeerIDT Unique,
	CollectedT Unique,
] func(sm *StateMachine[StateT, VoteT, PeerIDT, CollectedT]) bool

// Transition defines a state transition
type Transition[
	StateT Unique,
	VoteT Unique,
	PeerIDT Unique,
	CollectedT Unique,
] struct {
	From  State
	Event Event
	To    State
	Guard TransitionGuard[StateT, VoteT, PeerIDT, CollectedT]
}

// TransitionListener is notified of state transitions
type TransitionListener[StateT Unique] interface {
	OnTransition(
		from State,
		to State,
		event Event,
	)
}

type eventWrapper struct {
	event    Event
	response chan error
}

// SyncProvider handles synchronization management
type SyncProvider[StateT Unique] interface {
	// Synchronize performs synchronization to set internal state. Note that it
	// is assumed that errors are transient and synchronization should be
	// reattempted on failure. If some other process for synchronization is
	// used and this should be bypassed, send nil on the error channel. The
	// provided context may be canceled and should be used to halt long-running
	// sync operations.
	Synchronize(
		existing *StateT,
		ctx context.Context,
	) (<-chan *StateT, <-chan error)
}
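
// A minimal pass-through SyncProvider sketch (an assumption for
// illustration, not part of this package): it immediately reports the
// existing state as already synchronized, which can be useful in tests or
// when synchronization happens out of band.
//
//	type noopSync[S Unique] struct{}
//
//	func (noopSync[S]) Synchronize(
//		existing *S,
//		ctx context.Context,
//	) (<-chan *S, <-chan error) {
//		stateCh := make(chan *S, 1)
//		stateCh <- existing
//		return stateCh, make(chan error)
//	}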

// VotingProvider handles voting logic by deferring decisions, collection, and
// state finalization to an outside implementation.
type VotingProvider[StateT Unique, VoteT Unique, PeerIDT Unique] interface {
	// SendProposal sends a proposal for voting.
	SendProposal(proposal *StateT, ctx context.Context) error
	// DecideAndSendVote makes a decision, mapped by leader, and should handle
	// any side effects (like publishing vote messages).
	DecideAndSendVote(
		proposals map[Identity]*StateT,
		ctx context.Context,
	) (PeerIDT, *VoteT, error)
	// IsQuorum reports whether or not quorum has been reached.
	IsQuorum(
		proposalVotes map[Identity]*VoteT,
		ctx context.Context,
	) (bool, error)
	// FinalizeVotes folds the proposed states and their votes (matched by
	// PeerIDT identity) from VoteT onto StateT, returning the finalized state
	// and the chosen proposer.
	FinalizeVotes(
		proposals map[Identity]*StateT,
		proposalVotes map[Identity]*VoteT,
		ctx context.Context,
	) (*StateT, PeerIDT, error)
	// SendConfirmation sends confirmation of the finalized state.
	SendConfirmation(finalized *StateT, ctx context.Context) error
}
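
// A common IsQuorum rule is the BFT two-thirds threshold (a sketch, assuming
// a known prover-set size n; the actual rule is left to the implementation):
//
//	func isQuorum(votes, n int) bool {
//		// Strictly more than 2/3 of n, computed without floating point.
//		return votes*3 > n*2
//	}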

// LeaderProvider handles leader selection. State is provided, if relevant to
// the upstream consensus engine.
type LeaderProvider[
	StateT Unique,
	PeerIDT Unique,
	CollectedT Unique,
] interface {
	// GetNextLeaders returns a list of node indices, in priority order. Note
	// that it is assumed that if no error is returned, GetNextLeaders should
	// produce a non-empty list. If a list smaller than minimumProvers is
	// provided, the liveness check will loop until the list reaches at least
	// that size.
	GetNextLeaders(prior *StateT, ctx context.Context) ([]PeerIDT, error)
	// ProveNextState prepares a non-finalized new state from the prior, to be
	// proposed and voted upon. The provided context may be canceled and should
	// be used to halt long-running prover operations.
	ProveNextState(
		prior *StateT,
		collected CollectedT,
		ctx context.Context,
	) (*StateT, error)
}
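
// Leader selection is commonly a deterministic ordering derived from the
// prior state. A round-robin sketch (hypothetical; PeerID stands in for the
// caller's concrete peer type, the peer set is assumed non-empty, and real
// implementations typically use verifiable randomness instead):
//
//	func nextLeaders(peers []PeerID, priorRank uint64) []PeerID {
//		ordered := make([]PeerID, 0, len(peers))
//		for i := range peers {
//			ordered = append(ordered, peers[(int(priorRank)+i)%len(peers)])
//		}
//		return ordered
//	}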

// LivenessProvider handles liveness announcements ahead of proving, to
// pre-emptively choose the next prover. In expected leader scenarios, this
// enables a peer to determine if an honest next prover is offline, so that it
// can publish the next state without waiting.
type LivenessProvider[
	StateT Unique,
	PeerIDT Unique,
	CollectedT Unique,
] interface {
	// Collect returns the collected mutation operations ahead of liveness
	// announcements.
	Collect(ctx context.Context) (CollectedT, error)
	// SendLiveness announces liveness ahead of the next prover determination
	// and subsequent proving. Provides prior state and collected mutation
	// operations if relevant.
	SendLiveness(prior *StateT, collected CollectedT, ctx context.Context) error
}
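
// As consumed by the state machine below, the liveness flow is: Collect
// gathers pending mutations, SendLiveness announces them alongside the prior
// state, and peers record announcements via ReceiveLivenessCheck until
// minimumProvers announcements are observed for the next rank.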

// TraceLogger defines a simple tracing interface
type TraceLogger interface {
	Trace(message string)
	Error(message string, err error)
}

type nilTracer struct{}

func (nilTracer) Trace(message string)            {}
func (nilTracer) Error(message string, err error) {}

// StateMachine manages consensus engine state transitions with generic state
// tracking. StateT represents the raw state-bearing type; the implementation
// details are left to callers, who may augment their transitions to utilize
// the data if needed. If no method of fork choice is utilized external to
// this machine, this state machine provides BFT consensus (i.e. it tolerates
// < 1/3 byzantine behaviors) provided the assumptions outlined in the
// interface types are fulfilled. The state transition patterns strictly
// assume a round-based state transition using cryptographic proofs.
//
// This implementation requires implementations of specific patterns:
// - A need to synchronize state from peers (SyncProvider)
// - A need to record voting from the upstream consumer to decide on consensus
//   changes during the voting period (VotingProvider)
// - A need to decide on the next leader and prove (LeaderProvider)
// - A need to announce liveness ahead of long-running proof operations
//   (LivenessProvider)
type StateMachine[
	StateT Unique,
	VoteT Unique,
	PeerIDT Unique,
	CollectedT Unique,
] struct {
	mu          sync.RWMutex
	transitions map[State]map[Event]*Transition[
		StateT, VoteT, PeerIDT, CollectedT,
	]
	stateConfigs map[State]*StateConfig[
		StateT, VoteT, PeerIDT, CollectedT,
	]
	eventChan      chan eventWrapper
	ctx            context.Context
	cancel         context.CancelFunc
	timeoutTimer   *time.Timer
	behaviorCancel context.CancelFunc

	// Internal state
	machineState                   State
	activeState                    *StateT
	nextState                      *StateT
	collected                      *CollectedT
	id                             PeerIDT
	nextProvers                    []PeerIDT
	liveness                       map[uint64]map[Identity]CollectedT
	votes                          map[uint64]map[Identity]*VoteT
	proposals                      map[uint64]map[Identity]*StateT
	confirmations                  map[uint64]map[Identity]*StateT
	chosenProposer                 *PeerIDT
	stateStartTime                 time.Time
	transitionCount                uint64
	listeners                      []TransitionListener[StateT]
	shouldEmitReceiveEventsOnSends bool
	minimumProvers                 uint64

	// Dependencies
	syncProvider     SyncProvider[StateT]
	votingProvider   VotingProvider[StateT, VoteT, PeerIDT]
	leaderProvider   LeaderProvider[StateT, PeerIDT, CollectedT]
	livenessProvider LivenessProvider[StateT, PeerIDT, CollectedT]
	traceLogger      TraceLogger
}

// StateConfig defines configuration for a state with generic behaviors
type StateConfig[
	StateT Unique,
	VoteT Unique,
	PeerIDT Unique,
	CollectedT Unique,
] struct {
	// Callbacks for state entry/exit
	OnEnter StateCallback[StateT, VoteT, PeerIDT, CollectedT]
	OnExit  StateCallback[StateT, VoteT, PeerIDT, CollectedT]

	// State behavior - runs continuously while in state
	Behavior StateBehavior[StateT, VoteT, PeerIDT, CollectedT]

	// Timeout configuration
	Timeout   time.Duration
	OnTimeout Event
}

// StateCallback is called when entering or exiting a state
type StateCallback[
	StateT Unique,
	VoteT Unique,
	PeerIDT Unique,
	CollectedT Unique,
] func(
	sm *StateMachine[StateT, VoteT, PeerIDT, CollectedT],
	data *StateT,
	event Event,
)

// StateBehavior defines the behavior while in a state
type StateBehavior[
	StateT Unique,
	VoteT Unique,
	PeerIDT Unique,
	CollectedT Unique,
] func(
	sm *StateMachine[StateT, VoteT, PeerIDT, CollectedT],
	data *StateT,
	ctx context.Context,
)

// NewStateMachine creates a new generic state machine for consensus.
// initialState should be provided if available; note that it does not set the
// position of the state machine, so consumers will need to manually force a
// state machine's internal state if desired. Assumes some variety of pubsub-
// based semantics is used in send/receive operations; if the pubsub
// implementation chosen does not receive messages published by itself, set
// shouldEmitReceiveEventsOnSends to true.
func NewStateMachine[
	StateT Unique,
	VoteT Unique,
	PeerIDT Unique,
	CollectedT Unique,
](
	id PeerIDT,
	initialState *StateT,
	shouldEmitReceiveEventsOnSends bool,
	minimumProvers uint64,
	syncProvider SyncProvider[StateT],
	votingProvider VotingProvider[StateT, VoteT, PeerIDT],
	leaderProvider LeaderProvider[StateT, PeerIDT, CollectedT],
	livenessProvider LivenessProvider[StateT, PeerIDT, CollectedT],
	traceLogger TraceLogger,
) *StateMachine[StateT, VoteT, PeerIDT, CollectedT] {
	ctx, cancel := context.WithCancel(context.Background())
	if traceLogger == nil {
		traceLogger = nilTracer{}
	}
	sm := &StateMachine[StateT, VoteT, PeerIDT, CollectedT]{
		machineState: StateStopped,
		transitions: make(
			map[State]map[Event]*Transition[StateT, VoteT, PeerIDT, CollectedT],
		),
		stateConfigs: make(
			map[State]*StateConfig[StateT, VoteT, PeerIDT, CollectedT],
		),
		eventChan:                      make(chan eventWrapper, 100),
		ctx:                            ctx,
		cancel:                         cancel,
		activeState:                    initialState,
		id:                             id,
		votes:                          make(map[uint64]map[Identity]*VoteT),
		proposals:                      make(map[uint64]map[Identity]*StateT),
		liveness:                       make(map[uint64]map[Identity]CollectedT),
		confirmations:                  make(map[uint64]map[Identity]*StateT),
		listeners:                      make([]TransitionListener[StateT], 0),
		shouldEmitReceiveEventsOnSends: shouldEmitReceiveEventsOnSends,
		minimumProvers:                 minimumProvers,
		syncProvider:                   syncProvider,
		votingProvider:                 votingProvider,
		leaderProvider:                 leaderProvider,
		livenessProvider:               livenessProvider,
		traceLogger:                    traceLogger,
	}

	// Define state configurations
	sm.defineStateConfigs()

	// Define transitions
	sm.defineTransitions()

	// Start event processor
	go sm.processEvents()

	return sm
}
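
// A construction and lifecycle sketch (hypothetical values shown for
// illustration; every identifier below other than the package API is an
// assumption):
//
//	sm := NewStateMachine(
//		myPeerID,      // PeerIDT
//		&genesisState, // *StateT, if available
//		true,          // pubsub does not deliver our own publishes back
//		3,             // minimumProvers
//		syncProvider,
//		votingProvider,
//		leaderProvider,
//		livenessProvider,
//		logger,
//	)
//	_ = sm.Start()
//	defer sm.Close()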

// defineStateConfigs sets up state configurations with behaviors
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) defineStateConfigs() {
	sm.traceLogger.Trace("enter defineStateConfigs")
	defer sm.traceLogger.Trace("exit defineStateConfigs")
	// Starting state - just timeout to complete initialization
	sm.stateConfigs[StateStarting] = &StateConfig[
		StateT,
		VoteT,
		PeerIDT,
		CollectedT,
	]{
		Timeout:   1 * time.Second,
		OnTimeout: EventInitComplete,
	}

	type Config = StateConfig[
		StateT,
		VoteT,
		PeerIDT,
		CollectedT,
	]

	type SMT = StateMachine[StateT, VoteT, PeerIDT, CollectedT]

	// Loading state - synchronize with network
	sm.stateConfigs[StateLoading] = &Config{
		Behavior: func(sm *SMT, data *StateT, ctx context.Context) {
			sm.traceLogger.Trace("enter Loading behavior")
			defer sm.traceLogger.Trace("exit Loading behavior")
			if sm.syncProvider != nil {
				newStateCh, errCh := sm.syncProvider.Synchronize(sm.activeState, ctx)
				select {
				case newState := <-newStateCh:
					sm.mu.Lock()
					sm.activeState = newState
					sm.mu.Unlock()
					nextLeaders, err := sm.leaderProvider.GetNextLeaders(data, ctx)
					if err != nil {
						sm.traceLogger.Error(
							fmt.Sprintf("error encountered in %s", sm.machineState),
							err,
						)
						return
					}
					found := false
					for _, leader := range nextLeaders {
						if leader.Identity() == sm.id.Identity() {
							found = true
							break
						}
					}
					if found {
						sm.SendEvent(EventSyncComplete)
					} else {
						sm.SendEvent(EventSyncTimeout)
					}
				case <-errCh:
					sm.SendEvent(EventSyncTimeout)
				case <-ctx.Done():
					return
				}
			}
		},
		Timeout:   10 * time.Hour,
		OnTimeout: EventSyncTimeout,
	}

	// Collecting state - wait for frame or timeout
	sm.stateConfigs[StateCollecting] = &Config{
		Behavior: func(sm *SMT, data *StateT, ctx context.Context) {
			sm.traceLogger.Trace("enter Collecting behavior")
			defer sm.traceLogger.Trace("exit Collecting behavior")
			collected, err := sm.livenessProvider.Collect(ctx)
			if err != nil {
				sm.traceLogger.Error(
					fmt.Sprintf("error encountered in %s", sm.machineState),
					err,
				)
				return
			}

			sm.mu.Lock()
			sm.nextProvers = []PeerIDT{}
			sm.chosenProposer = nil
			sm.collected = &collected
			sm.mu.Unlock()

			nextProvers, err := sm.leaderProvider.GetNextLeaders(data, ctx)
			if err != nil {
				sm.traceLogger.Error(
					fmt.Sprintf("error encountered in %s", sm.machineState),
					err,
				)
				return
			}

			sm.mu.Lock()
			sm.nextProvers = nextProvers
			sm.mu.Unlock()

			err = sm.livenessProvider.SendLiveness(data, collected, ctx)
			if err != nil {
				sm.traceLogger.Error(
					fmt.Sprintf("error encountered in %s", sm.machineState),
					err,
				)
				return
			}

			sm.mu.Lock()
			if sm.shouldEmitReceiveEventsOnSends {
				if _, ok := sm.liveness[collected.Rank()]; !ok {
					sm.liveness[collected.Rank()] = make(map[Identity]CollectedT)
				}
				sm.liveness[collected.Rank()][sm.id.Identity()] = *sm.collected
			}
			sm.mu.Unlock()

			if sm.shouldEmitReceiveEventsOnSends {
				sm.SendEvent(EventLivenessCheckReceived)
			}

			sm.SendEvent(EventCollectionDone)
		},
		Timeout:   1 * time.Second,
		OnTimeout: EventCollectionDone,
	}

	// Liveness check state
	sm.stateConfigs[StateLivenessCheck] = &Config{
		Behavior: func(sm *SMT, data *StateT, ctx context.Context) {
			sm.traceLogger.Trace("enter Liveness behavior")
			defer sm.traceLogger.Trace("exit Liveness behavior")
			sm.mu.Lock()
			nextProversLen := len(sm.nextProvers)
			sm.mu.Unlock()

			// If we're not meeting the minimum prover count, we should loop.
			if nextProversLen < int(sm.minimumProvers) {
				nextProvers, err := sm.leaderProvider.GetNextLeaders(data, ctx)
				if err != nil {
					sm.traceLogger.Error(
						fmt.Sprintf("error encountered in %s", sm.machineState),
						err,
					)
					return
				}
				sm.mu.Lock()
				sm.nextProvers = nextProvers
				sm.mu.Unlock()
			}

			sm.mu.Lock()
			collected := *sm.collected
			sm.mu.Unlock()

			sm.mu.Lock()
			livenessLen := len(sm.liveness[(*sm.activeState).Rank()+1])
			sm.mu.Unlock()

			// We have enough checks for consensus:
			if livenessLen >= int(sm.minimumProvers) {
				sm.SendEvent(EventProverSignal)
				return
			}

			err := sm.livenessProvider.SendLiveness(data, collected, ctx)
			if err != nil {
				sm.traceLogger.Error(
					fmt.Sprintf("error encountered in %s", sm.machineState),
					err,
				)
				return
			}
		},
		Timeout:   1 * time.Second,
		OnTimeout: EventLivenessTimeout,
	}
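
	// Note on the liveness check above: announcements are counted for rank
	// prior+1, and the machine loops in this state via the
	// EventLivenessTimeout and EventLivenessCheckReceived self-transitions
	// defined in defineTransitions until minimumProvers announcements are
	// observed.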

	// Proving state - generate proof
	sm.stateConfigs[StateProving] = &Config{
		Behavior: func(sm *SMT, data *StateT, ctx context.Context) {
			sm.traceLogger.Trace("enter Proving behavior")
			defer sm.traceLogger.Trace("exit Proving behavior")
			sm.mu.Lock()
			collected := sm.collected
			sm.mu.Unlock()

			if collected == nil {
				return
			}

			proposal, err := sm.leaderProvider.ProveNextState(
				data,
				*collected,
				ctx,
			)
			if err != nil {
				sm.traceLogger.Error(
					fmt.Sprintf("error encountered in %s", sm.machineState),
					err,
				)

				return
			}

			sm.mu.Lock()
			sm.traceLogger.Trace(
				fmt.Sprintf("adding proposal with rank %d", (*proposal).Rank()),
			)
			if _, ok := sm.proposals[(*proposal).Rank()]; !ok {
				sm.proposals[(*proposal).Rank()] = make(map[Identity]*StateT)
			}
			sm.proposals[(*proposal).Rank()][sm.id.Identity()] = proposal
			sm.mu.Unlock()

			sm.SendEvent(EventProofComplete)
		},
		Timeout:   120 * time.Second,
		OnTimeout: EventPublishTimeout,
	}

	// Publishing state - publish frame
	sm.stateConfigs[StatePublishing] = &Config{
		Behavior: func(sm *SMT, data *StateT, ctx context.Context) {
			sm.traceLogger.Trace("enter Publishing behavior")
			defer sm.traceLogger.Trace("exit Publishing behavior")
			sm.mu.Lock()
			if _, ok := sm.proposals[(*data).Rank()+1][sm.id.Identity()]; ok {
				proposal := sm.proposals[(*data).Rank()+1][sm.id.Identity()]
				sm.mu.Unlock()

				err := sm.votingProvider.SendProposal(
					proposal,
					ctx,
				)
				if err != nil {
					sm.traceLogger.Error(
						fmt.Sprintf("error encountered in %s", sm.machineState),
						err,
					)
					return
				}
				sm.SendEvent(EventPublishComplete)
			} else {
				// Fix: release the lock when we have no proposal of our own
				// to publish; the publish timeout advances the machine.
				sm.mu.Unlock()
			}
		},
		Timeout:   1 * time.Second,
		OnTimeout: EventPublishTimeout,
	}

	// Voting state - monitor for quorum
	sm.stateConfigs[StateVoting] = &Config{
		Behavior: func(sm *SMT, data *StateT, ctx context.Context) {
			sm.traceLogger.Trace("enter Voting behavior")
			defer sm.traceLogger.Trace("exit Voting behavior")

			sm.mu.Lock()

			if sm.chosenProposer == nil {
				// We haven't voted yet
				perfect := map[int]PeerIDT{} // all provers
				live := map[int]PeerIDT{}    // the provers who told us they're alive
				for i, p := range sm.nextProvers {
					perfect[i] = p
					if _, ok := sm.liveness[(*sm.activeState).Rank()+1][p.Identity()]; ok {
						live[i] = p
					}
				}

				if len(sm.proposals[(*sm.activeState).Rank()+1]) < int(sm.minimumProvers) {
					sm.mu.Unlock()
					return
				}

				if ctx == nil {
					sm.mu.Unlock()
					return
				}

				select {
				case <-ctx.Done():
					sm.mu.Unlock()
					return
				default:
					proposals := map[Identity]*StateT{}
					for k, v := range sm.proposals[(*sm.activeState).Rank()+1] {
						state := (*v).Clone().(StateT)
						proposals[k] = &state
					}

					sm.mu.Unlock()
					selectedPeer, vote, err := sm.votingProvider.DecideAndSendVote(
						proposals,
						ctx,
					)
					if err != nil {
						sm.traceLogger.Error(
							fmt.Sprintf("error encountered in %s", sm.machineState),
							err,
						)
						break
					}
					sm.mu.Lock()
					sm.chosenProposer = &selectedPeer

					if sm.shouldEmitReceiveEventsOnSends {
						if _, ok := sm.votes[(*sm.activeState).Rank()+1]; !ok {
							sm.votes[(*sm.activeState).Rank()+1] = make(map[Identity]*VoteT)
						}
						sm.votes[(*sm.activeState).Rank()+1][sm.id.Identity()] = vote
						sm.mu.Unlock()
						sm.SendEvent(EventVoteReceived)
						return
					}
					sm.mu.Unlock()
				}
			} else {
				proposalVotes := map[Identity]*VoteT{}
				for p, vp := range sm.votes[(*sm.activeState).Rank()+1] {
					vclone := (*vp).Clone().(VoteT)
					proposalVotes[p] = &vclone
				}
				haveEnoughProposals := len(sm.proposals[(*sm.activeState).Rank()+1]) >=
					int(sm.minimumProvers)
				sm.mu.Unlock()
				isQuorum, err := sm.votingProvider.IsQuorum(proposalVotes, ctx)
				if err != nil {
					sm.traceLogger.Error(
						fmt.Sprintf("error encountered in %s", sm.machineState),
						err,
					)
					return
				}

				if isQuorum && haveEnoughProposals {
					sm.SendEvent(EventQuorumReached)
				}
			}
		},
		Timeout:   1 * time.Second,
		OnTimeout: EventVotingTimeout,
	}
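
	// Note on the voting state above: the behavior runs in two phases. On
	// first entry (chosenProposer is nil) it clones the known proposals,
	// defers the decision to DecideAndSendVote, and records its own vote when
	// shouldEmitReceiveEventsOnSends is set; on re-entry (driven by the
	// EventVoteReceived and EventVotingTimeout self-transitions) it only
	// polls IsQuorum until quorum and enough proposals are observed.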

	// Finalizing state
	sm.stateConfigs[StateFinalizing] = &Config{
		Behavior: func(sm *SMT, data *StateT, ctx context.Context) {
			sm.mu.Lock()
			proposals := map[Identity]*StateT{}
			for k, v := range sm.proposals[(*sm.activeState).Rank()+1] {
				state := (*v).Clone().(StateT)
				proposals[k] = &state
			}
			proposalVotes := map[Identity]*VoteT{}
			for p, vp := range sm.votes[(*sm.activeState).Rank()+1] {
				vclone := (*vp).Clone().(VoteT)
				proposalVotes[p] = &vclone
			}
			sm.mu.Unlock()
			finalized, _, err := sm.votingProvider.FinalizeVotes(
				proposals,
				proposalVotes,
				ctx,
			)
			if err != nil {
				sm.traceLogger.Error(
					fmt.Sprintf("error encountered in %s", sm.machineState),
					err,
				)
				return
			}
			next := (*finalized).Clone().(StateT)
			sm.mu.Lock()
			sm.nextState = &next
			sm.mu.Unlock()
			sm.SendEvent(EventAggregationDone)
		},
		Timeout:   1 * time.Second,
		OnTimeout: EventAggregationTimeout,
	}

	// Verifying state
	sm.stateConfigs[StateVerifying] = &Config{
		Behavior: func(sm *SMT, data *StateT, ctx context.Context) {
			sm.traceLogger.Trace("enter Verifying behavior")
			defer sm.traceLogger.Trace("exit Verifying behavior")
			sm.mu.Lock()
			if _, ok := sm.confirmations[(*sm.activeState).Rank()+1][sm.id.Identity()]; !ok &&
				sm.nextState != nil {
				nextState := sm.nextState
				sm.mu.Unlock()
				err := sm.votingProvider.SendConfirmation(nextState, ctx)
				if err != nil {
					sm.traceLogger.Error(
						fmt.Sprintf("error encountered in %s", sm.machineState),
						err,
					)
				}
				sm.mu.Lock()
			}

			progressed := false
			if sm.nextState != nil {
				sm.activeState = sm.nextState
				progressed = true
			}
			if progressed {
				sm.nextState = nil
				sm.collected = nil
				delete(sm.liveness, (*sm.activeState).Rank())
				delete(sm.proposals, (*sm.activeState).Rank())
				delete(sm.votes, (*sm.activeState).Rank())
				delete(sm.confirmations, (*sm.activeState).Rank())
				sm.mu.Unlock()
				sm.SendEvent(EventVerificationDone)
			} else {
				sm.mu.Unlock()
			}
		},
		Timeout:   1 * time.Second,
		OnTimeout: EventVerificationTimeout,
	}
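
	// Note on the verifying state above: on progress, the finalized state
	// becomes the active state and the per-round bookkeeping (liveness,
	// proposals, votes, confirmations) for the newly active rank is dropped
	// before the machine cycles back to collecting.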

	// Stopping state
	sm.stateConfigs[StateStopping] = &Config{
		Behavior: func(sm *SMT, data *StateT, ctx context.Context) {
			sm.SendEvent(EventCleanupComplete)
		},
		Timeout:   30 * time.Second,
		OnTimeout: EventCleanupComplete,
	}
}

// defineTransitions sets up all possible state transitions
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) defineTransitions() {
	sm.traceLogger.Trace("enter defineTransitions")
	defer sm.traceLogger.Trace("exit defineTransitions")

	// Helper to add a transition
	addTransition := func(
		from State,
		event Event,
		to State,
		guard TransitionGuard[StateT, VoteT, PeerIDT, CollectedT],
	) {
		if sm.transitions[from] == nil {
			sm.transitions[from] = make(map[Event]*Transition[
				StateT,
				VoteT,
				PeerIDT,
				CollectedT,
			])
		}
		sm.transitions[from][event] = &Transition[
			StateT,
			VoteT,
			PeerIDT,
			CollectedT,
		]{
			From:  from,
			Event: event,
			To:    to,
			Guard: guard,
		}
	}

	// Basic flow transitions
	addTransition(StateStopped, EventStart, StateStarting, nil)
	addTransition(StateStarting, EventInitComplete, StateLoading, nil)
	addTransition(StateLoading, EventSyncTimeout, StateLoading, nil)
	addTransition(StateLoading, EventSyncComplete, StateCollecting, nil)
	addTransition(StateCollecting, EventCollectionDone, StateLivenessCheck, nil)
	addTransition(StateLivenessCheck, EventProverSignal, StateProving, nil)

	// Loop indefinitely if nobody can be found
	addTransition(
		StateLivenessCheck,
		EventLivenessTimeout,
		StateLivenessCheck,
		nil,
	)
	// Loop until we get enough of these
	addTransition(
		StateLivenessCheck,
		EventLivenessCheckReceived,
		StateLivenessCheck,
		nil,
	)

	// Prover flow
	addTransition(StateProving, EventProofComplete, StatePublishing, nil)
	addTransition(StateProving, EventPublishTimeout, StateVoting, nil)
	addTransition(StatePublishing, EventPublishComplete, StateVoting, nil)
	addTransition(StatePublishing, EventPublishTimeout, StateVoting, nil)

	// Common voting flow
	addTransition(StateVoting, EventProposalReceived, StateVoting, nil)
	addTransition(StateVoting, EventVoteReceived, StateVoting, nil)
	addTransition(StateVoting, EventQuorumReached, StateFinalizing, nil)
	addTransition(StateVoting, EventVotingTimeout, StateVoting, nil)
	addTransition(StateFinalizing, EventAggregationDone, StateVerifying, nil)
	addTransition(StateFinalizing, EventAggregationTimeout, StateFinalizing, nil)
	addTransition(StateVerifying, EventVerificationDone, StateCollecting, nil)
	addTransition(StateVerifying, EventVerificationTimeout, StateVerifying, nil)

	// Stop or induce-sync transitions from any running state
	for _, state := range []State{
		StateStarting,
		StateLoading,
		StateCollecting,
		StateLivenessCheck,
		StateProving,
		StatePublishing,
		StateVoting,
		StateFinalizing,
		StateVerifying,
	} {
		addTransition(state, EventStop, StateStopping, nil)
		addTransition(state, EventInduceSync, StateLoading, nil)
	}

	addTransition(StateStopping, EventCleanupComplete, StateStopped, nil)
}
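
// In the non-faulty case the machine cycles:
// stopped -> starting -> loading -> collecting -> liveness_check -> proving
// -> publishing -> voting -> finalizing -> verifying -> collecting -> ...
// with EventStop and EventInduceSync available as escapes from any running
// state.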

// Start begins the state machine
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) Start() error {
	sm.traceLogger.Trace("enter start")
	defer sm.traceLogger.Trace("exit start")
	sm.SendEvent(EventStart)
	return nil
}

// Stop halts the state machine
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) Stop() error {
	sm.traceLogger.Trace("enter stop")
	defer sm.traceLogger.Trace("exit stop")
	sm.SendEvent(EventStop)
	return nil
}

// SendEvent sends an event to the state machine. Delivery is asynchronous:
// the event is handed to the processing goroutine and any transition error
// is consumed internally rather than returned to the caller.
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) SendEvent(event Event) {
	sm.traceLogger.Trace(fmt.Sprintf("enter sendEvent: %s", event))
	defer sm.traceLogger.Trace(fmt.Sprintf("exit sendEvent: %s", event))
	response := make(chan error, 1)
	go func() {
		select {
		case sm.eventChan <- eventWrapper{event: event, response: response}:
			<-response
		case <-sm.ctx.Done():
			return
		}
	}()
}

// processEvents handles events and transitions
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) processEvents() {
	defer func() {
		if r := recover(); r != nil {
			sm.traceLogger.Error(
				"fatal error encountered",
				errors.Errorf("%+v", r),
			)
			sm.Close()
		}
	}()

	sm.traceLogger.Trace("enter processEvents")
	defer sm.traceLogger.Trace("exit processEvents")
	for {
		select {
		case <-sm.ctx.Done():
			return
		case wrapper := <-sm.eventChan:
			err := sm.handleEvent(wrapper.event)
			wrapper.response <- err
		}
	}
}

// handleEvent processes a single event
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) handleEvent(event Event) error {
	sm.traceLogger.Trace(fmt.Sprintf("enter handleEvent: %s", event))
	defer sm.traceLogger.Trace(fmt.Sprintf("exit handleEvent: %s", event))
	sm.mu.Lock()

	currentState := sm.machineState
	transitions, exists := sm.transitions[currentState]
	if !exists {
		sm.mu.Unlock()

		return errors.Wrap(
			fmt.Errorf("no transitions defined for state %s", currentState),
			"handle event",
		)
	}

	transition, exists := transitions[event]
	if !exists {
		sm.mu.Unlock()

		return errors.Wrap(
			fmt.Errorf(
				"no transition for event %s in state %s",
				event,
				currentState,
			),
			"handle event",
		)
	}

	// Check guard condition with the actual state
	if transition.Guard != nil && !transition.Guard(sm) {
		sm.mu.Unlock()

		return errors.Wrap(
			fmt.Errorf(
				"transition guard failed for %s -> %s on %s",
				currentState,
				transition.To,
				event,
			),
			"handle event",
		)
	}

	sm.mu.Unlock()

	// Execute transition
	sm.executeTransition(currentState, transition.To, event)
	return nil
}

// executeTransition performs the state transition
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) executeTransition(
	from State,
	to State,
	event Event,
) {
	sm.traceLogger.Trace(
		fmt.Sprintf("enter executeTransition: %s -> %s [%s]", from, to, event),
	)
	defer sm.traceLogger.Trace(
		fmt.Sprintf("exit executeTransition: %s -> %s [%s]", from, to, event),
	)
	sm.mu.Lock()

	// Cancel any existing timeout and behavior
	if sm.timeoutTimer != nil {
		sm.timeoutTimer.Stop()
		sm.timeoutTimer = nil
	}

	// Cancel existing behavior if any
	if sm.behaviorCancel != nil {
		sm.behaviorCancel()
		sm.behaviorCancel = nil
	}

	// Call exit callback for current state
	if config, exists := sm.stateConfigs[from]; exists && config.OnExit != nil {
		sm.mu.Unlock()
		config.OnExit(sm, sm.activeState, event)
		sm.mu.Lock()
	}

	// Update state
	sm.machineState = to
	sm.stateStartTime = time.Now()
	sm.transitionCount++

	// Notify listeners
	for _, listener := range sm.listeners {
		listener.OnTransition(from, to, event)
	}

	// Call enter callback for new state
	if config, exists := sm.stateConfigs[to]; exists {
		if config.OnEnter != nil {
			sm.mu.Unlock()
			config.OnEnter(sm, sm.activeState, event)
			sm.mu.Lock()
		}

		// Start state behavior if defined. Note that the behavior runs
		// synchronously on the event-processing goroutine; long-running work
		// inside a behavior should watch the provided context, which is
		// canceled on the next transition or on Close.
		if config.Behavior != nil {
			behaviorCtx, cancel := context.WithCancel(sm.ctx)
			sm.behaviorCancel = cancel
			sm.mu.Unlock()
			config.Behavior(sm, sm.activeState, behaviorCtx)
			sm.mu.Lock()
		}

		// Set up timeout for new state
		if config.Timeout > 0 && config.OnTimeout != "" {
			sm.timeoutTimer = time.AfterFunc(config.Timeout, func() {
				sm.SendEvent(config.OnTimeout)
			})
		}
	}
	sm.mu.Unlock()
}

// GetState returns the current state
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) GetState() State {
	sm.traceLogger.Trace("enter getstate")
	defer sm.traceLogger.Trace("exit getstate")
	sm.mu.Lock()
	defer sm.mu.Unlock()
	return sm.machineState
}

// Additional methods for compatibility

// GetStateTime returns how long the machine has been in the current state.
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) GetStateTime() time.Duration {
	sm.traceLogger.Trace("enter getstatetime")
	defer sm.traceLogger.Trace("exit getstatetime")
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	return time.Since(sm.stateStartTime)
}

// GetTransitionCount returns the number of transitions performed so far.
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) GetTransitionCount() uint64 {
	sm.traceLogger.Trace("enter transitioncount")
	defer sm.traceLogger.Trace("exit transitioncount")
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	return sm.transitionCount
}

// AddListener registers a listener for transition notifications.
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) AddListener(listener TransitionListener[StateT]) {
	sm.traceLogger.Trace("enter addlistener")
	defer sm.traceLogger.Trace("exit addlistener")
	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.listeners = append(sm.listeners, listener)
}

// Close shuts the state machine down and releases its resources.
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) Close() {
	sm.traceLogger.Trace("enter close")
	defer sm.traceLogger.Trace("exit close")
	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.cancel()
	if sm.timeoutTimer != nil {
		sm.timeoutTimer.Stop()
	}
	if sm.behaviorCancel != nil {
		sm.behaviorCancel()
	}
	sm.machineState = StateStopped
}

// ReceiveLivenessCheck receives a liveness announcement and captures
// collected mutation operations reported by the peer if relevant.
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) ReceiveLivenessCheck(peer PeerIDT, collected CollectedT) error {
	sm.traceLogger.Trace("enter receivelivenesscheck")
	defer sm.traceLogger.Trace("exit receivelivenesscheck")
	sm.mu.Lock()
	if _, ok := sm.liveness[collected.Rank()]; !ok {
		sm.liveness[collected.Rank()] = make(map[Identity]CollectedT)
	}
	if _, ok := sm.liveness[collected.Rank()][peer.Identity()]; !ok {
		sm.liveness[collected.Rank()][peer.Identity()] = collected
	}
	sm.mu.Unlock()

	sm.SendEvent(EventLivenessCheckReceived)
	return nil
}

// ReceiveProposal receives a proposed new state.
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) ReceiveProposal(peer PeerIDT, proposal *StateT) error {
	sm.traceLogger.Trace("enter receiveproposal")
	defer sm.traceLogger.Trace("exit receiveproposal")
	sm.mu.Lock()
	if _, ok := sm.proposals[(*proposal).Rank()]; !ok {
		sm.proposals[(*proposal).Rank()] = make(map[Identity]*StateT)
	}
	if _, ok := sm.proposals[(*proposal).Rank()][peer.Identity()]; !ok {
		sm.proposals[(*proposal).Rank()][peer.Identity()] = proposal
	}
	sm.mu.Unlock()

	sm.SendEvent(EventProposalReceived)
	return nil
}

// ReceiveVote captures a vote. Presumes structural and protocol validity of a
// vote has already been evaluated.
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) ReceiveVote(proposer PeerIDT, voter PeerIDT, vote *VoteT) error {
	sm.traceLogger.Trace("enter receivevote")
	defer sm.traceLogger.Trace("exit receivevote")
	sm.mu.Lock()

	if _, ok := sm.votes[(*vote).Rank()]; !ok {
		sm.votes[(*vote).Rank()] = make(map[Identity]*VoteT)
	}
	if _, ok := sm.votes[(*vote).Rank()][voter.Identity()]; !ok {
		sm.votes[(*vote).Rank()][voter.Identity()] = vote
	} else if sm.votes[(*vote).Rank()][voter.Identity()] != vote {
		// Note: conflict detection compares vote pointers, so a re-delivered
		// vote arriving as a distinct allocation is reported as conflicting.
		sm.mu.Unlock()
		return errors.Wrap(errors.New("received conflicting vote"), "receive vote")
	}
	sm.mu.Unlock()

	sm.SendEvent(EventVoteReceived)
	return nil
}

// ReceiveConfirmation captures a confirmation. Presumes structural and protocol
// validity of the state has already been evaluated.
func (sm *StateMachine[
	StateT,
	VoteT,
	PeerIDT,
	CollectedT,
]) ReceiveConfirmation(
	peer PeerIDT,
	confirmation *StateT,
) error {
	sm.traceLogger.Trace("enter receiveconfirmation")
	defer sm.traceLogger.Trace("exit receiveconfirmation")
	sm.mu.Lock()
	if _, ok := sm.confirmations[(*confirmation).Rank()]; !ok {
		sm.confirmations[(*confirmation).Rank()] = make(map[Identity]*StateT)
	}
	if _, ok := sm.confirmations[(*confirmation).Rank()][peer.Identity()]; !ok {
		sm.confirmations[(*confirmation).Rank()][peer.Identity()] = confirmation
	}
	sm.mu.Unlock()

	sm.SendEvent(EventConfirmationReceived)
	return nil
}