ceremonyclient/node/execution/intrinsics/global/global_intrinsic.go
Cassandra Heart ce4f77b140
v2.1.0.19 (#515)
* v2.1.0.19

* enhanced error logging, fix seniority marker join blocker, fix sync message size limit defaults

* resolve signature failure

* additional error logging for merge-related signatures

* fix: one-shot sync message size, app shard TC signature size, collector/hotstuff race condition, expired joins blocking new joins due to pruning disable

* remove compat with old 2.0.0 blossomsub

* fix: resolve abandoned prover joins

* reload prover registry

* fix stale worker proposal edge

* add full sanity check on join before submitting to identify bug

* resolve non-fallthrough condition that should be fallthrough

* fix: resolve rare SIGFPE, fix orphan expired joins blocking workers from reallocating

* add reconnect fallback if no peers are found with variable reconnect time (#511)

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>

* update base peer count to 1 (#513)

* fix: expired prover join frames, starting port ranges, proposer getting stuck, and seniority on joins

* fix: panic on shutdown, libp2p discovery picking inaccessible peers, coverage event check not in shutdown logic, amend app shard worker behavior to mirror global for prover root reconciliation

* fix: shutdown scenario quirks, reload hanging

* fix: do not bailout early on shutdown of coverage check

* fix: force registry refresh on worker waiting for registration

* add more logging to wait for prover

* fix: worker manager refreshes the filter on allocation, snapshots blocking close on shutdown

* tweak: force shutdown after five seconds for app worker

* fix: don't loop when shutting down

* fix: slight reordering, also added named workers to trace hanging shutdowns

* use deterministic key for peer id of workers to stop flagging workers as sybil attacks

* fix: remove pubsub stop from app consensus engine as it shouldn't manage pubsub lifecycle, integrate shutdown context to PerformSync to prevent stuck syncs from halting respawn

* fix: blossomsub pubsub interface does not properly track subscription status

* fix: subscribe order to avoid nil panic

* switch from dnsaddr to dns4

* add missing quic-v1

* additional logging to isolate respawn quirks

* fix: dnsaddr -> dns4 for blossomsub

* sort-of fix: apply sledgehammer to restart logic

* fix: restore proper respawn logic, fix frozen hypergraph post respawn, unsubscribe from bitmask previously missing

---------

Co-authored-by: winged-pegasus <55340199+winged-pegasus@users.noreply.github.com>
Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>
2026-02-26 04:20:13 -06:00

2300 lines
53 KiB
Go

package global
import (
"bytes"
"encoding/binary"
"fmt"
"math/big"
"sync"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
observability "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/consensus"
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
"source.quilibrium.com/quilibrium/monorepo/types/execution/intrinsics"
"source.quilibrium.com/quilibrium/monorepo/types/execution/state"
"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/types/keys"
"source.quilibrium.com/quilibrium/monorepo/types/schema"
"source.quilibrium.com/quilibrium/monorepo/types/store"
)
type GlobalIntrinsic struct {
logger *zap.Logger
lockedWrites map[string]struct{}
lockedReads map[string]int
lockedWritesMx sync.RWMutex
lockedReadsMx sync.RWMutex
state state.State
rdfHypergraphSchema string
rdfMultiprover *schema.RDFMultiprover
hypergraph hypergraph.Hypergraph
keyManager keys.KeyManager
frameProver crypto.FrameProver
frameStore store.ClockStore
rewardIssuance consensus.RewardIssuance
proverRegistry consensus.ProverRegistry
blsConstructor crypto.BlsConstructor
shardsStore store.ShardsStore
}
var GLOBAL_RDF_SCHEMA = `BASE <https://types.quilibrium.com/schema-repository/>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX qcl: <https://types.quilibrium.com/qcl/>
PREFIX prover: <https://types.quilibrium.com/schema-repository/global/prover/>
PREFIX allocation: <https://types.quilibrium.com/schema-repository/global/allocation/>
PREFIX reward: <https://types.quilibrium.com/schema-repository/global/reward/>
prover:Prover a rdfs:Class.
prover:PublicKey a rdfs:Property;
rdfs:domain qcl:ByteArray;
qcl:size 585;
qcl:order 0;
rdfs:range prover:Prover.
prover:Status a rdfs:Property;
rdfs:domain qcl:Uint;
qcl:size 1;
qcl:order 1;
rdfs:range prover:Prover.
prover:AvailableStorage a rdfs:Property;
rdfs:domain qcl:Uint;
qcl:size 8;
qcl:order 2;
rdfs:range prover:Prover.
prover:Seniority a rdfs:Property;
rdfs:domain qcl:Uint;
qcl:size 8;
qcl:order 3;
rdfs:range prover:Prover.
prover:KickFrameNumber a rdfs:Property;
rdfs:domain qcl:Uint;
qcl:size 8;
qcl:order 4;
rdfs:range prover:Prover.
allocation:ProverAllocation a rdfs:Class.
allocation:Prover a rdfs:Property;
rdfs:domain prover:Prover;
qcl:order 0;
rdfs:range allocation:ProverAllocation.
allocation:Status a rdfs:Property;
rdfs:domain qcl:Uint;
qcl:size 1;
qcl:order 1;
rdfs:range allocation:ProverAllocation.
allocation:ConfirmationFilter a rdfs:Property;
rdfs:domain qcl:ByteArray;
qcl:size 64;
qcl:order 2;
rdfs:range allocation:ProverAllocation.
allocation:RejectionFilter a rdfs:Property;
rdfs:domain qcl:ByteArray;
qcl:size 64;
qcl:order 3;
rdfs:range allocation:ProverAllocation.
allocation:JoinFrameNumber a rdfs:Property;
rdfs:domain qcl:Uint;
qcl:size 8;
qcl:order 4;
rdfs:range allocation:ProverAllocation.
allocation:LeaveFrameNumber a rdfs:Property;
rdfs:domain qcl:Uint;
qcl:size 8;
qcl:order 5;
rdfs:range allocation:ProverAllocation.
allocation:PauseFrameNumber a rdfs:Property;
rdfs:domain qcl:Uint;
qcl:size 8;
qcl:order 6;
rdfs:range allocation:ProverAllocation.
allocation:ResumeFrameNumber a rdfs:Property;
rdfs:domain qcl:Uint;
qcl:size 8;
qcl:order 7;
rdfs:range allocation:ProverAllocation.
allocation:KickFrameNumber a rdfs:Property;
rdfs:domain qcl:Uint;
qcl:size 8;
qcl:order 8;
rdfs:range allocation:ProverAllocation.
allocation:JoinConfirmFrameNumber a rdfs:Property;
rdfs:domain qcl:Uint;
qcl:size 8;
qcl:order 9;
rdfs:range allocation:ProverAllocation.
allocation:JoinRejectFrameNumber a rdfs:Property;
rdfs:domain qcl:Uint;
qcl:size 8;
qcl:order 10;
rdfs:range allocation:ProverAllocation.
allocation:LeaveConfirmFrameNumber a rdfs:Property;
rdfs:domain qcl:Uint;
qcl:size 8;
qcl:order 11;
rdfs:range allocation:ProverAllocation.
allocation:LeaveRejectFrameNumber a rdfs:Property;
rdfs:domain qcl:Uint;
qcl:size 8;
qcl:order 12;
rdfs:range allocation:ProverAllocation.
allocation:LastActiveFrameNumber a rdfs:Property;
rdfs:domain qcl:Uint;
qcl:size 8;
qcl:order 13;
rdfs:range allocation:ProverAllocation.
reward:ProverReward a rdfs:Class.
reward:DelegateAddress a rdfs:Property;
rdfs:domain qcl:ByteArray;
qcl:size 32;
qcl:order 0;
rdfs:range reward:ProverReward.
reward:Balance a rdfs:Property;
rdfs:domain qcl:ByteArray;
qcl:size 32;
qcl:order 1;
rdfs:range reward:ProverReward.
`
// GetRDFSchema implements intrinsics.Intrinsic.
func (a *GlobalIntrinsic) GetRDFSchema() (
map[string]map[string]*schema.RDFTag,
error,
) {
tags, err := a.rdfMultiprover.GetSchemaMap(a.rdfHypergraphSchema)
return tags, errors.Wrap(err, "get rdf schema")
}
// SumCheck implements intrinsics.Intrinsic.
func (a *GlobalIntrinsic) SumCheck() bool {
return true
}
// Address implements intrinsics.Intrinsic.
func (a *GlobalIntrinsic) Address() []byte {
return intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]
}
// Commit implements intrinsics.Intrinsic.
func (a *GlobalIntrinsic) Commit() (state.State, error) {
timer := prometheus.NewTimer(
observability.CommitDuration.WithLabelValues("global"),
)
defer timer.ObserveDuration()
if a.state == nil {
observability.CommitErrors.WithLabelValues("global").Inc()
return nil, errors.Wrap(errors.New("nothing to commit"), "commit")
}
if err := a.state.Commit(); err != nil {
observability.CommitErrors.WithLabelValues("global").Inc()
return a.state, errors.Wrap(err, "commit")
}
observability.CommitTotal.WithLabelValues("global").Inc()
return a.state, nil
}
// Deploy implements intrinsics.Intrinsic.
func (a *GlobalIntrinsic) Deploy(
domain [32]byte,
provers [][]byte,
creator []byte,
fee *big.Int,
contextData []byte,
frameNumber uint64,
state state.State,
) (state.State, []byte, error) {
return nil, nil, errors.Wrap(
errors.New("global intrinsic cannot be deployed"),
"deploy",
)
}
func (a *GlobalIntrinsic) Validate(
frameNumber uint64,
input []byte,
) error {
timer := prometheus.NewTimer(
observability.ValidateDuration.WithLabelValues("global"),
)
defer timer.ObserveDuration()
// Check type prefix to determine request type
if len(input) < 4 {
observability.ValidateErrors.WithLabelValues(
"global",
"invalid_input",
).Inc()
return errors.Wrap(errors.New("input too short"), "validate")
}
// Read the type prefix
typePrefix := binary.BigEndian.Uint32(input[:4])
// Handle each type based on type prefix
switch typePrefix {
case protobufs.ProverJoinType:
// Parse ProverJoin directly from input
pbJoin := &protobufs.ProverJoin{}
if err := pbJoin.FromCanonicalBytes(input); err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_join",
).Inc()
return errors.Wrap(err, "validate")
}
// Convert from protobuf to intrinsics type
op, err := ProverJoinFromProtobuf(
pbJoin,
a.hypergraph,
nil,
nil,
a.keyManager,
a.frameProver,
a.frameStore,
)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_join",
).Inc()
return errors.Wrap(err, "validate")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
valid, err := op.Verify(frameNumber)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_join",
).Inc()
return errors.Wrap(err, "validate")
}
if !valid {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_join",
).Inc()
return errors.Wrap(errors.New("invalid prover join"), "validate")
}
observability.ValidateTotal.WithLabelValues("global", "prover_join").Inc()
return nil
case protobufs.ProverLeaveType:
// Parse ProverLeave directly from input
pbLeave := &protobufs.ProverLeave{}
if err := pbLeave.FromCanonicalBytes(input); err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_leave",
).Inc()
return errors.Wrap(err, "validate")
}
// Convert from protobuf to intrinsics type
op, err := ProverLeaveFromProtobuf(
pbLeave,
a.hypergraph,
nil,
nil,
a.keyManager,
)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_leave",
).Inc()
return errors.Wrap(err, "validate")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
valid, err := op.Verify(frameNumber)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_leave",
).Inc()
return errors.Wrap(err, "validate")
}
if !valid {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_leave",
).Inc()
return errors.Wrap(errors.New("invalid prover leave"), "validate")
}
observability.ValidateTotal.WithLabelValues(
"global",
"prover_leave",
).Inc()
return nil
case protobufs.ProverPauseType:
// Parse ProverPause directly from input
pbPause := &protobufs.ProverPause{}
if err := pbPause.FromCanonicalBytes(input); err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_pause",
).Inc()
return errors.Wrap(err, "validate")
}
// Convert from protobuf to intrinsics type
op, err := ProverPauseFromProtobuf(
pbPause,
a.hypergraph,
nil,
nil,
a.keyManager,
)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_pause",
).Inc()
return errors.Wrap(err, "validate")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
valid, err := op.Verify(frameNumber)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_pause",
).Inc()
return errors.Wrap(err, "validate")
}
if !valid {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_pause",
).Inc()
return errors.Wrap(errors.New("invalid prover pause"), "validate")
}
observability.ValidateTotal.WithLabelValues(
"global",
"prover_pause",
).Inc()
return nil
case protobufs.ProverResumeType:
// Parse ProverResume directly from input
pbResume := &protobufs.ProverResume{}
if err := pbResume.FromCanonicalBytes(input); err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_resume",
).Inc()
return errors.Wrap(err, "validate")
}
// Convert from protobuf to intrinsics type
op, err := ProverResumeFromProtobuf(
pbResume,
a.hypergraph,
nil,
nil,
a.keyManager,
)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_resume",
).Inc()
return errors.Wrap(err, "validate")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
valid, err := op.Verify(frameNumber)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_resume",
).Inc()
return errors.Wrap(err, "validate")
}
if !valid {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_resume",
).Inc()
return errors.Wrap(
errors.New("invalid prover resume"),
"validate",
)
}
observability.ValidateTotal.WithLabelValues(
"global",
"prover_resume",
).Inc()
return nil
case protobufs.ProverConfirmType:
// Parse ProverConfirm directly from input
pbConfirm := &protobufs.ProverConfirm{}
if err := pbConfirm.FromCanonicalBytes(input); err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_confirm",
).Inc()
return errors.Wrap(err, "validate")
}
// Convert from protobuf to intrinsics type
op, err := ProverConfirmFromProtobuf(
pbConfirm,
a.hypergraph,
nil,
nil,
a.keyManager,
)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_confirm",
).Inc()
return errors.Wrap(err, "validate")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
valid, err := op.Verify(frameNumber)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_confirm",
).Inc()
return errors.Wrap(err, "validate")
}
if !valid {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_confirm",
).Inc()
return errors.Wrap(
errors.New("invalid prover confirm"),
"validate",
)
}
observability.ValidateTotal.WithLabelValues(
"global",
"prover_confirm",
).Inc()
return nil
case protobufs.ProverRejectType:
// Parse ProverReject directly from input
pbReject := &protobufs.ProverReject{}
if err := pbReject.FromCanonicalBytes(input); err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_reject",
).Inc()
return errors.Wrap(err, "validate")
}
// Convert from protobuf to intrinsics type
op, err := ProverRejectFromProtobuf(
pbReject,
a.hypergraph,
nil,
nil,
a.keyManager,
)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_reject",
).Inc()
return errors.Wrap(err, "validate")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
valid, err := op.Verify(frameNumber)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_reject",
).Inc()
return errors.Wrap(err, "validate")
}
if !valid {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_reject",
).Inc()
return errors.Wrap(
errors.New("invalid prover reject"),
"validate",
)
}
observability.ValidateTotal.WithLabelValues(
"global",
"prover_reject",
).Inc()
return nil
case protobufs.ProverKickType:
// Parse ProverKick directly from input
pbKick := &protobufs.ProverKick{}
if err := pbKick.FromCanonicalBytes(input); err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_kick",
).Inc()
return errors.Wrap(err, "validate")
}
// Convert from protobuf to intrinsics type
op, err := ProverKickFromProtobuf(pbKick, a.hypergraph, nil, a.keyManager)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_kick",
).Inc()
return errors.Wrap(err, "validate")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
valid, err := op.Verify(frameNumber)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_kick",
).Inc()
return errors.Wrap(err, "validate")
}
if !valid {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_kick",
).Inc()
return errors.Wrap(
errors.New("invalid prover kick"),
"validate",
)
}
observability.ValidateTotal.WithLabelValues("global", "prover_kick").Inc()
return nil
case protobufs.FrameHeaderType:
pbHeader := &protobufs.FrameHeader{}
if err := pbHeader.FromCanonicalBytes(input); err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_shard_update",
).Inc()
return errors.Wrap(err, "validate")
}
op, err := NewProverShardUpdate(
a.logger,
pbHeader,
a.keyManager,
a.hypergraph,
a.rdfMultiprover,
a.frameProver,
a.rewardIssuance,
a.proverRegistry,
a.blsConstructor,
)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_shard_update",
).Inc()
return errors.Wrap(err, "validate")
}
valid, err := op.Verify(frameNumber)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_shard_update",
).Inc()
return errors.Wrap(err, "validate")
}
if !valid {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_shard_update",
).Inc()
return errors.Wrap(
errors.New("invalid prover shard update"),
"validate",
)
}
observability.ValidateTotal.WithLabelValues(
"global",
"prover_shard_update",
).Inc()
return nil
case protobufs.ProverSeniorityMergeType:
// Parse ProverSeniorityMerge directly from input
pb := &protobufs.ProverSeniorityMerge{}
if err := pb.FromCanonicalBytes(input); err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_seniority_merge",
).Inc()
return errors.Wrap(err, "validate")
}
// Convert from protobuf to intrinsics type
op, err := ProverSeniorityMergeFromProtobuf(
pb,
a.hypergraph,
a.rdfMultiprover,
a.keyManager,
)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_seniority_merge",
).Inc()
return errors.Wrap(err, "validate")
}
valid, err := op.Verify(frameNumber)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_seniority_merge",
).Inc()
return errors.Wrap(err, "validate")
}
if !valid {
observability.ValidateErrors.WithLabelValues(
"global",
"prover_seniority_merge",
).Inc()
return errors.Wrap(
errors.New("invalid prover seniority merge"),
"validate",
)
}
observability.ValidateTotal.WithLabelValues(
"global",
"prover_seniority_merge",
).Inc()
return nil
case protobufs.ShardSplitType:
pb := &protobufs.ShardSplit{}
if err := pb.FromCanonicalBytes(input); err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"shard_split",
).Inc()
return errors.Wrap(err, "validate")
}
op, err := ShardSplitFromProtobuf(
pb,
a.hypergraph,
a.keyManager,
a.shardsStore,
a.proverRegistry,
)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"shard_split",
).Inc()
return errors.Wrap(err, "validate")
}
valid, err := op.Verify(frameNumber)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"shard_split",
).Inc()
return errors.Wrap(err, "validate")
}
if !valid {
observability.ValidateErrors.WithLabelValues(
"global",
"shard_split",
).Inc()
return errors.Wrap(
errors.New("invalid shard split"),
"validate",
)
}
observability.ValidateTotal.WithLabelValues(
"global",
"shard_split",
).Inc()
return nil
case protobufs.ShardMergeType:
pb := &protobufs.ShardMerge{}
if err := pb.FromCanonicalBytes(input); err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"shard_merge",
).Inc()
return errors.Wrap(err, "validate")
}
op, err := ShardMergeFromProtobuf(
pb,
a.hypergraph,
a.keyManager,
a.shardsStore,
a.proverRegistry,
)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"shard_merge",
).Inc()
return errors.Wrap(err, "validate")
}
valid, err := op.Verify(frameNumber)
if err != nil {
observability.ValidateErrors.WithLabelValues(
"global",
"shard_merge",
).Inc()
return errors.Wrap(err, "validate")
}
if !valid {
observability.ValidateErrors.WithLabelValues(
"global",
"shard_merge",
).Inc()
return errors.Wrap(
errors.New("invalid shard merge"),
"validate",
)
}
observability.ValidateTotal.WithLabelValues(
"global",
"shard_merge",
).Inc()
return nil
default:
observability.ValidateErrors.WithLabelValues(
"global",
"unknown_type",
).Inc()
return errors.Wrap(
errors.New("unknown global request type"),
"validate",
)
}
}
// InvokeStep implements intrinsics.Intrinsic.
func (a *GlobalIntrinsic) InvokeStep(
frameNumber uint64,
input []byte,
fee *big.Int,
feeMultiplier *big.Int,
state state.State,
) (state.State, error) {
timer := prometheus.NewTimer(
observability.InvokeStepDuration.WithLabelValues("global"),
)
defer timer.ObserveDuration()
// Check type prefix to determine request type
if len(input) < 4 {
observability.InvokeStepErrors.WithLabelValues(
"global",
"invalid_input",
).Inc()
return nil, errors.Wrap(errors.New("input too short"), "invoke step")
}
// Read the type prefix
typePrefix := binary.BigEndian.Uint32(input[:4])
// Handle each type based on type prefix
switch typePrefix {
case protobufs.ProverJoinType:
opTimer := prometheus.NewTimer(
observability.OperationDuration.WithLabelValues("global", "prover_join"),
)
defer opTimer.ObserveDuration()
// Parse ProverJoin directly from input
pbJoin := &protobufs.ProverJoin{}
if err := pbJoin.FromCanonicalBytes(input); err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_join",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
// Convert from protobuf to intrinsics type
op, err := ProverJoinFromProtobuf(
pbJoin,
a.hypergraph,
nil,
nil,
a.keyManager,
a.frameProver,
a.frameStore,
)
if err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_join",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
matTimer := prometheus.NewTimer(
observability.MaterializeDuration.WithLabelValues("global"),
)
resultState, matErr := op.Materialize(frameNumber, state)
matTimer.ObserveDuration()
if matErr != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_join",
).Inc()
return nil, errors.Wrap(matErr, "invoke step")
}
observability.InvokeStepTotal.WithLabelValues("global", "prover_join").Inc()
return resultState, nil
case protobufs.ProverLeaveType:
opTimer := prometheus.NewTimer(
observability.OperationDuration.WithLabelValues("global", "prover_leave"),
)
defer opTimer.ObserveDuration()
// Parse ProverLeave directly from input
pbLeave := &protobufs.ProverLeave{}
if err := pbLeave.FromCanonicalBytes(input); err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_leave",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
// Convert from protobuf to intrinsics type
op, err := ProverLeaveFromProtobuf(
pbLeave,
a.hypergraph,
nil,
nil,
a.keyManager,
)
if err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_leave",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
matTimer := prometheus.NewTimer(
observability.MaterializeDuration.WithLabelValues("global"),
)
resultState, matErr := op.Materialize(frameNumber, state)
matTimer.ObserveDuration()
if matErr != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_leave",
).Inc()
return nil, errors.Wrap(matErr, "invoke step")
}
observability.InvokeStepTotal.WithLabelValues(
"global",
"prover_leave",
).Inc()
return resultState, nil
case protobufs.ProverPauseType:
opTimer := prometheus.NewTimer(
observability.OperationDuration.WithLabelValues("global", "prover_pause"),
)
defer opTimer.ObserveDuration()
// Parse ProverPause directly from input
pbPause := &protobufs.ProverPause{}
if err := pbPause.FromCanonicalBytes(input); err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_pause",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
// Convert from protobuf to intrinsics type
op, err := ProverPauseFromProtobuf(
pbPause,
a.hypergraph,
nil,
nil,
a.keyManager,
)
if err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_pause",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
matTimer := prometheus.NewTimer(
observability.MaterializeDuration.WithLabelValues("global"),
)
resultState, matErr := op.Materialize(frameNumber, state)
matTimer.ObserveDuration()
if matErr != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_pause",
).Inc()
return nil, errors.Wrap(matErr, "invoke step")
}
observability.InvokeStepTotal.WithLabelValues(
"global",
"prover_pause",
).Inc()
return resultState, nil
case protobufs.ProverResumeType:
opTimer := prometheus.NewTimer(
observability.OperationDuration.WithLabelValues(
"global",
"prover_resume",
),
)
defer opTimer.ObserveDuration()
// Parse ProverResume directly from input
pbResume := &protobufs.ProverResume{}
if err := pbResume.FromCanonicalBytes(input); err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_resume",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
// Convert from protobuf to intrinsics type
op, err := ProverResumeFromProtobuf(
pbResume,
a.hypergraph,
nil,
nil,
a.keyManager,
)
if err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_resume",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
matTimer := prometheus.NewTimer(
observability.MaterializeDuration.WithLabelValues("global"),
)
resultState, matErr := op.Materialize(frameNumber, state)
matTimer.ObserveDuration()
if matErr != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_resume",
).Inc()
return nil, errors.Wrap(matErr, "invoke step")
}
observability.InvokeStepTotal.WithLabelValues(
"global",
"prover_resume",
).Inc()
return resultState, nil
case protobufs.ProverConfirmType:
opTimer := prometheus.NewTimer(
observability.OperationDuration.WithLabelValues(
"global",
"prover_confirm",
),
)
defer opTimer.ObserveDuration()
// Parse ProverConfirm directly from input
pbConfirm := &protobufs.ProverConfirm{}
if err := pbConfirm.FromCanonicalBytes(input); err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_confirm",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
// Convert from protobuf to intrinsics type
op, err := ProverConfirmFromProtobuf(
pbConfirm,
a.hypergraph,
nil,
nil,
a.keyManager,
)
if err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_confirm",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
matTimer := prometheus.NewTimer(
observability.MaterializeDuration.WithLabelValues("global"),
)
resultState, matErr := op.Materialize(frameNumber, state)
matTimer.ObserveDuration()
if matErr != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_confirm",
).Inc()
return nil, errors.Wrap(matErr, "invoke step")
}
observability.InvokeStepTotal.WithLabelValues(
"global",
"prover_confirm",
).Inc()
return resultState, nil
case protobufs.ProverRejectType:
opTimer := prometheus.NewTimer(
observability.OperationDuration.WithLabelValues(
"global",
"prover_reject",
),
)
defer opTimer.ObserveDuration()
// Parse ProverReject directly from input
pbReject := &protobufs.ProverReject{}
if err := pbReject.FromCanonicalBytes(input); err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_reject",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
// Convert from protobuf to intrinsics type
op, err := ProverRejectFromProtobuf(
pbReject,
a.hypergraph,
nil,
nil,
a.keyManager,
)
if err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_reject",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
matTimer := prometheus.NewTimer(
observability.MaterializeDuration.WithLabelValues("global"),
)
resultState, matErr := op.Materialize(frameNumber, state)
matTimer.ObserveDuration()
if matErr != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_reject",
).Inc()
return nil, errors.Wrap(matErr, "invoke step")
}
observability.InvokeStepTotal.WithLabelValues(
"global",
"prover_reject",
).Inc()
return resultState, nil
case protobufs.ProverKickType:
opTimer := prometheus.NewTimer(
observability.OperationDuration.WithLabelValues("global", "prover_kick"),
)
defer opTimer.ObserveDuration()
// Parse ProverKick directly from input
pbKick := &protobufs.ProverKick{}
if err := pbKick.FromCanonicalBytes(input); err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_kick",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
// Convert from protobuf to intrinsics type
op, err := ProverKickFromProtobuf(pbKick, a.hypergraph, nil, a.keyManager)
if err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_kick",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
matTimer := prometheus.NewTimer(
observability.MaterializeDuration.WithLabelValues("global"),
)
resultState, matErr := op.Materialize(frameNumber, state)
matTimer.ObserveDuration()
if matErr != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_kick",
).Inc()
return nil, errors.Wrap(matErr, "invoke step")
}
observability.InvokeStepTotal.WithLabelValues("global", "prover_kick").Inc()
return resultState, nil
case protobufs.FrameHeaderType:
opTimer := prometheus.NewTimer(
observability.OperationDuration.WithLabelValues(
"global",
"prover_shard_update",
),
)
defer opTimer.ObserveDuration()
pbHeader := &protobufs.FrameHeader{}
if err := pbHeader.FromCanonicalBytes(input); err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_shard_update",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
op, err := NewProverShardUpdate(
a.logger,
pbHeader,
a.keyManager,
a.hypergraph,
a.rdfMultiprover,
a.frameProver,
a.rewardIssuance,
a.proverRegistry,
a.blsConstructor,
)
if err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_shard_update",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
matTimer := prometheus.NewTimer(
observability.MaterializeDuration.WithLabelValues("global"),
)
resultState, matErr := op.Materialize(frameNumber, state)
matTimer.ObserveDuration()
if matErr != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_shard_update",
).Inc()
return nil, errors.Wrap(matErr, "invoke step")
}
observability.InvokeStepTotal.WithLabelValues(
"global",
"prover_shard_update",
).Inc()
return resultState, nil
case protobufs.ProverSeniorityMergeType:
opTimer := prometheus.NewTimer(
observability.OperationDuration.WithLabelValues(
"global",
"prover_seniority_merge",
),
)
defer opTimer.ObserveDuration()
// Parse ProverSeniorityMerge directly from input
pb := &protobufs.ProverSeniorityMerge{}
if err := pb.FromCanonicalBytes(input); err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_seniority_merge",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
// Convert from protobuf to intrinsics type
op, err := ProverSeniorityMergeFromProtobuf(
pb,
a.hypergraph,
a.rdfMultiprover,
a.keyManager,
)
if err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_seniority_merge",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
matTimer := prometheus.NewTimer(
observability.MaterializeDuration.WithLabelValues("global"),
)
resultState, matErr := op.Materialize(frameNumber, state)
matTimer.ObserveDuration()
if matErr != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"prover_seniority_merge",
).Inc()
return nil, errors.Wrap(matErr, "invoke step")
}
observability.InvokeStepTotal.WithLabelValues(
"global",
"prover_seniority_merge",
).Inc()
return resultState, nil
case protobufs.ShardSplitType:
opTimer := prometheus.NewTimer(
observability.OperationDuration.WithLabelValues(
"global",
"shard_split",
),
)
defer opTimer.ObserveDuration()
pb := &protobufs.ShardSplit{}
if err := pb.FromCanonicalBytes(input); err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"shard_split",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
op, err := ShardSplitFromProtobuf(
pb,
a.hypergraph,
a.keyManager,
a.shardsStore,
a.proverRegistry,
)
if err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"shard_split",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
matTimer := prometheus.NewTimer(
observability.MaterializeDuration.WithLabelValues("global"),
)
resultState, matErr := op.Materialize(frameNumber, state)
matTimer.ObserveDuration()
if matErr != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"shard_split",
).Inc()
return nil, errors.Wrap(matErr, "invoke step")
}
observability.InvokeStepTotal.WithLabelValues(
"global",
"shard_split",
).Inc()
return resultState, nil
case protobufs.ShardMergeType:
opTimer := prometheus.NewTimer(
observability.OperationDuration.WithLabelValues(
"global",
"shard_merge",
),
)
defer opTimer.ObserveDuration()
pb := &protobufs.ShardMerge{}
if err := pb.FromCanonicalBytes(input); err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"shard_merge",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
op, err := ShardMergeFromProtobuf(
pb,
a.hypergraph,
a.keyManager,
a.shardsStore,
a.proverRegistry,
)
if err != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"shard_merge",
).Inc()
return nil, errors.Wrap(err, "invoke step")
}
matTimer := prometheus.NewTimer(
observability.MaterializeDuration.WithLabelValues("global"),
)
resultState, matErr := op.Materialize(frameNumber, state)
matTimer.ObserveDuration()
if matErr != nil {
observability.InvokeStepErrors.WithLabelValues(
"global",
"shard_merge",
).Inc()
return nil, errors.Wrap(matErr, "invoke step")
}
observability.InvokeStepTotal.WithLabelValues(
"global",
"shard_merge",
).Inc()
return resultState, nil
default:
observability.InvokeStepErrors.WithLabelValues(
"global",
"unknown_type",
).Inc()
return nil, errors.Wrap(
errors.New("unknown global request type"),
"invoke step",
)
}
}
// Lock implements intrinsics.Intrinsic.
func (a *GlobalIntrinsic) Lock(
frameNumber uint64,
input []byte,
) ([][]byte, error) {
a.lockedReadsMx.Lock()
a.lockedWritesMx.Lock()
defer a.lockedReadsMx.Unlock()
defer a.lockedWritesMx.Unlock()
if a.lockedReads == nil {
a.lockedReads = make(map[string]int)
}
if a.lockedWrites == nil {
a.lockedWrites = make(map[string]struct{})
}
// Check type prefix to determine request type
if len(input) < 4 {
observability.LockErrors.WithLabelValues(
"global",
"invalid_input",
).Inc()
return nil, errors.Wrap(errors.New("input too short"), "lock")
}
// Read the type prefix
typePrefix := binary.BigEndian.Uint32(input[:4])
var reads, writes [][]byte
var err error
// Handle each type based on type prefix
switch typePrefix {
case protobufs.ProverJoinType:
reads, writes, err = a.tryLockJoin(frameNumber, input)
if err != nil {
return nil, err
}
observability.LockTotal.WithLabelValues("global", "prover_join").Inc()
case protobufs.ProverLeaveType:
reads, writes, err = a.tryLockLeave(frameNumber, input)
if err != nil {
return nil, err
}
observability.LockTotal.WithLabelValues(
"global",
"prover_leave",
).Inc()
case protobufs.ProverPauseType:
reads, writes, err = a.tryLockPause(frameNumber, input)
if err != nil {
return nil, err
}
observability.LockTotal.WithLabelValues(
"global",
"prover_pause",
).Inc()
case protobufs.ProverResumeType:
reads, writes, err = a.tryLockResume(frameNumber, input)
if err != nil {
return nil, err
}
observability.LockTotal.WithLabelValues(
"global",
"prover_resume",
).Inc()
case protobufs.ProverConfirmType:
reads, writes, err = a.tryLockConfirm(frameNumber, input)
if err != nil {
return nil, err
}
observability.LockTotal.WithLabelValues(
"global",
"prover_confirm",
).Inc()
case protobufs.ProverRejectType:
reads, writes, err = a.tryLockReject(frameNumber, input)
if err != nil {
return nil, err
}
observability.LockTotal.WithLabelValues(
"global",
"prover_reject",
).Inc()
case protobufs.ProverKickType:
reads, writes, err = a.tryLockKick(frameNumber, input)
if err != nil {
return nil, err
}
observability.LockTotal.WithLabelValues("global", "prover_kick").Inc()
case protobufs.ProverSeniorityMergeType:
reads, writes, err = a.tryLockSeniorityMerge(frameNumber, input)
if err != nil {
return nil, err
}
observability.LockTotal.WithLabelValues(
"global",
"prover_seniority_merge",
).Inc()
case protobufs.ShardSplitType:
reads, writes, err = a.tryLockShardSplit(frameNumber, input)
if err != nil {
return nil, err
}
observability.LockTotal.WithLabelValues(
"global",
"shard_split",
).Inc()
case protobufs.ShardMergeType:
reads, writes, err = a.tryLockShardMerge(frameNumber, input)
if err != nil {
return nil, err
}
observability.LockTotal.WithLabelValues(
"global",
"shard_merge",
).Inc()
default:
observability.LockErrors.WithLabelValues(
"global",
"unknown_type",
).Inc()
return nil, errors.Wrap(
errors.New("unknown global request type"),
"lock",
)
}
for _, address := range writes {
if _, ok := a.lockedWrites[string(address)]; ok {
return nil, errors.Wrap(
fmt.Errorf("address %x is already locked for writing", address),
"lock",
)
}
if _, ok := a.lockedReads[string(address)]; ok {
return nil, errors.Wrap(
fmt.Errorf("address %x is already locked for reading", address),
"lock",
)
}
}
for _, address := range reads {
if _, ok := a.lockedWrites[string(address)]; ok {
return nil, errors.Wrap(
fmt.Errorf("address %x is already locked for writing", address),
"lock",
)
}
}
set := map[string]struct{}{}
for _, address := range writes {
a.lockedWrites[string(address)] = struct{}{}
a.lockedReads[string(address)] = a.lockedReads[string(address)] + 1
set[string(address)] = struct{}{}
}
for _, address := range reads {
a.lockedReads[string(address)] = a.lockedReads[string(address)] + 1
set[string(address)] = struct{}{}
}
result := [][]byte{}
for a := range set {
result = append(result, []byte(a))
}
return result, nil
}
// Unlock implements intrinsics.Intrinsic.
func (a *GlobalIntrinsic) Unlock() error {
a.lockedReadsMx.Lock()
a.lockedWritesMx.Lock()
defer a.lockedReadsMx.Unlock()
defer a.lockedWritesMx.Unlock()
a.lockedReads = make(map[string]int)
a.lockedWrites = make(map[string]struct{})
return nil
}
func (a *GlobalIntrinsic) tryLockJoin(frameNumber uint64, input []byte) (
[][]byte,
[][]byte,
error,
) {
// Parse ProverJoin directly from input
pbJoin := &protobufs.ProverJoin{}
if err := pbJoin.FromCanonicalBytes(input); err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_join",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
// Convert from protobuf to intrinsics type
op, err := ProverJoinFromProtobuf(
pbJoin,
a.hypergraph,
nil,
nil,
a.keyManager,
a.frameProver,
a.frameStore,
)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_join",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
reads, err := op.GetReadAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_join",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
writes, err := op.GetWriteAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_join",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
return reads, writes, nil
}
func (a *GlobalIntrinsic) tryLockLeave(frameNumber uint64, input []byte) (
[][]byte,
[][]byte,
error,
) {
// Parse ProverLeave directly from input
pbLeave := &protobufs.ProverLeave{}
if err := pbLeave.FromCanonicalBytes(input); err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_leave",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
// Convert from protobuf to intrinsics type
op, err := ProverLeaveFromProtobuf(
pbLeave,
a.hypergraph,
nil,
nil,
a.keyManager,
)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_leave",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
reads, err := op.GetReadAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_leave",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
writes, err := op.GetWriteAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_leave",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
return reads, writes, nil
}
func (a *GlobalIntrinsic) tryLockPause(frameNumber uint64, input []byte) (
[][]byte,
[][]byte,
error,
) {
// Parse ProverPause directly from input
pbPause := &protobufs.ProverPause{}
if err := pbPause.FromCanonicalBytes(input); err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_pause",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
// Convert from protobuf to intrinsics type
op, err := ProverPauseFromProtobuf(
pbPause,
a.hypergraph,
nil,
nil,
a.keyManager,
)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_pause",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
reads, err := op.GetReadAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_pause",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
writes, err := op.GetWriteAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_pause",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
return reads, writes, nil
}
func (a *GlobalIntrinsic) tryLockResume(frameNumber uint64, input []byte) (
[][]byte,
[][]byte,
error,
) {
// Parse ProverResume directly from input
pbResume := &protobufs.ProverResume{}
if err := pbResume.FromCanonicalBytes(input); err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_resume",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
// Convert from protobuf to intrinsics type
op, err := ProverResumeFromProtobuf(
pbResume,
a.hypergraph,
nil,
nil,
a.keyManager,
)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_resume",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
reads, err := op.GetReadAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_resume",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
writes, err := op.GetWriteAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_resume",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
return reads, writes, nil
}
func (a *GlobalIntrinsic) tryLockConfirm(frameNumber uint64, input []byte) (
[][]byte,
[][]byte,
error,
) {
// Parse ProverConfirm directly from input
pbConfirm := &protobufs.ProverConfirm{}
if err := pbConfirm.FromCanonicalBytes(input); err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_confirm",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
// Convert from protobuf to intrinsics type
op, err := ProverConfirmFromProtobuf(
pbConfirm,
a.hypergraph,
nil,
nil,
a.keyManager,
)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_confirm",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
reads, err := op.GetReadAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_confirm",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
writes, err := op.GetWriteAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_confirm",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
return reads, writes, nil
}
func (a *GlobalIntrinsic) tryLockReject(frameNumber uint64, input []byte) (
[][]byte,
[][]byte,
error,
) {
// Parse ProverReject directly from input
pbReject := &protobufs.ProverReject{}
if err := pbReject.FromCanonicalBytes(input); err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_reject",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
// Convert from protobuf to intrinsics type
op, err := ProverRejectFromProtobuf(
pbReject,
a.hypergraph,
nil,
nil,
a.keyManager,
)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_reject",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
op.keyManager = a.keyManager
reads, err := op.GetReadAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_reject",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
writes, err := op.GetWriteAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_reject",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
return reads, writes, nil
}
func (a *GlobalIntrinsic) tryLockKick(frameNumber uint64, input []byte) (
[][]byte,
[][]byte,
error,
) {
// Parse ProverKick directly from input
pbKick := &protobufs.ProverKick{}
if err := pbKick.FromCanonicalBytes(input); err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_kick",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
// Convert from protobuf to intrinsics type
op, err := ProverKickFromProtobuf(pbKick, a.hypergraph, nil, a.keyManager)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_kick",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
// Set runtime dependencies
op.rdfMultiprover = a.rdfMultiprover
op.hypergraph = a.hypergraph
reads, err := op.GetReadAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_kick",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
writes, err := op.GetWriteAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_kick",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
return reads, writes, nil
}
func (a *GlobalIntrinsic) tryLockSeniorityMerge(
frameNumber uint64,
input []byte,
) (
[][]byte,
[][]byte,
error,
) {
// Parse ProverSeniorityMerge directly from input
pb := &protobufs.ProverSeniorityMerge{}
if err := pb.FromCanonicalBytes(input); err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_seniority_merge",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
// Convert from protobuf to intrinsics type
op, err := ProverSeniorityMergeFromProtobuf(
pb,
a.hypergraph,
a.rdfMultiprover,
a.keyManager,
)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_seniority_merge",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
reads, err := op.GetReadAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_seniority_merge",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
writes, err := op.GetWriteAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"prover_seniority_merge",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
return reads, writes, nil
}
func (a *GlobalIntrinsic) tryLockShardSplit(
frameNumber uint64,
input []byte,
) (
[][]byte,
[][]byte,
error,
) {
pb := &protobufs.ShardSplit{}
if err := pb.FromCanonicalBytes(input); err != nil {
observability.LockErrors.WithLabelValues(
"global",
"shard_split",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
op, err := ShardSplitFromProtobuf(
pb,
a.hypergraph,
a.keyManager,
a.shardsStore,
a.proverRegistry,
)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"shard_split",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
reads, err := op.GetReadAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"shard_split",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
writes, err := op.GetWriteAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"shard_split",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
return reads, writes, nil
}
func (a *GlobalIntrinsic) tryLockShardMerge(
frameNumber uint64,
input []byte,
) (
[][]byte,
[][]byte,
error,
) {
pb := &protobufs.ShardMerge{}
if err := pb.FromCanonicalBytes(input); err != nil {
observability.LockErrors.WithLabelValues(
"global",
"shard_merge",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
op, err := ShardMergeFromProtobuf(
pb,
a.hypergraph,
a.keyManager,
a.shardsStore,
a.proverRegistry,
)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"shard_merge",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
reads, err := op.GetReadAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"shard_merge",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
writes, err := op.GetWriteAddresses(frameNumber)
if err != nil {
observability.LockErrors.WithLabelValues(
"global",
"shard_merge",
).Inc()
return nil, nil, errors.Wrap(err, "lock")
}
return reads, writes, nil
}
// LoadGlobalIntrinsic loads the global intrinsic from the global intrinsic
// address. The global intrinsic is implicitly deployed and always exists at the
// global address.
func LoadGlobalIntrinsic(
logger *zap.Logger,
address []byte,
hypergraph hypergraph.Hypergraph,
inclusionProver crypto.InclusionProver,
keyManager keys.KeyManager,
frameProver crypto.FrameProver,
frameStore store.ClockStore,
rewardIssuance consensus.RewardIssuance,
proverRegistry consensus.ProverRegistry,
blsConstructor crypto.BlsConstructor,
shardsStore store.ShardsStore,
) (*GlobalIntrinsic, error) {
// Verify the address is the global intrinsic address
if !bytes.Equal(address, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) {
return nil, errors.Wrap(
errors.New("invalid address for global intrinsic"),
"load global intrinsic",
)
}
parser := &schema.TurtleRDFParser{}
rdfMultiprover := schema.NewRDFMultiprover(parser, inclusionProver)
// The global intrinsic doesn't need any initialization since it's implicitly
// deployed
return &GlobalIntrinsic{
logger: logger,
lockedWrites: make(map[string]struct{}),
lockedReads: make(map[string]int),
state: nil,
rdfHypergraphSchema: GLOBAL_RDF_SCHEMA,
rdfMultiprover: rdfMultiprover,
hypergraph: hypergraph,
keyManager: keyManager,
frameProver: frameProver,
frameStore: frameStore,
rewardIssuance: rewardIssuance,
proverRegistry: proverRegistry,
blsConstructor: blsConstructor,
shardsStore: shardsStore,
}, nil
}
var _ intrinsics.Intrinsic = (*GlobalIntrinsic)(nil)