From f59eb4ba0a3a3f193cc6f6ff734aa144b56d8c8e Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Thu, 9 Oct 2025 04:44:33 -0500 Subject: [PATCH] fix lock fallthrough, add missing allocation update --- node/consensus/global/event_distributor.go | 3 +- node/consensus/provers/prover_registry.go | 185 +++++++++++++++++- .../engines/compute_execution_engine.go | 2 + .../engines/global_execution_engine.go | 2 + .../engines/hypergraph_execution_engine.go | 2 + .../engines/token_execution_engine.go | 2 + types/mocks/shard_execution.go | 8 +- 7 files changed, 192 insertions(+), 12 deletions(-) diff --git a/node/consensus/global/event_distributor.go b/node/consensus/global/event_distributor.go index 524f5ca..96fa62c 100644 --- a/node/consensus/global/event_distributor.go +++ b/node/consensus/global/event_distributor.go @@ -432,7 +432,8 @@ func (e *GlobalConsensusEngine) evaluateForProposals( zap.Uint64("join_frame_number", allocation.JoinFrameNumber), zap.Uint64("frame_number", data.Frame.Header.FrameNumber), ) - pending = allocation.Status == 0 && + pending = allocation.Status == + typesconsensus.ProverStatusJoining && allocation.JoinFrameNumber+360 <= data.Frame.Header.FrameNumber } } diff --git a/node/consensus/provers/prover_registry.go b/node/consensus/provers/prover_registry.go index b93c4da..e053237 100644 --- a/node/consensus/provers/prover_registry.go +++ b/node/consensus/provers/prover_registry.go @@ -1020,9 +1020,6 @@ func (r *ProverRegistry) extractGlobalState() error { } } allocationsFound++ - default: - r.logger.Debug("unknown vertex type", zap.String("type", typeName)) - return errors.Wrap(errors.New("invalid type"), "extract global state") } } @@ -1290,7 +1287,7 @@ func (r *ProverRegistry) processProverChange( zap.Uint8("status", uint8(mappedStatus)), ) - // Extract filters + // Extract data confirmationFilter, err := r.rdfMultiprover.Get( global.GLOBAL_RDF_SCHEMA, "allocation:ProverAllocation", @@ -1301,8 +1298,185 @@ func (r *ProverRegistry) processProverChange( return errors.Wrap(err, "process prover change") } + rejectionFilter, _ := r.rdfMultiprover.Get( + global.GLOBAL_RDF_SCHEMA, + "allocation:ProverAllocation", + "RejectionFilter", + data, + ) + + joinFrameNumberBytes, _ := r.rdfMultiprover.Get( + global.GLOBAL_RDF_SCHEMA, + "allocation:ProverAllocation", + "JoinFrameNumber", + data, + ) + + var joinFrameNumber uint64 + if len(joinFrameNumberBytes) != 0 { + joinFrameNumber = binary.BigEndian.Uint64(joinFrameNumberBytes) + } + + leaveFrameNumberBytes, _ := r.rdfMultiprover.Get( + global.GLOBAL_RDF_SCHEMA, + "allocation:ProverAllocation", + "LeaveFrameNumber", + data, + ) + + var leaveFrameNumber uint64 + if len(leaveFrameNumberBytes) != 0 { + leaveFrameNumber = binary.BigEndian.Uint64(leaveFrameNumberBytes) + } + + pauseFrameNumberBytes, _ := r.rdfMultiprover.Get( + global.GLOBAL_RDF_SCHEMA, + "allocation:ProverAllocation", + "PauseFrameNumber", + data, + ) + + var pauseFrameNumber uint64 + if len(pauseFrameNumberBytes) != 0 { + pauseFrameNumber = binary.BigEndian.Uint64(pauseFrameNumberBytes) + } + + resumeFrameNumberBytes, _ := r.rdfMultiprover.Get( + global.GLOBAL_RDF_SCHEMA, + "allocation:ProverAllocation", + "ResumeFrameNumber", + data, + ) + + var resumeFrameNumber uint64 + if len(resumeFrameNumberBytes) != 0 { + resumeFrameNumber = binary.BigEndian.Uint64(resumeFrameNumberBytes) + } + + kickFrameNumberBytes, _ := r.rdfMultiprover.Get( + global.GLOBAL_RDF_SCHEMA, + "allocation:ProverAllocation", + "KickFrameNumber", + data, + ) + + var kickFrameNumber uint64 + if len(kickFrameNumberBytes) != 0 { + kickFrameNumber = binary.BigEndian.Uint64(kickFrameNumberBytes) + } + + joinConfirmFrameNumberBytes, _ := r.rdfMultiprover.Get( + global.GLOBAL_RDF_SCHEMA, + "allocation:ProverAllocation", + "JoinConfirmFrameNumber", + data, + ) + + var joinConfirmFrameNumber uint64 + if len(joinConfirmFrameNumberBytes) != 0 { + joinConfirmFrameNumber = binary.BigEndian.Uint64( + joinConfirmFrameNumberBytes, + ) + } + + joinRejectFrameNumberBytes, _ := r.rdfMultiprover.Get( + global.GLOBAL_RDF_SCHEMA, + "allocation:ProverAllocation", + "JoinRejectFrameNumber", + data, + ) + + var joinRejectFrameNumber uint64 + if len(joinRejectFrameNumberBytes) != 0 { + joinRejectFrameNumber = binary.BigEndian.Uint64( + joinRejectFrameNumberBytes, + ) + } + + leaveConfirmFrameNumberBytes, _ := r.rdfMultiprover.Get( + global.GLOBAL_RDF_SCHEMA, + "allocation:ProverAllocation", + "LeaveConfirmFrameNumber", + data, + ) + + var leaveConfirmFrameNumber uint64 + if len(leaveConfirmFrameNumberBytes) != 0 { + leaveConfirmFrameNumber = binary.BigEndian.Uint64( + leaveConfirmFrameNumberBytes, + ) + } + + leaveRejectFrameNumberBytes, _ := r.rdfMultiprover.Get( + global.GLOBAL_RDF_SCHEMA, + "allocation:ProverAllocation", + "LeaveRejectFrameNumber", + data, + ) + + var leaveRejectFrameNumber uint64 + if len(leaveRejectFrameNumberBytes) != 0 { + leaveRejectFrameNumber = binary.BigEndian.Uint64( + leaveRejectFrameNumberBytes, + ) + } + + lastActiveFrameNumberBytes, _ := r.rdfMultiprover.Get( + global.GLOBAL_RDF_SCHEMA, + "allocation:ProverAllocation", + "LastActiveFrameNumber", + data, + ) + + var lastActiveFrameNumber uint64 + if len(lastActiveFrameNumberBytes) != 0 { + lastActiveFrameNumber = binary.BigEndian.Uint64( + lastActiveFrameNumberBytes, + ) + } + // Find the prover this allocation belongs to if proverInfo, exists := r.proverCache[string(proverRef[32:])]; exists { + for i, allocation := range proverInfo.Allocations { + if bytes.Equal(allocation.ConfirmationFilter, confirmationFilter) { + proverInfo.Allocations[i].Status = mappedStatus + proverInfo.Allocations[i].RejectionFilter = rejectionFilter + proverInfo.Allocations[i].JoinFrameNumber = joinFrameNumber + proverInfo.Allocations[i].LeaveFrameNumber = leaveFrameNumber + proverInfo.Allocations[i].PauseFrameNumber = pauseFrameNumber + proverInfo.Allocations[i].ResumeFrameNumber = resumeFrameNumber + proverInfo.Allocations[i].KickFrameNumber = kickFrameNumber + proverInfo.Allocations[i].JoinConfirmFrameNumber = + joinConfirmFrameNumber + proverInfo.Allocations[i].JoinRejectFrameNumber = + joinRejectFrameNumber + proverInfo.Allocations[i].LeaveConfirmFrameNumber = + leaveConfirmFrameNumber + proverInfo.Allocations[i].LeaveRejectFrameNumber = + leaveRejectFrameNumber + proverInfo.Allocations[i].LastActiveFrameNumber = + lastActiveFrameNumber + } + } + proverInfo.Allocations = append( + proverInfo.Allocations, + consensus.ProverAllocationInfo{ + Status: mappedStatus, + ConfirmationFilter: confirmationFilter, + RejectionFilter: rejectionFilter, + JoinFrameNumber: joinFrameNumber, + LeaveFrameNumber: leaveFrameNumber, + PauseFrameNumber: pauseFrameNumber, + ResumeFrameNumber: resumeFrameNumber, + KickFrameNumber: kickFrameNumber, + JoinConfirmFrameNumber: joinConfirmFrameNumber, + JoinRejectFrameNumber: joinRejectFrameNumber, + LeaveConfirmFrameNumber: leaveConfirmFrameNumber, + LeaveRejectFrameNumber: leaveRejectFrameNumber, + LastActiveFrameNumber: lastActiveFrameNumber, + }, + ) + // Update tries based on allocation status if mappedStatus == consensus.ProverStatusActive && len(confirmationFilter) > 0 { @@ -1314,7 +1488,8 @@ func (r *ProverRegistry) processProverChange( ); err != nil { return errors.Wrap(err, "process prover change") } - } else { + } else if mappedStatus == consensus.ProverStatusKicked || + mappedStatus == consensus.ProverStatusUnknown { // Remove from filter trie if not active if err := r.removeProverFromTrie( proverRef[32:], diff --git a/node/execution/engines/compute_execution_engine.go b/node/execution/engines/compute_execution_engine.go index 86bceb7..df481c2 100644 --- a/node/execution/engines/compute_execution_engine.go +++ b/node/execution/engines/compute_execution_engine.go @@ -304,6 +304,8 @@ func (e *ComputeExecutionEngine) Lock( return err } } + + return nil } return errors.Wrap(intrinsic.Lock(frameNumber, message), "lock") diff --git a/node/execution/engines/global_execution_engine.go b/node/execution/engines/global_execution_engine.go index 0bed7c1..9011c2e 100644 --- a/node/execution/engines/global_execution_engine.go +++ b/node/execution/engines/global_execution_engine.go @@ -493,6 +493,8 @@ func (e *GlobalExecutionEngine) Lock( return err } } + + return nil } return intrinsic.Lock(frameNumber, message) diff --git a/node/execution/engines/hypergraph_execution_engine.go b/node/execution/engines/hypergraph_execution_engine.go index 500b3c4..80580a9 100644 --- a/node/execution/engines/hypergraph_execution_engine.go +++ b/node/execution/engines/hypergraph_execution_engine.go @@ -496,6 +496,8 @@ func (e *HypergraphExecutionEngine) Lock( return err } } + + return nil } return errors.Wrap(intrinsic.Lock(frameNumber, message), "lock") diff --git a/node/execution/engines/token_execution_engine.go b/node/execution/engines/token_execution_engine.go index ee5e51e..6cc5223 100644 --- a/node/execution/engines/token_execution_engine.go +++ b/node/execution/engines/token_execution_engine.go @@ -556,6 +556,8 @@ func (e *TokenExecutionEngine) Lock( return err } } + + return nil } return errors.Wrap(intrinsic.Lock(frameNumber, message), "lock") diff --git a/types/mocks/shard_execution.go b/types/mocks/shard_execution.go index 0af8f43..c7d58f6 100644 --- a/types/mocks/shard_execution.go +++ b/types/mocks/shard_execution.go @@ -26,12 +26,8 @@ func (m *MockShardExecutionEngine) Lock( } // Unlock implements execution.ShardExecutionEngine. -func (m *MockShardExecutionEngine) Unlock( - frameNumber uint64, - address []byte, - message []byte, -) error { - args := m.Called(frameNumber, address, message) +func (m *MockShardExecutionEngine) Unlock() error { + args := m.Called() return args.Error(0) }