From 6d15fbfc7d958ef4ced962279ec4662d97ef6117 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Thu, 9 Oct 2025 02:27:05 -0500 Subject: [PATCH] fix: weave in lock/unlock semantics to liveness provider --- .../global/consensus_liveness_provider.go | 22 ++++++++ .../engines/compute_execution_engine.go | 48 +++++++++++++---- .../engines/global_execution_engine.go | 52 ++++++++++++++----- .../engines/global_execution_engine_test.go | 2 +- .../engines/hypergraph_execution_engine.go | 48 +++++++++++++---- .../engines/token_execution_engine.go | 50 ++++++++++++++---- node/execution/manager/execution_manager.go | 23 ++++++++ types/execution/execution_engine.go | 2 +- 8 files changed, 202 insertions(+), 45 deletions(-) diff --git a/node/consensus/global/consensus_liveness_provider.go b/node/consensus/global/consensus_liveness_provider.go index dc03e2a..3b6bc7e 100644 --- a/node/consensus/global/consensus_liveness_provider.go +++ b/node/consensus/global/consensus_liveness_provider.go @@ -96,9 +96,31 @@ func (p *GlobalLivenessProvider) Collect( continue } + err = p.engine.executionManager.Lock( + frameNumber, + message.Address, + message.Payload, + ) + if err != nil { + p.engine.logger.Debug( + "message failed lock", + zap.Int("message_index", i), + zap.Error(err), + ) + continue + } + acceptedMessages = append(acceptedMessages, message) } + err := p.engine.executionManager.Unlock() + if err != nil { + p.engine.logger.Error( + "unable to unlock", + zap.Error(err), + ) + } + commitments := make([]*tries.VectorCommitmentTree, 256) for i := range 256 { commitments[i] = &tries.VectorCommitmentTree{} diff --git a/node/execution/engines/compute_execution_engine.go b/node/execution/engines/compute_execution_engine.go index ad9f727..86bceb7 100644 --- a/node/execution/engines/compute_execution_engine.go +++ b/node/execution/engines/compute_execution_engine.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "math/big" "slices" + "strings" "sync" "github.com/pkg/errors" @@ -285,21 +286,48 @@ func (e *ComputeExecutionEngine) Lock( return nil } + if len(message) > 4 && + binary.BigEndian.Uint32(message[:4]) == protobufs.MessageBundleType { + bundle := &protobufs.MessageBundle{} + err = bundle.FromCanonicalBytes(message) + if err != nil { + return errors.Wrap(err, "lock") + } + + for _, r := range bundle.Requests { + req, err := r.ToCanonicalBytes() + if err != nil { + return errors.Wrap(err, "lock") + } + + if err = intrinsic.Lock(frameNumber, req[8:]); err != nil { + return err + } + } + } + return errors.Wrap(intrinsic.Lock(frameNumber, message), "lock") } -func (e *ComputeExecutionEngine) Unlock( - frameNumber uint64, - address []byte, - message []byte, -) error { - intrinsic, err := e.tryGetIntrinsic(address) - if err != nil { - // non-applicable - return nil +func (e *ComputeExecutionEngine) Unlock() error { + e.intrinsicsMutex.RLock() + errs := []string{} + for _, intrinsic := range e.intrinsics { + err := intrinsic.Unlock() + if err != nil { + errs = append(errs, err.Error()) + } + } + e.intrinsicsMutex.RUnlock() + + if len(errs) != 0 { + return errors.Wrap( + errors.Errorf("multiple errors: %s", strings.Join(errs, ", ")), + "unlock", + ) } - return errors.Wrap(intrinsic.Unlock(), "unlock") + return nil } func (e *ComputeExecutionEngine) GetCost(message []byte) (*big.Int, error) { diff --git a/node/execution/engines/global_execution_engine.go b/node/execution/engines/global_execution_engine.go index ec41fdb..0bed7c1 100644 --- a/node/execution/engines/global_execution_engine.go +++ b/node/execution/engines/global_execution_engine.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "math/big" "slices" + "strings" "sync" "github.com/pkg/errors" @@ -474,21 +475,48 @@ func (e *GlobalExecutionEngine) Lock( return nil } - return errors.Wrap(intrinsic.Lock(frameNumber, message), "lock") -} + if len(message) > 4 && + binary.BigEndian.Uint32(message[:4]) == protobufs.MessageBundleType { + bundle := &protobufs.MessageBundle{} + err = bundle.FromCanonicalBytes(message) + if err != nil { + return errors.Wrap(err, "lock") + } -func (e *GlobalExecutionEngine) Unlock( - frameNumber uint64, - address []byte, - message []byte, -) error { - intrinsic, err := e.tryGetIntrinsic(address) - if err != nil { - // non-applicable - return nil + for _, r := range bundle.Requests { + req, err := r.ToCanonicalBytes() + if err != nil { + return errors.Wrap(err, "lock") + } + + if err = intrinsic.Lock(frameNumber, req[8:]); err != nil { + return err + } + } } - return errors.Wrap(intrinsic.Unlock(), "unlock") + return intrinsic.Lock(frameNumber, message) +} + +func (e *GlobalExecutionEngine) Unlock() error { + e.intrinsicsMutex.RLock() + errs := []string{} + for _, intrinsic := range e.intrinsics { + err := intrinsic.Unlock() + if err != nil { + errs = append(errs, err.Error()) + } + } + e.intrinsicsMutex.RUnlock() + + if len(errs) != 0 { + return errors.Wrap( + errors.Errorf("multiple errors: %s", strings.Join(errs, ", ")), + "unlock", + ) + } + + return nil } func (e *GlobalExecutionEngine) tryGetIntrinsic(address []byte) ( diff --git a/node/execution/engines/global_execution_engine_test.go b/node/execution/engines/global_execution_engine_test.go index 9042de1..763c1b4 100644 --- a/node/execution/engines/global_execution_engine_test.go +++ b/node/execution/engines/global_execution_engine_test.go @@ -110,7 +110,7 @@ func TestGlobalExecutionEngine_ProcessMessage(t *testing.T) { }, address: []byte("invalid_address"), wantErr: true, - errContains: "invalid address for global execution engine", + errContains: "invalid shard", }, } diff --git a/node/execution/engines/hypergraph_execution_engine.go b/node/execution/engines/hypergraph_execution_engine.go index 59a77b7..500b3c4 100644 --- a/node/execution/engines/hypergraph_execution_engine.go +++ b/node/execution/engines/hypergraph_execution_engine.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "math/big" "slices" + "strings" "sync" "github.com/pkg/errors" @@ -477,21 +478,48 @@ func (e *HypergraphExecutionEngine) Lock( return nil } + if len(message) > 4 && + binary.BigEndian.Uint32(message[:4]) == protobufs.MessageBundleType { + bundle := &protobufs.MessageBundle{} + err = bundle.FromCanonicalBytes(message) + if err != nil { + return errors.Wrap(err, "lock") + } + + for _, r := range bundle.Requests { + req, err := r.ToCanonicalBytes() + if err != nil { + return errors.Wrap(err, "lock") + } + + if err = intrinsic.Lock(frameNumber, req[8:]); err != nil { + return err + } + } + } + return errors.Wrap(intrinsic.Lock(frameNumber, message), "lock") } -func (e *HypergraphExecutionEngine) Unlock( - frameNumber uint64, - address []byte, - message []byte, -) error { - intrinsic, err := e.tryGetIntrinsic(address) - if err != nil { - // non-applicable - return nil +func (e *HypergraphExecutionEngine) Unlock() error { + e.intrinsicsMutex.RLock() + errs := []string{} + for _, intrinsic := range e.intrinsics { + err := intrinsic.Unlock() + if err != nil { + errs = append(errs, err.Error()) + } + } + e.intrinsicsMutex.RUnlock() + + if len(errs) != 0 { + return errors.Wrap( + errors.Errorf("multiple errors: %s", strings.Join(errs, ", ")), + "unlock", + ) } - return errors.Wrap(intrinsic.Unlock(), "unlock") + return nil } func (e *HypergraphExecutionEngine) handleBundle( diff --git a/node/execution/engines/token_execution_engine.go b/node/execution/engines/token_execution_engine.go index 978dec2..ee5e51e 100644 --- a/node/execution/engines/token_execution_engine.go +++ b/node/execution/engines/token_execution_engine.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "math/big" "slices" + "strings" "sync" "github.com/pkg/errors" @@ -537,21 +538,48 @@ func (e *TokenExecutionEngine) Lock( return nil } + if len(message) > 4 && + binary.BigEndian.Uint32(message[:4]) == protobufs.MessageBundleType { + bundle := &protobufs.MessageBundle{} + err = bundle.FromCanonicalBytes(message) + if err != nil { + return errors.Wrap(err, "lock") + } + + for _, r := range bundle.Requests { + req, err := r.ToCanonicalBytes() + if err != nil { + return errors.Wrap(err, "lock") + } + + if err = intrinsic.Lock(frameNumber, req[8:]); err != nil { + return err + } + } + } + return errors.Wrap(intrinsic.Lock(frameNumber, message), "lock") } -func (e *TokenExecutionEngine) Unlock( - frameNumber uint64, - address []byte, - message []byte, -) error { - intrinsic, err := e.tryGetIntrinsic(address) - if err != nil { - // non-applicable - return nil +func (e *TokenExecutionEngine) Unlock() error { + e.intrinsicsMutex.RLock() + errs := []string{} + for _, intrinsic := range e.intrinsics { + err := intrinsic.Unlock() + if err != nil { + errs = append(errs, err.Error()) + } + } + e.intrinsicsMutex.RUnlock() + + if len(errs) != 0 { + return errors.Wrap( + errors.Errorf("multiple errors: %s", strings.Join(errs, ", ")), + "unlock", + ) } - return errors.Wrap(intrinsic.Unlock(), "unlock") + return nil } func (e *TokenExecutionEngine) handleBundle( @@ -713,7 +741,7 @@ func (e *TokenExecutionEngine) processIndividualMessage( } // Otherwise, try to handle it as an operation on existing intrinsic - intrinsic, err := e.tryGetIntrinsic(address) + intrinsic, err := e.tryGetIntrinsic(domain) if err != nil { return nil, errors.Wrap(err, "process individual message") } diff --git a/node/execution/manager/execution_manager.go b/node/execution/manager/execution_manager.go index 4bccdec..7312a89 100644 --- a/node/execution/manager/execution_manager.go +++ b/node/execution/manager/execution_manager.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "math/big" "slices" + "strings" "sync" "time" @@ -638,6 +639,28 @@ func (m *ExecutionEngineManager) Lock( return engine.Lock(frameNumber, address, message) } +func (m *ExecutionEngineManager) Unlock() error { + m.enginesMu.RLock() + defer m.enginesMu.RUnlock() + + errs := []string{} + for _, engine := range m.engines { + err := engine.Unlock() + if err != nil { + errs = append(errs, err.Error()) + } + } + + if len(errs) != 0 { + return errors.Wrap( + errors.Errorf("multiple errors: %s", strings.Join(errs, ", ")), + "unlock", + ) + } + + return nil +} + // ValidateMessage validates a message without materializing state changes func (m *ExecutionEngineManager) ValidateMessage( frameNumber uint64, diff --git a/types/execution/execution_engine.go b/types/execution/execution_engine.go index 12239f3..718130b 100644 --- a/types/execution/execution_engine.go +++ b/types/execution/execution_engine.go @@ -30,7 +30,7 @@ type ShardExecutionEngine interface { message []byte, ) (*protobufs.MessageRequest, error) Lock(frameNumber uint64, address []byte, message []byte) error - Unlock(frameNumber uint64, address []byte, message []byte) error + Unlock() error GetCost(message []byte) (*big.Int, error) GetCapabilities() []*protobufs.Capability }