package global

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"slices"

	"go.uber.org/zap"
	"golang.org/x/crypto/sha3"

	"source.quilibrium.com/quilibrium/monorepo/consensus/models"
	"source.quilibrium.com/quilibrium/monorepo/lifecycle"
	"source.quilibrium.com/quilibrium/monorepo/node/consensus/tracing"
	keyedaggregator "source.quilibrium.com/quilibrium/monorepo/node/keyedaggregator"
	keyedcollector "source.quilibrium.com/quilibrium/monorepo/node/keyedcollector"
	"source.quilibrium.com/quilibrium/monorepo/protobufs"
)
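
// maxGlobalMessagesPerFrame caps how many global messages a single frame's
// collector accepts and how many bundle requests are forwarded per frame
// (see enforceCollectorLimit and addGlobalMessage).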
const maxGlobalMessagesPerFrame = 100
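
// globalMessageAddress is the fixed all-0xff 32-byte address under which
// global messages are submitted for validation.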
var globalMessageAddress = bytes.Repeat([]byte{0xff}, 32)
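
// sequencedGlobalMessage is a single global message keyed by the frame
// sequence it targets and identified by the SHA3-256 hash of its payload.
// The decoded protobuf message is attached by the processor after
// validation.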
type sequencedGlobalMessage struct {
	sequence uint64
	identity models.Identity
	payload  []byte
	message  *protobufs.Message
}
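
// newSequencedGlobalMessage clones the payload and derives the record's
// identity from its SHA3-256 hash.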
func newSequencedGlobalMessage(
	sequence uint64,
	payload []byte,
) *sequencedGlobalMessage {
	copyPayload := slices.Clone(payload)
	hash := sha3.Sum256(copyPayload)
	return &sequencedGlobalMessage{
		sequence: sequence,
		identity: models.Identity(string(hash[:])),
		payload:  copyPayload,
	}
}
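
// globalMessageTraits tells the keyed collector how to sequence, identify,
// and compare global message records; all accessors are nil-safe.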
var globalMessageTraits = keyedcollector.RecordTraits[sequencedGlobalMessage]{
	Sequence: func(m *sequencedGlobalMessage) uint64 {
		if m == nil {
			return 0
		}
		return m.sequence
	},
	Identity: func(m *sequencedGlobalMessage) models.Identity {
		if m == nil {
			return ""
		}
		return m.identity
	},
	Equals: func(a, b *sequencedGlobalMessage) bool {
		if a == nil || b == nil {
			return a == b
		}
		return slices.Equal(a.payload, b.payload)
	},
}
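
// globalMessageProcessorFactory builds a processor bound to the engine for
// each collector sequence.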
type globalMessageProcessorFactory struct {
	engine *GlobalConsensusEngine
}
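
// Create returns a processor for the given sequence (frame rank).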
func (f *globalMessageProcessorFactory) Create(
	sequence uint64,
) (keyedcollector.Processor[sequencedGlobalMessage], error) {
	return &globalMessageProcessor{
		engine:   f.engine,
		sequence: sequence,
	}, nil
}
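
// globalMessageProcessor validates global message records collected for a
// single sequence.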
type globalMessageProcessor struct {
	engine   *GlobalConsensusEngine
	sequence uint64
}
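
// Process validates a collected record: it checks the payload length and
// type prefix, enforces the per-frame collector limit, resolves the quorum
// certificate preceding the record's sequence (falling back to the latest
// one), and asks the execution manager to validate the message against the
// next frame. On success the constructed protobufs.Message is attached to
// the record.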
func (p *globalMessageProcessor) Process(
	record *sequencedGlobalMessage,
) error {
	if record == nil {
		return keyedcollector.NewInvalidRecordError(
			record,
			errors.New("nil global message"),
		)
	}

	if len(record.payload) < 4 {
		return keyedcollector.NewInvalidRecordError(
			record,
			errors.New("global message payload too short"),
		)
	}

	typePrefix := binary.BigEndian.Uint32(record.payload[:4])
	if typePrefix != protobufs.MessageBundleType {
		return keyedcollector.NewInvalidRecordError(
			record,
			fmt.Errorf("unexpected message type: %d", typePrefix),
		)
	}

	message := &protobufs.Message{
		Address: globalMessageAddress,
		Payload: record.payload,
	}

	if err := p.enforceCollectorLimit(record); err != nil {
		return err
	}

	qc, err := p.engine.clockStore.GetQuorumCertificate(nil, record.sequence-1)
	if err != nil {
		qc, err = p.engine.clockStore.GetLatestQuorumCertificate(nil)
	}
	if err != nil {
		return keyedcollector.NewInvalidRecordError(record, err)
	}

	if err := p.engine.executionManager.ValidateMessage(
		qc.FrameNumber+1,
		message.Address,
		message.Payload,
	); err != nil {
		return keyedcollector.NewInvalidRecordError(record, err)
	}

	record.message = message
	return nil
}
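
// enforceCollectorLimit rejects the record when the collector for this
// sequence already holds maxGlobalMessagesPerFrame records, removing it from
// the collector before returning the error. Collector lookup failures are
// treated as "no limit to enforce".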
func (p *globalMessageProcessor) enforceCollectorLimit(
	record *sequencedGlobalMessage,
) error {
	collector, found, err := p.engine.getMessageCollector(p.sequence)
	if err != nil || !found {
		return nil
	}

	if len(collector.Records()) >= maxGlobalMessagesPerFrame {
		collector.Remove(record)
		// p.engine.deferGlobalMessage(record.sequence+1, record.payload)
		return keyedcollector.NewInvalidRecordError(
			record,
			fmt.Errorf("message limit reached for frame %d", p.sequence),
		)
	}

	return nil
}
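
// initGlobalMessageAggregator wires up the per-frame keyed collectors and
// the sequenced aggregator that feeds them, both keyed by record sequence.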
func (e *GlobalConsensusEngine) initGlobalMessageAggregator() error {
	tracer := tracing.NewZapTracer(e.logger.Named("global_message_collector"))
	processorFactory := &globalMessageProcessorFactory{engine: e}
	collectorFactory, err := keyedcollector.NewFactory(
		tracer,
		globalMessageTraits,
		nil,
		processorFactory,
	)
	if err != nil {
		return fmt.Errorf("global message collector factory: %w", err)
	}

	e.messageCollectors = keyedaggregator.NewSequencedCollectors(
		tracer,
		0,
		collectorFactory,
	)

	aggregator, err := keyedaggregator.NewSequencedAggregator(
		tracer,
		0,
		e.messageCollectors,
		func(m *sequencedGlobalMessage) uint64 {
			if m == nil {
				return 0
			}
			return m.sequence
		},
	)
	if err != nil {
		return fmt.Errorf("global message aggregator: %w", err)
	}

	e.messageAggregator = aggregator
	return nil
}
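
// startGlobalMessageAggregator runs the aggregator's component manager under
// the lifecycle context, signalling readiness once the aggregator is ready.
// When no aggregator is configured it reports ready and simply waits for
// shutdown.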
func (e *GlobalConsensusEngine) startGlobalMessageAggregator(
	ctx lifecycle.SignalerContext,
	ready lifecycle.ReadyFunc,
) {
	if e.messageAggregator == nil {
		ready()
		<-ctx.Done()
		return
	}

	go func() {
		if err := e.messageAggregator.ComponentManager.Start(ctx); err != nil {
			ctx.Throw(err)
		}
	}()

	<-e.messageAggregator.ComponentManager.Ready()
	ready()
	<-e.messageAggregator.ComponentManager.Done()
}
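
// addGlobalMessage normalizes an incoming payload and hands it to the
// aggregator targeting the next rank. Message bundles are decoded so they
// can be filtered in prover-only mode and truncated to the per-frame limit
// before being re-encoded; non-bundle payloads pass through unchanged.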
func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) {
	if e.messageAggregator == nil || len(data) == 0 {
		return
	}

	payload := data // buildutils:allow-slice-alias slice is static
	if len(data) >= 4 {
		typePrefix := binary.BigEndian.Uint32(data[:4])
		if typePrefix == protobufs.MessageBundleType {
			bundle := &protobufs.MessageBundle{}
			if err := bundle.FromCanonicalBytes(data); err != nil {
				if e.logger != nil {
					e.logger.Debug(
						"failed to decode message bundle for collector",
						zap.Error(err),
					)
				}
				return
			}

			// In prover-only mode, filter out non-prover messages
			if e.proverOnlyMode.Load() {
				bundle.Requests = e.filterProverOnlyRequests(bundle.Requests)
				if len(bundle.Requests) == 0 {
					// All requests were filtered out
					return
				}
			}

			if len(bundle.Requests) > maxGlobalMessagesPerFrame {
				if e.logger != nil {
					e.logger.Debug(
						"truncating message bundle requests for collector",
						zap.Int("original", len(bundle.Requests)),
						zap.Int("limit", maxGlobalMessagesPerFrame),
					)
				}
				bundle.Requests = bundle.Requests[:maxGlobalMessagesPerFrame]
			}

			e.logBundleRequestTypes(bundle)

			encoded, err := bundle.ToCanonicalBytes()
			if err != nil {
				if e.logger != nil {
					e.logger.Debug(
						"failed to re-encode message bundle for collector",
						zap.Error(err),
					)
				}
				return
			}
			payload = encoded
		}
	}

	record := newSequencedGlobalMessage(e.currentRank+1, payload)
	e.messageAggregator.Add(record)
}

// filterProverOnlyRequests filters a list of message requests to only include
// prover-related messages. This is used when in prover-only mode due to
// insufficient coverage.
func (e *GlobalConsensusEngine) filterProverOnlyRequests(
	requests []*protobufs.MessageRequest,
) []*protobufs.MessageRequest {
	filtered := make([]*protobufs.MessageRequest, 0, len(requests))
	droppedCount := 0

	for _, req := range requests {
		if req == nil || req.GetRequest() == nil {
			continue
		}

		// Only allow prover-related message types
		switch req.GetRequest().(type) {
		case *protobufs.MessageRequest_Join,
			*protobufs.MessageRequest_Leave,
			*protobufs.MessageRequest_Pause,
			*protobufs.MessageRequest_Resume,
			*protobufs.MessageRequest_Confirm,
			*protobufs.MessageRequest_Reject,
			*protobufs.MessageRequest_Kick,
			*protobufs.MessageRequest_Update,
			*protobufs.MessageRequest_SeniorityMerge:
			// Prover messages are allowed
			filtered = append(filtered, req)
		default:
			// All other messages are dropped in prover-only mode
			droppedCount++
		}
	}

	if droppedCount > 0 && e.logger != nil {
		e.logger.Debug(
			"dropped non-prover messages in prover-only mode",
			zap.Int("dropped_count", droppedCount),
			zap.Int("allowed_count", len(filtered)),
		)
	}

	return filtered
}
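
// logBundleRequestTypes emits a debug log summarizing the request types in a
// bundle, with per-request detail fields where available.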
func (e *GlobalConsensusEngine) logBundleRequestTypes(
	bundle *protobufs.MessageBundle,
) {
	requestTypes := make([]string, 0, len(bundle.Requests))
	detailFields := make([]zap.Field, 0)
	for idx, request := range bundle.Requests {
		typeName, detailField, hasDetail := requestTypeNameAndDetail(idx, request)
		requestTypes = append(requestTypes, typeName)
		if hasDetail {
			detailFields = append(detailFields, detailField)
		}
	}

	fields := []zap.Field{
		zap.Int("request_count", len(bundle.Requests)),
		zap.Strings("request_types", requestTypes),
		zap.Int64("bundle_timestamp", bundle.Timestamp),
	}
	fields = append(fields, detailFields...)

	e.logger.Debug("collected global request bundle", fields...)
}
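
// requestTypeNameAndDetail maps a message request to a human-readable type
// name and, for request types carrying useful detail, a zap field keyed by
// the request's index in the bundle.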
func requestTypeNameAndDetail(
	idx int,
	req *protobufs.MessageRequest,
) (string, zap.Field, bool) {
	if req == nil || req.GetRequest() == nil {
		return "nil_request", zap.Field{}, false
	}

	switch actual := req.GetRequest().(type) {
	case *protobufs.MessageRequest_Join:
		return "ProverJoin", zap.Field{}, false
	case *protobufs.MessageRequest_Leave:
		return "ProverLeave", zap.Field{}, false
	case *protobufs.MessageRequest_Pause:
		return "ProverPause", zap.Field{}, false
	case *protobufs.MessageRequest_Resume:
		return "ProverResume", zap.Field{}, false
	case *protobufs.MessageRequest_Confirm:
		return "ProverConfirm", zap.Field{}, false
	case *protobufs.MessageRequest_Reject:
		return "ProverReject", zap.Field{}, false
	case *protobufs.MessageRequest_Kick:
		return "ProverKick", zap.Field{}, false
	case *protobufs.MessageRequest_Update:
		return "ProverUpdate",
			zap.Any(fmt.Sprintf("request_%d_prover_update", idx), actual.Update),
			true
	case *protobufs.MessageRequest_SeniorityMerge:
		return "ProverSeniorityMerge",
			zap.Any(fmt.Sprintf("request_%d_seniority_merge", idx), actual.SeniorityMerge),
			true
	case *protobufs.MessageRequest_TokenDeploy:
		return "TokenDeploy",
			zap.Any(fmt.Sprintf("request_%d_token_deploy", idx), actual.TokenDeploy),
			true
	case *protobufs.MessageRequest_TokenUpdate:
		return "TokenUpdate",
			zap.Any(fmt.Sprintf("request_%d_token_update", idx), actual.TokenUpdate),
			true
	case *protobufs.MessageRequest_Transaction:
		return "Transaction",
			zap.Any(fmt.Sprintf("request_%d_transaction", idx), actual.Transaction),
			true
	case *protobufs.MessageRequest_PendingTransaction:
		return "PendingTransaction",
			zap.Any(
				fmt.Sprintf("request_%d_pending_transaction", idx),
				actual.PendingTransaction,
			),
			true
	case *protobufs.MessageRequest_MintTransaction:
		return "MintTransaction",
			zap.Any(fmt.Sprintf("request_%d_mint_transaction", idx), actual.MintTransaction),
			true
	case *protobufs.MessageRequest_HypergraphDeploy:
		return "HypergraphDeploy",
			zap.Any(fmt.Sprintf("request_%d_hypergraph_deploy", idx), actual.HypergraphDeploy),
			true
	case *protobufs.MessageRequest_HypergraphUpdate:
		return "HypergraphUpdate",
			zap.Any(fmt.Sprintf("request_%d_hypergraph_update", idx), actual.HypergraphUpdate),
			true
	case *protobufs.MessageRequest_VertexAdd:
		return "VertexAdd",
			zap.Any(fmt.Sprintf("request_%d_vertex_add", idx), actual.VertexAdd),
			true
	case *protobufs.MessageRequest_VertexRemove:
		return "VertexRemove",
			zap.Any(fmt.Sprintf("request_%d_vertex_remove", idx), actual.VertexRemove),
			true
	case *protobufs.MessageRequest_HyperedgeAdd:
		return "HyperedgeAdd",
			zap.Any(fmt.Sprintf("request_%d_hyperedge_add", idx), actual.HyperedgeAdd),
			true
	case *protobufs.MessageRequest_HyperedgeRemove:
		return "HyperedgeRemove",
			zap.Any(fmt.Sprintf("request_%d_hyperedge_remove", idx), actual.HyperedgeRemove),
			true
	case *protobufs.MessageRequest_ComputeDeploy:
		return "ComputeDeploy",
			zap.Any(fmt.Sprintf("request_%d_compute_deploy", idx), actual.ComputeDeploy),
			true
	case *protobufs.MessageRequest_ComputeUpdate:
		return "ComputeUpdate",
			zap.Any(fmt.Sprintf("request_%d_compute_update", idx), actual.ComputeUpdate),
			true
	case *protobufs.MessageRequest_CodeDeploy:
		return "CodeDeploy",
			zap.Any(fmt.Sprintf("request_%d_code_deploy", idx), actual.CodeDeploy),
			true
	case *protobufs.MessageRequest_CodeExecute:
		return "CodeExecute",
			zap.Any(fmt.Sprintf("request_%d_code_execute", idx), actual.CodeExecute),
			true
	case *protobufs.MessageRequest_CodeFinalize:
		return "CodeFinalize",
			zap.Any(fmt.Sprintf("request_%d_code_finalize", idx), actual.CodeFinalize),
			true
	case *protobufs.MessageRequest_Shard:
		return "ShardFrame",
			zap.Any(fmt.Sprintf("request_%d_shard_frame", idx), actual.Shard),
			true
	default:
		return "unknown_request", zap.Field{}, false
	}
}
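
// getMessageCollector returns the collector for the given rank, if the
// sequenced collectors have been initialized.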
func (e *GlobalConsensusEngine) getMessageCollector(
	rank uint64,
) (keyedaggregator.Collector[sequencedGlobalMessage], bool, error) {
	if e.messageCollectors == nil {
		return nil, false, nil
	}
	return e.messageCollectors.GetCollector(rank)
}
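
// deferGlobalMessage queues a payload in the spillover map so it can be
// replayed at a later rank once the current frame's collector is full.
// Within this file it is only reached via the commented-out call in
// enforceCollectorLimit.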
func (e *GlobalConsensusEngine) deferGlobalMessage(
	targetRank uint64,
	payload []byte,
) {
	if e == nil || len(payload) == 0 || targetRank == 0 {
		return
	}

	cloned := slices.Clone(payload)
	e.globalSpilloverMu.Lock()
	e.globalMessageSpillover[targetRank] = append(
		e.globalMessageSpillover[targetRank],
		cloned,
	)
	pending := len(e.globalMessageSpillover[targetRank])
	e.globalSpilloverMu.Unlock()

	if e.logger != nil {
		e.logger.Debug(
			"deferred global message due to collector limit",
			zap.Uint64("target_rank", targetRank),
			zap.Int("pending", pending),
		)
	}
}
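
// flushDeferredGlobalMessages replays any payloads deferred for the given
// rank back into the aggregator and clears them from the spillover map.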
func (e *GlobalConsensusEngine) flushDeferredGlobalMessages(targetRank uint64) {
	if e == nil || e.messageAggregator == nil || targetRank == 0 {
		return
	}

	e.globalSpilloverMu.Lock()
	payloads := e.globalMessageSpillover[targetRank]
	if len(payloads) > 0 {
		delete(e.globalMessageSpillover, targetRank)
	}
	e.globalSpilloverMu.Unlock()

	if len(payloads) == 0 {
		return
	}

	for _, payload := range payloads {
		e.messageAggregator.Add(
			newSequencedGlobalMessage(targetRank, payload),
		)
	}

	if e.logger != nil {
		e.logger.Debug(
			"replayed deferred global messages",
			zap.Uint64("target_rank", targetRank),
			zap.Int("count", len(payloads)),
		)
	}
}