diff --git a/config/version.go b/config/version.go index d19b918..cfdf5e4 100644 --- a/config/version.go +++ b/config/version.go @@ -43,7 +43,7 @@ func FormatVersion(version []byte) string { } func GetPatchNumber() byte { - return 0x09 + return 0x0a } func GetRCNumber() byte { diff --git a/consensus/consensus_liveness.go b/consensus/consensus_liveness.go index 426f522..e8132c8 100644 --- a/consensus/consensus_liveness.go +++ b/consensus/consensus_liveness.go @@ -17,7 +17,11 @@ type LivenessProvider[ ] interface { // Collect returns the collected mutation operations ahead of liveness // announcements. - Collect(ctx context.Context) (CollectedT, error) + Collect( + ctx context.Context, + frameNumber uint64, + rank uint64, + ) (CollectedT, error) // SendLiveness announces liveness ahead of the next prover deterimination and // subsequent proving. Provides prior state and collected mutation operations // if relevant. diff --git a/consensus/consensus_sync.go b/consensus/consensus_sync.go index 64e12a4..1bcea39 100644 --- a/consensus/consensus_sync.go +++ b/consensus/consensus_sync.go @@ -4,7 +4,6 @@ import ( "context" "source.quilibrium.com/quilibrium/monorepo/consensus/models" - "source.quilibrium.com/quilibrium/monorepo/types/tries" ) // SyncProvider handles synchronization management @@ -19,13 +18,6 @@ type SyncProvider[StateT models.Unique] interface { existing *StateT, ) (<-chan *StateT, <-chan error) - // Initiates hypersync with a prover. - HyperSync( - ctx context.Context, - prover []byte, - shardKey tries.ShardKey, - ) - // Enqueues state information to begin synchronization with a given peer. If // expectedIdentity is provided, may use this to determine if the initial // frameNumber for which synchronization begins is the correct fork. diff --git a/consensus/integration/instance_test.go b/consensus/integration/instance_test.go index 1b10056..fdd4240 100644 --- a/consensus/integration/instance_test.go +++ b/consensus/integration/instance_test.go @@ -450,11 +450,15 @@ func NewInstance(t *testing.T, options ...Option) *Instance { }, ) in.voting.On("FinalizeTimeout", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( - func(ctx context.Context, rank uint64, latestQuorumCertificate models.QuorumCertificate, latestQuorumCertificateRanks []uint64, aggregatedSignature models.AggregatedSignature) (models.TimeoutCertificate, error) { + func(ctx context.Context, rank uint64, latestQuorumCertificate models.QuorumCertificate, latestQuorumCertificateRanks []consensus.TimeoutSignerInfo, aggregatedSignature models.AggregatedSignature) (models.TimeoutCertificate, error) { + ranks := []uint64{} + for _, i := range latestQuorumCertificateRanks { + ranks = append(ranks, i.NewestQCRank) + } return &helper.TestTimeoutCertificate{ Filter: nil, Rank: rank, - LatestRanks: latestQuorumCertificateRanks, + LatestRanks: ranks, LatestQuorumCert: latestQuorumCertificate, AggregatedSignature: aggregatedSignature, }, nil diff --git a/consensus/timeoutaggregator/timeout_aggregator.go b/consensus/timeoutaggregator/timeout_aggregator.go index f00c25c..7ec1e8d 100644 --- a/consensus/timeoutaggregator/timeout_aggregator.go +++ b/consensus/timeoutaggregator/timeout_aggregator.go @@ -161,11 +161,6 @@ func (t *TimeoutAggregator[VoteT]) processQueuedTimeout( timeoutState.Rank, err) } - t.tracer.Trace( - "adding timeout to collector", - consensus.Uint64Param("timeout_rank", timeoutState.Rank), - consensus.IdentityParam("timeout_voter", (*timeoutState.Vote).Identity()), - ) err = 
collector.AddTimeout(timeoutState) if err != nil { return fmt.Errorf("could not process TO for rank %d: %w", @@ -182,7 +177,6 @@ func (t *TimeoutAggregator[VoteT]) AddTimeout( ) { // drop stale objects if timeoutState.Rank < t.lowestRetainedRank.Value() { - t.tracer.Trace("drop stale timeouts") return } diff --git a/consensus/timeoutaggregator/timeout_collectors.go b/consensus/timeoutaggregator/timeout_collectors.go index 7e58936..73dddd4 100644 --- a/consensus/timeoutaggregator/timeout_collectors.go +++ b/consensus/timeoutaggregator/timeout_collectors.go @@ -89,7 +89,6 @@ func (t *TimeoutCollectors[VoteT]) GetOrCreateCollector(rank uint64) ( } t.lock.Unlock() - t.tracer.Trace("timeout collector has been created") return collector, true, nil } @@ -151,6 +150,4 @@ func (t *TimeoutCollectors[VoteT]) PruneUpToRank(lowestRetainedRank uint64) { } t.lowestRetainedRank = lowestRetainedRank t.lock.Unlock() - - t.tracer.Trace("pruned timeout collectors") } diff --git a/consensus/timeoutcollector/timeout_processor.go b/consensus/timeoutcollector/timeout_processor.go index ed2eadc..ee75278 100644 --- a/consensus/timeoutcollector/timeout_processor.go +++ b/consensus/timeoutcollector/timeout_processor.go @@ -143,12 +143,6 @@ func (p *TimeoutProcessor[StateT, VoteT, PeerIDT]) Process( } if p.tcTracker.Done() { - p.tracer.Trace( - "timeout tracker done", - consensus.Uint64Param("processor_rank", p.rank), - consensus.Uint64Param("timeout_rank", timeout.Rank), - consensus.IdentityParam("timeout_voter", (*timeout.Vote).Identity()), - ) return nil } @@ -164,22 +158,9 @@ func (p *TimeoutProcessor[StateT, VoteT, PeerIDT]) Process( return fmt.Errorf("validating timeout failed: %w", err) } if p.tcTracker.Done() { - p.tracer.Trace( - "timeout tracker done", - consensus.Uint64Param("processor_rank", p.rank), - consensus.Uint64Param("timeout_rank", timeout.Rank), - consensus.IdentityParam("timeout_voter", (*timeout.Vote).Identity()), - ) return nil } - p.tracer.Trace( - "adding timeout to signature aggregator", - consensus.Uint64Param("processor_rank", p.rank), - consensus.Uint64Param("timeout_rank", timeout.Rank), - consensus.IdentityParam("timeout_voter", (*timeout.Vote).Identity()), - ) - // CAUTION: for correctness it is critical that we update the // `newestQCTracker` first, _before_ we add the TO's signature to // `sigAggregator`. Reasoning: @@ -245,10 +226,6 @@ func (p *TimeoutProcessor[StateT, VoteT, PeerIDT]) Process( // true only once. 
willBuildTC := p.tcTracker.Track(totalWeight) if !willBuildTC { - p.tracer.Trace( - "insufficient weight to build tc", - consensus.Uint64Param("total_weight", totalWeight), - ) // either we do not have enough timeouts to build a TC, or another thread // has already passed this gate and created a TC return nil diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index bc301de..5f02510 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -2274,6 +2274,7 @@ func (e *AppConsensusEngine) OnRankChange(oldRank uint64, newRank uint64) { commitments, err := e.livenessProvider.Collect( context.Background(), frame.Header.FrameNumber, + newRank, ) if err != nil { e.logger.Error("could not collect commitments", zap.Error(err)) diff --git a/node/consensus/app/consensus_liveness_provider.go b/node/consensus/app/consensus_liveness_provider.go index 7adecfc..632fffe 100644 --- a/node/consensus/app/consensus_liveness_provider.go +++ b/node/consensus/app/consensus_liveness_provider.go @@ -17,6 +17,7 @@ type AppLivenessProvider struct { func (p *AppLivenessProvider) Collect( ctx context.Context, frameNumber uint64, + rank uint64, ) (CollectedCommitments, error) { if p.engine.GetFrame() == nil { return CollectedCommitments{}, errors.Wrap( diff --git a/node/consensus/global/app_shard_cache.go b/node/consensus/global/app_shard_cache.go new file mode 100644 index 0000000..3279572 --- /dev/null +++ b/node/consensus/global/app_shard_cache.go @@ -0,0 +1,218 @@ +package global + +import ( + "encoding/binary" + "math/big" + "slices" + + "github.com/pkg/errors" + "source.quilibrium.com/quilibrium/monorepo/hypergraph" + "source.quilibrium.com/quilibrium/monorepo/protobufs" + thypergraph "source.quilibrium.com/quilibrium/monorepo/types/hypergraph" + "source.quilibrium.com/quilibrium/monorepo/types/store" + "source.quilibrium.com/quilibrium/monorepo/types/tries" +) + +type appShardCacheEntry struct { + info *protobufs.AppShardInfo + shardKey []byte +} + +func (entry *appShardCacheEntry) toResponse( + includeShardKey bool, +) *protobufs.AppShardInfo { + response := &protobufs.AppShardInfo{ + Prefix: entry.info.Prefix, + Size: entry.info.Size, + DataShards: entry.info.DataShards, + Commitment: entry.info.Commitment, + } + if includeShardKey { + response.ShardKey = entry.shardKey + } + return response +} + +func (e *GlobalConsensusEngine) rebuildAppShardCache( + rank uint64, +) error { + hg, ok := e.hypergraph.(*hypergraph.HypergraphCRDT) + if !ok { + return errors.New("hypergraph does not support caching") + } + + shards, err := e.shardsStore.RangeAppShards() + if err != nil { + return errors.Wrap(err, "range app shards") + } + + cache := make(map[string]*appShardCacheEntry, len(shards)) + for _, shard := range shards { + entry := e.buildAppShardCacheEntry(hg, shard) + if entry == nil { + continue + } + cache[appShardCacheKey(shard)] = entry + } + + e.appShardCacheMu.Lock() + e.appShardCache = cache + e.appShardCacheRank = rank + e.appShardCacheMu.Unlock() + + return nil +} + +func (e *GlobalConsensusEngine) buildAppShardCacheEntry( + hg *hypergraph.HypergraphCRDT, + shard store.ShardInfo, +) *appShardCacheEntry { + shardKey, ok := toTriesShardKey(shard) + if !ok { + return nil + } + + commitment, size, dataShards := e.computeAppShardMetadata( + hg, + shardKey, + shard, + ) + + info := &protobufs.AppShardInfo{ + Prefix: slices.Clone(shard.Path), + Size: size, + DataShards: dataShards, + Commitment: commitment, + } + + 
return &appShardCacheEntry{ + info: info, + shardKey: slices.Concat(shard.L1, shard.L2), + } +} + +func (e *GlobalConsensusEngine) computeAppShardMetadata( + hg *hypergraph.HypergraphCRDT, + shardKey tries.ShardKey, + shard store.ShardInfo, +) ([][]byte, []byte, uint64) { + fullPrefix := buildAppShardFullPrefix(shard) + phaseSets := []thypergraph.IdSet{ + hg.GetVertexAddsSet(shardKey), + hg.GetVertexRemovesSet(shardKey), + hg.GetHyperedgeAddsSet(shardKey), + hg.GetHyperedgeRemovesSet(shardKey), + } + + size := big.NewInt(0) + commitments := make([][]byte, 0, len(phaseSets)) + var dataShards uint64 + + for _, ps := range phaseSets { + commitment, shardCount, nodeSize := getNodeMetadata(ps, fullPrefix) + commitments = append(commitments, commitment) + if nodeSize != nil { + size = nodeSize + } + dataShards += shardCount + } + + return commitments, size.Bytes(), dataShards +} + +func (e *GlobalConsensusEngine) getAppShardInfoForShard( + hg *hypergraph.HypergraphCRDT, + shard store.ShardInfo, + includeShardKey bool, +) *protobufs.AppShardInfo { + key := appShardCacheKey(shard) + e.appShardCacheMu.RLock() + entry := e.appShardCache[key] + e.appShardCacheMu.RUnlock() + + if entry == nil { + entry = e.buildAppShardCacheEntry(hg, shard) + if entry != nil { + e.appShardCacheMu.Lock() + e.appShardCache[key] = entry + e.appShardCacheMu.Unlock() + } + } + + if entry == nil { + info := &protobufs.AppShardInfo{ + Prefix: shard.Path, + Commitment: [][]byte{ + make([]byte, 64), + make([]byte, 64), + make([]byte, 64), + make([]byte, 64), + }, + } + if includeShardKey { + info.ShardKey = slices.Concat(shard.L1, shard.L2) + } + return info + } + + return entry.toResponse(includeShardKey) +} + +func appShardCacheKey(shard store.ShardInfo) string { + buf := make([]byte, 0, len(shard.L1)+len(shard.L2)+4*len(shard.Path)) + buf = append(buf, shard.L1...) + buf = append(buf, shard.L2...) + + tmp := make([]byte, 4) + for _, p := range shard.Path { + binary.BigEndian.PutUint32(tmp, p) + buf = append(buf, tmp...) + } + + return string(buf) +} + +func buildAppShardFullPrefix(shard store.ShardInfo) []int { + base := tries.GetFullPath(shard.L2) + fullPrefix := make([]int, 0, len(base)+len(shard.Path)) + fullPrefix = append(fullPrefix, base...) 
+ for _, p := range shard.Path { + fullPrefix = append(fullPrefix, int(p)) + } + return fullPrefix +} + +func getNodeMetadata( + ps thypergraph.IdSet, + fullPrefix []int, +) ([]byte, uint64, *big.Int) { + node, err := ps.GetTree().GetByPath(fullPrefix) + if err != nil { + return make([]byte, 64), 0, nil + } + + switch n := node.(type) { + case *tries.LazyVectorCommitmentBranchNode: + return slices.Clone(n.Commitment), + uint64(n.LeafCount), + new(big.Int).Set(n.Size) + case *tries.LazyVectorCommitmentLeafNode: + return slices.Clone(n.Commitment), 1, new(big.Int).Set(n.Size) + default: + return make([]byte, 64), 0, nil + } +} + +func toTriesShardKey(shard store.ShardInfo) (tries.ShardKey, bool) { + var l1 [3]byte + var l2 [32]byte + if len(shard.L1) != len(l1) || len(shard.L2) != len(l2) { + return tries.ShardKey{}, false + } + copy(l1[:], shard.L1) + copy(l2[:], shard.L2) + return tries.ShardKey{ + L1: l1, + L2: l2, + }, true +} diff --git a/node/consensus/global/consensus_leader_provider.go b/node/consensus/global/consensus_leader_provider.go index 75d9afe..07d1e11 100644 --- a/node/consensus/global/consensus_leader_provider.go +++ b/node/consensus/global/consensus_leader_provider.go @@ -145,7 +145,11 @@ func (p *GlobalLeaderProvider) ProveNextState( ) } - _, err = p.engine.livenessProvider.Collect(ctx, prior.Header.FrameNumber+1) + _, err = p.engine.livenessProvider.Collect( + ctx, + prior.Header.FrameNumber+1, + rank, + ) if err != nil { return nil, models.NewNoVoteErrorf("could not collect: %+w", err) } @@ -188,7 +192,7 @@ func (p *GlobalLeaderProvider) ProveNextState( } // Get current timestamp and difficulty - timestamp := time.Now().UnixMilli() + timestamp := time.Now().Add(10 * time.Second).UnixMilli() difficulty := p.engine.difficultyAdjuster.GetNextDifficulty( rank, timestamp, diff --git a/node/consensus/global/consensus_liveness_provider.go b/node/consensus/global/consensus_liveness_provider.go index 1d00932..faee23b 100644 --- a/node/consensus/global/consensus_liveness_provider.go +++ b/node/consensus/global/consensus_liveness_provider.go @@ -22,6 +22,7 @@ type GlobalLivenessProvider struct { func (p *GlobalLivenessProvider) Collect( ctx context.Context, frameNumber uint64, + rank uint64, ) (GlobalCollectedCommitments, error) { timer := prometheus.NewTimer(shardCommitmentCollectionDuration) defer timer.ObserveDuration() @@ -98,7 +99,6 @@ func (p *GlobalLivenessProvider) Collect( proverRoot := make([]byte, 64) - // TODO(2.1.1+): Refactor this with caching commitSet, err := p.engine.hypergraph.Commit(frameNumber) if err != nil { p.engine.logger.Error( @@ -109,6 +109,14 @@ func (p *GlobalLivenessProvider) Collect( } collected := 0 + if err := p.engine.rebuildAppShardCache(rank); err != nil { + p.engine.logger.Warn( + "could not rebuild app shard cache", + zap.Uint64("rank", rank), + zap.Error(err), + ) + } + // The poseidon hash's field is < 0x3fff...ffff, so we use the upper two bits // to fold the four hypergraph phase/sets into the three different tree // partitions the L1 key designates diff --git a/node/consensus/global/event_distributor.go b/node/consensus/global/event_distributor.go index d433531..5cd01b0 100644 --- a/node/consensus/global/event_distributor.go +++ b/node/consensus/global/event_distributor.go @@ -30,6 +30,7 @@ import ( typesconsensus "source.quilibrium.com/quilibrium/monorepo/types/consensus" "source.quilibrium.com/quilibrium/monorepo/types/crypto" "source.quilibrium.com/quilibrium/monorepo/types/schema" + 
"source.quilibrium.com/quilibrium/monorepo/types/store" ) func (e *GlobalConsensusEngine) eventDistributorLoop( @@ -385,12 +386,26 @@ func (e *GlobalConsensusEngine) evaluateForProposals( pendingFilters := [][]byte{} proposalDescriptors := []provers.ShardDescriptor{} decideDescriptors := []provers.ShardDescriptor{} - shards, err := e.shardsStore.RangeAppShards() + appShards, err := e.shardsStore.RangeAppShards() if err != nil { e.logger.Error("could not obtain app shard info", zap.Error(err)) return } + // consolidate into high level L2 shards: + shardMap := map[string]store.ShardInfo{} + for _, s := range appShards { + shardMap[string(s.L2)] = s + } + + shards := []store.ShardInfo{} + for _, s := range shardMap { + shards = append(shards, store.ShardInfo{ + L1: s.L1, + L2: s.L2, + }) + } + registry, err := e.keyStore.GetKeyRegistryByProver(data.Frame.Header.Prover) if err != nil { e.logger.Info( @@ -496,8 +511,6 @@ func (e *GlobalConsensusEngine) evaluateForProposals( resp, err := e.getAppShardsFromProver( client, slices.Concat(info.L1, info.L2), - info.Path, - data.Frame.Header.Prover, ) if err != nil { e.logger.Debug("could not get app shards from prover", zap.Error(err)) @@ -531,7 +544,7 @@ func (e *GlobalConsensusEngine) evaluateForProposals( if bytes.Equal(allocation.ConfirmationFilter, bp) { allocated = allocation.Status != 4 if e.config.P2P.Network != 0 || - data.Frame.Header.FrameNumber > 252840 { + data.Frame.Header.FrameNumber > 255840 { e.logger.Info( "checking pending status of allocation", zap.Int("status", int(allocation.Status)), @@ -824,8 +837,6 @@ func (e *GlobalConsensusEngine) publishKeyRegistry() { func (e *GlobalConsensusEngine) getAppShardsFromProver( client protobufs.GlobalServiceClient, shardKey []byte, - path []uint32, - prover []byte, ) ( *protobufs.GetAppShardsResponse, error, @@ -858,5 +869,4 @@ func (e *GlobalConsensusEngine) getAppShardsFromProver( } return response, nil - } diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index f2d267f..4b10985 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -175,6 +175,9 @@ type GlobalConsensusEngine struct { keyRegistryDigestCacheMu sync.Mutex peerAuthCache map[string]time.Time peerAuthCacheMu sync.RWMutex + appShardCache map[string]*appShardCacheEntry + appShardCacheMu sync.RWMutex + appShardCacheRank uint64 // Transaction cross-shard lock tracking txLockMap map[uint64]map[string]map[string]*LockedTransaction @@ -299,6 +302,7 @@ func NewGlobalConsensusEngine( peerAuthCache: make(map[string]time.Time), alertPublicKey: []byte{}, txLockMap: make(map[uint64]map[string]map[string]*LockedTransaction), + appShardCache: make(map[string]*appShardCacheEntry), } if config.Engine.AlertKey != "" { @@ -634,6 +638,16 @@ func NewGlobalConsensusEngine( if err != nil { return nil, err } + + if latest == nil { + if err := engine.rebuildAppShardCache(0); err != nil { + logger.Warn("could not prime app shard cache", zap.Error(err)) + } + } else { + if err := engine.rebuildAppShardCache(latest.FinalizedRank); err != nil { + logger.Warn("could not prime app shard cache", zap.Error(err)) + } + } engine.timeoutAggregator, err = voting.NewGlobalTimeoutAggregator[GlobalPeerID]( tracing.NewZapTracer(logger), engine, diff --git a/node/consensus/global/message_processors.go b/node/consensus/global/message_processors.go index 95ef00f..1d477ae 100644 --- a/node/consensus/global/message_processors.go +++ 
b/node/consensus/global/message_processors.go @@ -279,6 +279,11 @@ func (e *GlobalConsensusEngine) handleFrameMessage( return } + frame, err := e.globalTimeReel.GetHead() + if err == nil && frame != nil { + e.currentRank = frame.GetRank() + } + // Success metric recorded at the end of processing framesProcessedTotal.WithLabelValues("success").Inc() default: diff --git a/node/consensus/global/message_validation.go b/node/consensus/global/message_validation.go index cd6e4c0..d8ce9b4 100644 --- a/node/consensus/global/message_validation.go +++ b/node/consensus/global/message_validation.go @@ -452,7 +452,7 @@ func (e *GlobalConsensusEngine) validateFrameMessage( return tp2p.ValidationResultReject } - if frametime.GlobalFrameSince(frame) > 20*time.Second { + if e.currentRank > frame.GetRank()+2 { frameValidationTotal.WithLabelValues("ignore").Inc() return tp2p.ValidationResultIgnore } diff --git a/node/consensus/global/services.go b/node/consensus/global/services.go index 43e8db0..7a1c11b 100644 --- a/node/consensus/global/services.go +++ b/node/consensus/global/services.go @@ -14,11 +14,11 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "source.quilibrium.com/quilibrium/monorepo/hypergraph" qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc" "source.quilibrium.com/quilibrium/monorepo/node/rpc" "source.quilibrium.com/quilibrium/monorepo/protobufs" "source.quilibrium.com/quilibrium/monorepo/types/store" - "source.quilibrium.com/quilibrium/monorepo/types/tries" ) func (e *GlobalConsensusEngine) GetGlobalFrame( @@ -177,65 +177,15 @@ func (e *GlobalConsensusEngine) GetAppShards( Info: []*protobufs.AppShardInfo{}, } - fullPrefix := []uint32{} - for _, p := range tries.GetFullPath(req.ShardKey[3:]) { - fullPrefix = append(fullPrefix, uint32(p)) + hg, ok := e.hypergraph.(*hypergraph.HypergraphCRDT) + if !ok { + return nil, errors.New("hypergraph does not support caching") } - fullPrefix = slices.Concat(fullPrefix, req.Prefix) + includeShardKey := len(req.ShardKey) != 35 for _, shard := range shards { - size := big.NewInt(0) - commitment := [][]byte{} - dataShards := uint64(0) - for _, ps := range []protobufs.HypergraphPhaseSet{0, 1, 2, 3} { - c, err := e.hypergraph.GetChildrenForPath( - ctx, - &protobufs.GetChildrenForPathRequest{ - ShardKey: req.ShardKey, - Path: shard.Path, - PhaseSet: protobufs.HypergraphPhaseSet(ps), - }, - ) - if err != nil { - commitment = append(commitment, make([]byte, 64)) - continue - } - - if len(c.PathSegments) == 0 { - commitment = append(commitment, make([]byte, 64)) - continue - } - - branch, leaf := selectPathSegmentForPrefix(c.PathSegments, fullPrefix) - if branch == nil && leaf == nil { - commitment = append(commitment, make([]byte, 64)) - continue - } - - if branch != nil { - size = size.Add(size, new(big.Int).SetBytes(branch.Size)) - commitment = append(commitment, branch.Commitment) - dataShards += branch.LeafCount - continue - } - - size = size.Add(size, new(big.Int).SetBytes(leaf.Size)) - commitment = append(commitment, leaf.Commitment) - dataShards += 1 - } - - shardKey := []byte{} - if len(req.ShardKey) != 35 { - shardKey = slices.Concat(shard.L1, shard.L2) - } - - response.Info = append(response.Info, &protobufs.AppShardInfo{ - Prefix: shard.Path, - Size: size.Bytes(), - Commitment: commitment, - DataShards: dataShards, - ShardKey: shardKey, - }) + info := e.getAppShardInfoForShard(hg, shard, includeShardKey) + response.Info = append(response.Info, info) } return response, nil diff --git 
a/node/execution/engines/global_execution_engine_test.go b/node/execution/engines/global_execution_engine_test.go index 6a15e4a..9bb249a 100644 --- a/node/execution/engines/global_execution_engine_test.go +++ b/node/execution/engines/global_execution_engine_test.go @@ -397,7 +397,7 @@ func TestGlobalExecutionEngine_AllOperationTypes(t *testing.T) { // Process message state := hgstate.NewHypergraphState(mockHG) - responses, err := engine.ProcessMessage(252840, big.NewInt(1), intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], msg.Payload, state) + responses, err := engine.ProcessMessage(255840, big.NewInt(1), intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], msg.Payload, state) assert.NoError(t, err) assert.NotNil(t, responses) diff --git a/node/execution/intrinsics/global/global_intrinsic_rbls48581_integration_test.go b/node/execution/intrinsics/global/global_intrinsic_rbls48581_integration_test.go index 3d61783..a4cea97 100644 --- a/node/execution/intrinsics/global/global_intrinsic_rbls48581_integration_test.go +++ b/node/execution/intrinsics/global/global_intrinsic_rbls48581_integration_test.go @@ -356,7 +356,7 @@ func TestGlobalProverOperations_Integration(t *testing.T) { err = rm.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], "allocation:ProverAllocation", "Status", []byte{0}, allocationTree) // joining require.NoError(t, err) joinFrameBytes := make([]byte, 8) - binary.BigEndian.PutUint64(joinFrameBytes, 252840) + binary.BigEndian.PutUint64(joinFrameBytes, 255840) err = rm.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], "allocation:ProverAllocation", "JoinFrameNumber", joinFrameBytes, allocationTree) require.NoError(t, err) @@ -373,8 +373,8 @@ func TestGlobalProverOperations_Integration(t *testing.T) { hg.SetVertexData(txn, [64]byte(slices.Concat(intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], allocationAddress)), allocationTree) txn.Commit() - // Try to confirm at frame 252840 + 360 - confirmFrame := uint64(252840 + 360) + // Try to confirm at frame 255840 + 360 + confirmFrame := uint64(255840 + 360) proverConfirm, err := global.NewProverConfirm(filter, confirmFrame, keyManager, hg, rm) require.NoError(t, err) diff --git a/node/execution/intrinsics/global/global_intrinsic_state_machine_integration_test.go b/node/execution/intrinsics/global/global_intrinsic_state_machine_integration_test.go index 6d1ac55..b92eaea 100644 --- a/node/execution/intrinsics/global/global_intrinsic_state_machine_integration_test.go +++ b/node/execution/intrinsics/global/global_intrinsic_state_machine_integration_test.go @@ -115,9 +115,9 @@ func TestProverJoinConfirmFlow(t *testing.T) { pebbleDB := store.NewPebbleDB(zap.L(), &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/global"}, 0) frameStore := store.NewPebbleClockStore(pebbleDB, zap.L()) - // Test 1: Join at frame 252840 + // Test 1: Join at frame 255840 t.Run("Join and confirm after 360 frames", func(t *testing.T) { - joinFrame := uint64(252840) + joinFrame := uint64(255840) // Create and prove join proverJoin, err := global.NewProverJoin([][]byte{filter}, joinFrame, nil, nil, keyManager, hg, rdfMultiprover, vdf.NewWesolowskiFrameProver(zap.L()), frameStore) @@ -171,7 +171,7 @@ func TestProverJoinRejectFlow(t *testing.T) { frameStore := store.NewPebbleClockStore(pebbleDB, zap.L()) t.Run("Join and reject immediately", func(t *testing.T) { - joinFrame := uint64(252840) + joinFrame := uint64(255840) // Create and prove join proverJoin, err := global.NewProverJoin([][]byte{filter}, joinFrame, nil, nil, keyManager, hg, rdfMultiprover, 
vdf.NewWesolowskiFrameProver(zap.L()), frameStore) @@ -213,7 +213,7 @@ func TestProverPauseResumeFlow(t *testing.T) { frameStore := store.NewPebbleClockStore(pebbleDB, zap.L()) // First join and confirm to get to active state - joinFrame := uint64(252840) + joinFrame := uint64(255840) proverJoin, err := global.NewProverJoin([][]byte{filter}, joinFrame, nil, nil, keyManager, hg, rdfMultiprover, vdf.NewWesolowskiFrameProver(zap.L()), frameStore) require.NoError(t, err) err = proverJoin.Prove(joinFrame) @@ -275,7 +275,7 @@ func TestProverLeaveFlow(t *testing.T) { frameStore := store.NewPebbleClockStore(pebbleDB, zap.L()) // First join and confirm to get to active state - joinFrame := uint64(252840) + joinFrame := uint64(255840) proverJoin, err := global.NewProverJoin([][]byte{filter}, joinFrame, nil, nil, keyManager, hg, rdfMultiprover, vdf.NewWesolowskiFrameProver(zap.L()), frameStore) require.NoError(t, err) err = proverJoin.Prove(joinFrame) @@ -348,7 +348,7 @@ func TestProverLeaveRejectFlow(t *testing.T) { frameStore := store.NewPebbleClockStore(pebbleDB, zap.L()) // First join and confirm to get to active state - joinFrame := uint64(252840) + joinFrame := uint64(255840) proverJoin, err := global.NewProverJoin([][]byte{filter}, joinFrame, nil, nil, keyManager, hg, rdfMultiprover, vdf.NewWesolowskiFrameProver(zap.L()), frameStore) require.NoError(t, err) err = proverJoin.Prove(joinFrame) @@ -401,7 +401,7 @@ func TestProverTimingEdgeCases(t *testing.T) { pebbleDB := store.NewPebbleDB(zap.L(), &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/global"}, 0) frameStore := store.NewPebbleClockStore(pebbleDB, zap.L()) - t.Run("Join before 252840 with special confirmation rules", func(t *testing.T) { + t.Run("Join before 255840 with special confirmation rules", func(t *testing.T) { joinFrame := uint64(252000) // Create and prove join @@ -413,7 +413,7 @@ func TestProverTimingEdgeCases(t *testing.T) { // Materialize join state = materializeAndCommit(t, proverJoin, joinFrame, state, hg) - // Try to confirm before frame 252840 (should fail) + // Try to confirm before frame 255840 (should fail) confirmFrame := uint64(252839) proverConfirm, err := global.NewProverConfirm(filter, confirmFrame, keyManager, hg, rdfMultiprover) require.NoError(t, err) @@ -423,10 +423,10 @@ func TestProverTimingEdgeCases(t *testing.T) { valid, err := proverConfirm.Verify(confirmFrame) assert.False(t, valid) assert.Error(t, err) - assert.Contains(t, err.Error(), "cannot confirm before frame 252840") + assert.Contains(t, err.Error(), "cannot confirm before frame 255840") - // Confirm at frame 252840 (should succeed even though less than 360 frames) - confirmFrame = uint64(252840) + // Confirm at frame 255840 (should succeed even though less than 360 frames) + confirmFrame = uint64(255840) proverConfirm, err = global.NewProverConfirm(filter, confirmFrame, keyManager, hg, rdfMultiprover) require.NoError(t, err) err = proverConfirm.Prove(confirmFrame) @@ -439,7 +439,7 @@ func TestProverTimingEdgeCases(t *testing.T) { t.Run("Pause timeout causes implicit leave", func(t *testing.T) { // First get to active state - joinFrame := uint64(252840) + joinFrame := uint64(255840) proverJoin, err := global.NewProverJoin([][]byte{filter}, joinFrame, nil, nil, keyManager, hg, rdfMultiprover, vdf.NewWesolowskiFrameProver(zap.L()), frameStore) require.NoError(t, err) err = proverJoin.Prove(joinFrame) @@ -483,7 +483,7 @@ func TestProverInvalidStateTransitions(t *testing.T) { frameStore := store.NewPebbleClockStore(pebbleDB, zap.L()) // Join 
first - joinFrame := uint64(252840) + joinFrame := uint64(255840) proverJoin, err := global.NewProverJoin([][]byte{filter}, joinFrame, nil, nil, keyManager, hg, rdfMultiprover, vdf.NewWesolowskiFrameProver(zap.L()), frameStore) require.NoError(t, err) err = proverJoin.Prove(joinFrame) diff --git a/node/execution/intrinsics/global/global_prover_confirm.go b/node/execution/intrinsics/global/global_prover_confirm.go index 0cc99c6..fd2abe4 100644 --- a/node/execution/intrinsics/global/global_prover_confirm.go +++ b/node/execution/intrinsics/global/global_prover_confirm.go @@ -498,16 +498,16 @@ func (p *ProverConfirm) Verify(frameNumber uint64) (bool, error) { joinFrame := binary.BigEndian.Uint64(joinFrameBytes) // Check timing constraints - if joinFrame < 252840 && joinFrame >= 244100 { - if frameNumber < 252840 { - // If joined before frame 252840, cannot confirm until frame 252840 + if joinFrame < 255840 && joinFrame >= 244100 { + if frameNumber < 255840 { + // If joined before frame 255840, cannot confirm until frame 255840 return false, errors.Wrap( - errors.New("cannot confirm before frame 252840"), + errors.New("cannot confirm before frame 255840"), "verify", ) } - // Set this to either 252840 - 360 or the raw join frame if higher than it + // Set this to either 255840 - 360 or the raw join frame if higher than it // so the provers before can immeidately join after the wait, those after // still have the full 360. if joinFrame < 252480 { @@ -515,8 +515,8 @@ func (p *ProverConfirm) Verify(frameNumber uint64) (bool, error) { } } - // For joins before 252840, once we reach frame 252840, they can confirm - // immediately, for joins after 252840, normal 360 frame wait applies. + // For joins before 255840, once we reach frame 255840, they can confirm + // immediately, for joins after 255840, normal 360 frame wait applies. // If the join frame precedes the genesis frame (e.g. 
not mainnet), we // ignore the topic altogether if joinFrame >= 252480 || joinFrame <= 244100 { diff --git a/node/execution/intrinsics/global/global_prover_confirm_test.go b/node/execution/intrinsics/global/global_prover_confirm_test.go index 71290a1..4293965 100644 --- a/node/execution/intrinsics/global/global_prover_confirm_test.go +++ b/node/execution/intrinsics/global/global_prover_confirm_test.go @@ -147,7 +147,7 @@ func TestProverConfirm_Verify(t *testing.T) { // Test data filter := []byte("testfilter") - joinFrame := uint64(252840) + joinFrame := uint64(255840) confirmFrame := joinFrame + 360 address := make([]byte, 32) for i := range address { @@ -219,7 +219,7 @@ func TestProverConfirm_Verify(t *testing.T) { // Test data filter := []byte("testfilter") - joinFrame := uint64(252840) + joinFrame := uint64(255840) confirmFrame := joinFrame + 359 // Too early address := make([]byte, 32) for i := range address { @@ -260,7 +260,7 @@ func TestProverConfirm_Verify(t *testing.T) { assert.False(t, valid) }) - t.Run("Cannot confirm join before frame 252840", func(t *testing.T) { + t.Run("Cannot confirm join before frame 255840", func(t *testing.T) { // Setup mockKeyManager := new(mocks.MockKeyManager) mockHypergraph := new(mocks.MockHypergraph) @@ -269,7 +269,7 @@ func TestProverConfirm_Verify(t *testing.T) { // Test data filter := []byte("testfilter") joinFrame := uint64(252000) - confirmFrame := uint64(252839) // Before 252840 + confirmFrame := uint64(252839) // Before 255840 address := make([]byte, 32) for i := range address { address[i] = byte(i % 256) @@ -305,7 +305,7 @@ func TestProverConfirm_Verify(t *testing.T) { // Call the verify function valid, err := proverConfirm.Verify(confirmFrame) require.Error(t, err) - assert.Contains(t, err.Error(), "cannot confirm before frame 252840") + assert.Contains(t, err.Error(), "cannot confirm before frame 255840") assert.False(t, valid) }) diff --git a/node/execution/intrinsics/global/global_prover_join.go b/node/execution/intrinsics/global/global_prover_join.go index b4b0c51..d9372df 100644 --- a/node/execution/intrinsics/global/global_prover_join.go +++ b/node/execution/intrinsics/global/global_prover_join.go @@ -759,36 +759,74 @@ func (p *ProverJoin) Verify(frameNumber uint64) (valid bool, err error) { } } - // Create composite address: GLOBAL_INTRINSIC_ADDRESS + prover address - fullAddress := [64]byte{} - copy(fullAddress[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) - copy(fullAddress[32:], address) - // Get the existing prover vertex data - vertexData, err := p.hypergraph.GetVertexData(fullAddress) - if err == nil && vertexData != nil { - // Prover exists, check if they're in left state (4) - tree := vertexData - - // Check if prover is in left state (4) - statusData, err := p.rdfMultiprover.Get( + proverAddress := [64]byte{} + copy(proverAddress[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) + copy(proverAddress[32:], address) + proverVertexData, err := p.hypergraph.GetVertexData(proverAddress) + if err == nil && proverVertexData != nil { + tree := proverVertexData + kickedFrame, err := p.rdfMultiprover.Get( GLOBAL_RDF_SCHEMA, - "prover:Prover", - "Status", + "allocation:ProverAllocation", + "KickFrameNumber", tree, ) - if err == nil && len(statusData) > 0 { - status := statusData[0] - if status != 4 { - // Prover is in some other state - cannot join + if err == nil && len(kickedFrame) == 8 { + kickedFrame := binary.BigEndian.Uint64(kickedFrame) + if kickedFrame != 0 { + // Prover has been kicked for malicious behavior return false, 
errors.Wrap( - errors.New("prover already exists in non-left state"), + errors.New("prover has been previously kicked"), "verify", ) } } } + for _, f := range p.Filters { + allocationAddressBI, err := poseidon.HashBytes( + slices.Concat( + []byte("PROVER_ALLOCATION"), + p.PublicKeySignatureBLS48581.PublicKey, + f, + ), + ) + if err != nil { + return false, errors.Wrap(err, "verify") + } + allocationAddress := allocationAddressBI.FillBytes(make([]byte, 32)) + // Create composite address: GLOBAL_INTRINSIC_ADDRESS + prover address + fullAddress := [64]byte{} + copy(fullAddress[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) + copy(fullAddress[32:], allocationAddress) + + // Get the existing prover allocation vertex data + vertexData, err := p.hypergraph.GetVertexData(fullAddress) + if err == nil && vertexData != nil { + // Prover exists, check if they're in left state (4) + tree := vertexData + + // Check if prover is in left state (4) + statusData, err := p.rdfMultiprover.Get( + GLOBAL_RDF_SCHEMA, + "allocation:ProverAllocation", + "Status", + tree, + ) + if err == nil && len(statusData) > 0 { + status := statusData[0] + if status != 4 { + // Prover is in some other state - cannot join + return false, errors.Wrap( + errors.New("prover already exists in non-left state"), + "verify", + ) + } + } + } + } + // If we get here, either prover doesn't exist or is in left state - both are // valid diff --git a/node/execution/intrinsics/global/global_prover_reject.go b/node/execution/intrinsics/global/global_prover_reject.go index e9f5e21..94ba17c 100644 --- a/node/execution/intrinsics/global/global_prover_reject.go +++ b/node/execution/intrinsics/global/global_prover_reject.go @@ -475,8 +475,8 @@ func (p *ProverReject) Verify(frameNumber uint64) (bool, error) { } joinFrame := binary.BigEndian.Uint64(joinFrameBytes) - // Special case: if join was before frame 252840, can reject any time - if joinFrame >= 252840 { + // Special case: if join was before frame 255840, can reject any time + if joinFrame >= 255840 { // Otherwise same timing constraints as confirm framesSinceJoin := frameNumber - joinFrame if framesSinceJoin > 720 { diff --git a/node/execution/intrinsics/global/global_prover_reject_test.go b/node/execution/intrinsics/global/global_prover_reject_test.go index 5959dcb..dd2c053 100644 --- a/node/execution/intrinsics/global/global_prover_reject_test.go +++ b/node/execution/intrinsics/global/global_prover_reject_test.go @@ -84,7 +84,7 @@ func TestProverReject_Verify(t *testing.T) { // Test data filter := []byte("testfilter") - joinFrame := uint64(252840) + joinFrame := uint64(255840) rejectFrame := joinFrame + 100 // Before confirmation window address := make([]byte, 32) for i := range address { @@ -321,7 +321,7 @@ func TestProverReject_Verify(t *testing.T) { // Test data filter := []byte("testfilter") - joinFrame := uint64(252840) + joinFrame := uint64(255840) rejectFrame := joinFrame + 100 address := make([]byte, 32) for i := range address { diff --git a/node/execution/intrinsics/token/token_intrinsic_transaction.go b/node/execution/intrinsics/token/token_intrinsic_transaction.go index 2835566..3a8aa89 100644 --- a/node/execution/intrinsics/token/token_intrinsic_transaction.go +++ b/node/execution/intrinsics/token/token_intrinsic_transaction.go @@ -23,7 +23,7 @@ import ( ) const FRAME_2_1_CUTOVER = 244200 -const FRAME_2_1_EXTENDED_ENROLL_END = 252840 +const FRAME_2_1_EXTENDED_ENROLL_END = 255840 const FRAME_2_1_EXTENDED_ENROLL_CONFIRM_END = FRAME_2_1_EXTENDED_ENROLL_END + 360 // used to skip 
frame-based checks, for tests diff --git a/node/store/pebble.go b/node/store/pebble.go index a67e510..608d799 100644 --- a/node/store/pebble.go +++ b/node/store/pebble.go @@ -26,6 +26,7 @@ var pebbleMigrations = []func(*pebble.Batch) error{ migration_2_1_0_5, migration_2_1_0_8, migration_2_1_0_81, + migration_2_1_0_10, } func NewPebbleDB( @@ -457,3 +458,9 @@ func migration_2_1_0_81(b *pebble.Batch) error { // nodes are consistent return nil } + +func migration_2_1_0_10(b *pebble.Batch) error { + // these migration entries exist solely to advance migration number so all + // nodes are consistent + return nil +}
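
Reviewer note on node/consensus/global/app_shard_cache.go: getAppShardInfoForShard takes the read lock for the common hit path and, on a miss, builds the entry and inserts it under the write lock, while rebuildAppShardCache swaps in a whole new map at a rank boundary. The standalone sketch below shows that read-through shape with a string-keyed map and sync.RWMutex; the entry type and builder functions are hypothetical stand-ins, not the engine's real types.

package main

import (
	"fmt"
	"sync"
)

// entry is a hypothetical stand-in for *appShardCacheEntry.
type entry struct{ value string }

// readThroughCache mirrors the appShardCache fields on GlobalConsensusEngine:
// a map guarded by a sync.RWMutex plus the rank the map was last built for.
type readThroughCache struct {
	mu    sync.RWMutex
	items map[string]*entry
	rank  uint64
}

// rebuild swaps in a freshly built map, as rebuildAppShardCache does at a
// rank boundary.
func (c *readThroughCache) rebuild(rank uint64, build func() map[string]*entry) {
	fresh := build()
	c.mu.Lock()
	c.items = fresh
	c.rank = rank
	c.mu.Unlock()
}

// get is the read-through path: read lock for the common case, then build and
// insert under the write lock on a miss.
func (c *readThroughCache) get(key string, build func(string) *entry) *entry {
	c.mu.RLock()
	e := c.items[key]
	c.mu.RUnlock()
	if e != nil {
		return e
	}
	e = build(key)
	if e == nil {
		return nil
	}
	c.mu.Lock()
	c.items[key] = e
	c.mu.Unlock()
	return e
}

func main() {
	c := &readThroughCache{items: map[string]*entry{}}
	c.rebuild(1, func() map[string]*entry {
		return map[string]*entry{"shard-a": {value: "prebuilt"}}
	})
	// Hit: the builder is not invoked.
	fmt.Println(c.get("shard-a", func(string) *entry { return &entry{value: "never used"} }).value)
	// Miss: built once, then cached for subsequent reads.
	fmt.Println(c.get("shard-b", func(k string) *entry { return &entry{value: "built on miss for " + k} }).value)
}

As in the diff, two goroutines that miss at the same time may both build the same entry; that is benign as long as the built value is deterministic for the current rank.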
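
Reviewer note on appShardCacheKey: the key is L1, then L2, then each path element encoded as a fixed-width 4-byte big-endian value, converted to a string so it can index a Go map. Assuming L1 and L2 are fixed length (3 and 32 bytes, which is what toTriesShardKey checks), the fixed-width path encoding keeps shards that differ only in path from colliding. A standalone sketch with a hypothetical stand-in for store.ShardInfo:

package main

import (
	"encoding/binary"
	"fmt"
)

// shardInfo is a hypothetical stand-in for store.ShardInfo, reduced to the
// three fields the cache key depends on.
type shardInfo struct {
	L1   []byte // 3 bytes in practice
	L2   []byte // 32 bytes in practice
	Path []uint32
}

// cacheKey mirrors appShardCacheKey: L1 || L2 || 4-byte big-endian path
// elements, returned as a string so it can be used as a map key.
func cacheKey(s shardInfo) string {
	buf := make([]byte, 0, len(s.L1)+len(s.L2)+4*len(s.Path))
	buf = append(buf, s.L1...)
	buf = append(buf, s.L2...)
	tmp := make([]byte, 4)
	for _, p := range s.Path {
		binary.BigEndian.PutUint32(tmp, p)
		buf = append(buf, tmp...)
	}
	return string(buf)
}

func main() {
	l1 := []byte{0x01, 0x02, 0x03}
	l2 := make([]byte, 32)
	a := shardInfo{L1: l1, L2: l2, Path: []uint32{1, 2}}
	b := shardInfo{L1: l1, L2: l2, Path: []uint32{258}} // 258 = 0x0102
	// Fixed-width encoding keeps these distinct; a minimal variable-width
	// big-endian encoding would serialize both paths as 0x01 0x02.
	fmt.Println(cacheKey(a) == cacheKey(b)) // false
}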
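
Reviewer note on the LivenessProvider change: Collect now receives the rank in addition to the frame number, and the call sites in AppConsensusEngine.OnRankChange and GlobalLeaderProvider.ProveNextState thread it through, which is what lets GlobalLivenessProvider.Collect rebuild the app shard cache for the current rank. The sketch below uses a hypothetical, non-generic mirror of just this method, not the repo's actual generic interface:

package main

import (
	"context"
	"fmt"
)

// collector is a hypothetical, non-generic mirror of the changed
// LivenessProvider.Collect method; the real interface is generic and also
// declares SendLiveness.
type collector interface {
	Collect(ctx context.Context, frameNumber uint64, rank uint64) ([]string, error)
}

// stubProvider mirrors the shape of the updated providers: the rank is now
// available for rank-scoped setup before commitments are gathered.
type stubProvider struct{}

func (stubProvider) Collect(
	ctx context.Context,
	frameNumber uint64,
	rank uint64,
) ([]string, error) {
	// Rank-scoped work would go here (the global provider rebuilds its app
	// shard cache for this rank before collecting).
	return []string{fmt.Sprintf("commitments for frame %d at rank %d", frameNumber, rank)}, nil
}

func main() {
	var p collector = stubProvider{}
	// Call sites now pass the current rank through alongside the frame number.
	out, err := p.Collect(context.Background(), 1000, 7)
	if err != nil {
		panic(err)
	}
	fmt.Println(out[0])
}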