diff --git a/Cargo.lock b/Cargo.lock index f886603..c7a415e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -307,7 +307,7 @@ checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" [[package]] name = "bls48581" -version = "0.1.0" +version = "2.1.0" dependencies = [ "criterion 0.4.0", "hex 0.4.3", diff --git a/config/version.go b/config/version.go index c2e15f3..fc36a50 100644 --- a/config/version.go +++ b/config/version.go @@ -43,7 +43,7 @@ func FormatVersion(version []byte) string { } func GetPatchNumber() byte { - return 0x10 + return 0x11 } func GetRCNumber() byte { diff --git a/hypergraph/go.mod b/hypergraph/go.mod index 42815be..d913c92 100644 --- a/hypergraph/go.mod +++ b/hypergraph/go.mod @@ -24,7 +24,6 @@ replace source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub => ../go- require ( github.com/prometheus/client_golang v1.22.0 - google.golang.org/grpc v1.72.0 source.quilibrium.com/quilibrium/monorepo/protobufs v0.0.0-00010101000000-000000000000 source.quilibrium.com/quilibrium/monorepo/types v0.0.0-00010101000000-000000000000 source.quilibrium.com/quilibrium/monorepo/utils v0.0.0-00010101000000-000000000000 @@ -70,7 +69,7 @@ require ( golang.org/x/text v0.26.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect - + google.golang.org/grpc v1.72.0 // indirect google.golang.org/protobuf v1.36.6 // indirect lukechampine.com/blake3 v1.4.1 // indirect ) diff --git a/hypergraph/hypergraph.go b/hypergraph/hypergraph.go index 7e64dd3..1973ec2 100644 --- a/hypergraph/hypergraph.go +++ b/hypergraph/hypergraph.go @@ -762,3 +762,105 @@ func boolToString(b bool) string { } return "false" } + +// DeleteVertexAdd performs a hard delete of a vertex from the VertexAdds set. +// Unlike RemoveVertex (which adds to VertexRemoves for CRDT semantics), this +// actually removes the entry from VertexAdds and deletes the associated vertex +// data. This is used for pruning stale/orphaned data that should not be synced. +// +// The caller must provide a transaction for atomic deletion of both the tree +// entry and the vertex data. +func (hg *HypergraphCRDT) DeleteVertexAdd( + txn tries.TreeBackingStoreTransaction, + shardKey tries.ShardKey, + vertexID [64]byte, +) error { + hg.mu.Lock() + defer hg.mu.Unlock() + + set := hg.getVertexAddsSet(shardKey) + tree := set.GetTree() + + // Delete from the tree (removes tree node and path index) + if err := tree.Delete(txn, vertexID[:]); err != nil { + return errors.Wrap(err, "delete vertex add: tree delete") + } + + // Delete the vertex data + if err := tree.Store.DeleteVertexTree(txn, vertexID[:]); err != nil { + // Log but don't fail - vertex data may already be gone + hg.logger.Debug( + "delete vertex add: vertex data not found", + zap.String("vertex_id", string(vertexID[:])), + zap.Error(err), + ) + } + + return nil +} + +// DeleteVertexRemove performs a hard delete of a vertex from the VertexRemoves +// set. This removes the entry from VertexRemoves, effectively "un-removing" the +// vertex if it still exists in VertexAdds. This is used for pruning stale data. 
+func (hg *HypergraphCRDT) DeleteVertexRemove( + txn tries.TreeBackingStoreTransaction, + shardKey tries.ShardKey, + vertexID [64]byte, +) error { + hg.mu.Lock() + defer hg.mu.Unlock() + + set := hg.getVertexRemovesSet(shardKey) + tree := set.GetTree() + + // Delete from the tree + if err := tree.Delete(txn, vertexID[:]); err != nil { + return errors.Wrap(err, "delete vertex remove: tree delete") + } + + return nil +} + +// DeleteHyperedgeAdd performs a hard delete of a hyperedge from the +// HyperedgeAdds set. Unlike RemoveHyperedge (which adds to HyperedgeRemoves for +// CRDT semantics), this actually removes the entry from HyperedgeAdds. This is +// used for pruning stale/orphaned data. +func (hg *HypergraphCRDT) DeleteHyperedgeAdd( + txn tries.TreeBackingStoreTransaction, + shardKey tries.ShardKey, + hyperedgeID [64]byte, +) error { + hg.mu.Lock() + defer hg.mu.Unlock() + + set := hg.getHyperedgeAddsSet(shardKey) + tree := set.GetTree() + + // Delete from the tree + if err := tree.Delete(txn, hyperedgeID[:]); err != nil { + return errors.Wrap(err, "delete hyperedge add: tree delete") + } + + return nil +} + +// DeleteHyperedgeRemove performs a hard delete of a hyperedge from the +// HyperedgeRemoves set. This is used for pruning stale data. +func (hg *HypergraphCRDT) DeleteHyperedgeRemove( + txn tries.TreeBackingStoreTransaction, + shardKey tries.ShardKey, + hyperedgeID [64]byte, +) error { + hg.mu.Lock() + defer hg.mu.Unlock() + + set := hg.getHyperedgeRemovesSet(shardKey) + tree := set.GetTree() + + // Delete from the tree + if err := tree.Delete(txn, hyperedgeID[:]); err != nil { + return errors.Wrap(err, "delete hyperedge remove: tree delete") + } + + return nil +} diff --git a/hypergraph/snapshot_manager.go b/hypergraph/snapshot_manager.go index 3ecd191..90a2f6e 100644 --- a/hypergraph/snapshot_manager.go +++ b/hypergraph/snapshot_manager.go @@ -175,11 +175,17 @@ func (m *snapshotManager) publish(root []byte) { m.mu.Lock() defer m.mu.Unlock() + // Remove all handles from the map so new syncs get new handles. + // Handles with active refs will be released when their last user calls release(). + // Handles with no active refs (only the initial ref from creation) are released now. for key, handle := range m.handles { + delete(m.handles, key) if handle != nil { + // releaseRef decrements the ref count. If this was the last ref + // (i.e., no active sync sessions), the underlying DB is released. + // If there are active sync sessions, they will release it when done. handle.releaseRef(m.logger) } - delete(m.handles, key) } m.root = nil @@ -221,6 +227,11 @@ func (m *snapshotManager) acquire( } handle := newSnapshotHandle(key, storeSnapshot, release, m.root) + // Acquire a ref for the caller. The handle is created with refs=1 (the owner ref + // held by the snapshot manager), and this adds another ref for the sync session. + // This ensures publish() can release the owner ref without closing the DB while + // a sync is still using it. 
+ handle.acquire() m.handles[key] = handle return handle } diff --git a/hypergraph/sync.go b/hypergraph/sync.go index 4dec1cf..3f659a4 100644 --- a/hypergraph/sync.go +++ b/hypergraph/sync.go @@ -451,7 +451,7 @@ func getChildSegments( prefix, ) if err != nil && !strings.Contains(err.Error(), "item not found") { - panic(err) + return nodes, 0 } if isPrefix(prefix, path) { @@ -1345,7 +1345,7 @@ func getNodeAtPath( slices.Concat(n.FullPrefix, []int{int(childIndex)}), ) if err != nil && !strings.Contains(err.Error(), "item not found") { - logger.Panic("failed to get node by path", zap.Error(err)) + return nil } if child == nil { @@ -1425,7 +1425,7 @@ func getBranchInfoFromTree( slices.Concat(branch.FullPrefix, []int{i}), ) if err != nil && !strings.Contains(err.Error(), "item not found") { - logger.Panic("failed to get node by path", zap.Error(err)) + return nil, err } } @@ -1495,7 +1495,7 @@ func ensureCommittedNode( path, ) if err != nil && !strings.Contains(err.Error(), "item not found") { - logger.Panic("failed to reload node by path", zap.Error(err)) + return nil } if reloaded != nil { return reloaded diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index 0c400ab..923c9da 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -155,6 +155,10 @@ type AppConsensusEngine struct { globalProverRootSynced atomic.Bool globalProverSyncInProgress atomic.Bool + // Genesis initialization + genesisInitialized atomic.Bool + genesisInitChan chan *protobufs.GlobalFrame + // Message queues consensusMessageQueue chan *pb.Message proverMessageQueue chan *pb.Message @@ -294,6 +298,7 @@ func NewAppConsensusEngine( peerAuthCache: make(map[string]time.Time), peerInfoDigestCache: make(map[string]struct{}), keyRegistryDigestCache: make(map[string]struct{}), + genesisInitChan: make(chan *protobufs.GlobalFrame, 1), } engine.frameChainChecker = NewAppFrameChainChecker(clockStore, logger, appAddress) @@ -504,7 +509,10 @@ func NewAppConsensusEngine( *protobufs.AppShardFrame, *protobufs.ProposalVote, ] - initializeCertifiedGenesis := func() { + + // Check if we need to await network data for genesis initialization + needsNetworkGenesis := false + initializeCertifiedGenesis := func(markInitialized bool) { frame, qc := engine.initializeGenesis() state = &models.CertifiedState[*protobufs.AppShardFrame]{ State: &models.State[*protobufs.AppShardFrame]{ @@ -515,17 +523,72 @@ func NewAppConsensusEngine( CertifyingQuorumCertificate: qc, } pending = nil + if markInitialized { + engine.genesisInitialized.Store(true) + } + } + + initializeCertifiedGenesisFromNetwork := func( + difficulty uint32, + shardInfo []*protobufs.AppShardInfo, + ) { + // Delete the temporary genesis frame first + if err := engine.clockStore.DeleteShardClockFrameRange( + engine.appAddress, 0, 1, + ); err != nil { + logger.Debug( + "could not delete temporary genesis frame", + zap.Error(err), + ) + } + + frame, qc := engine.initializeGenesisWithParams(difficulty, shardInfo) + state = &models.CertifiedState[*protobufs.AppShardFrame]{ + State: &models.State[*protobufs.AppShardFrame]{ + Rank: 0, + Identifier: frame.Identity(), + State: &frame, + }, + CertifyingQuorumCertificate: qc, + } + pending = nil + engine.genesisInitialized.Store(true) + + logger.Info( + "initialized genesis with network data", + zap.Uint32("difficulty", difficulty), + zap.Int("shard_info_count", len(shardInfo)), + ) } if err != nil { - initializeCertifiedGenesis() + // No consensus 
state exists - check if we have a genesis frame already + _, _, genesisErr := engine.clockStore.GetShardClockFrame( + engine.appAddress, + 0, + false, + ) + if genesisErr != nil && errors.Is(genesisErr, store.ErrNotFound) { + // No genesis exists - we need to await network data + needsNetworkGenesis = true + logger.Warn( + "app genesis missing - will await network data", + zap.String("shard_address", hex.EncodeToString(appAddress)), + ) + // Initialize with default values for now + // This will be re-done after receiving network data + // Pass false to NOT mark as initialized - we're waiting for network data + initializeCertifiedGenesis(false) + } else { + initializeCertifiedGenesis(true) + } } else { qc, err := engine.clockStore.GetQuorumCertificate( engine.appAddress, latest.FinalizedRank, ) if err != nil || qc.GetFrameNumber() == 0 { - initializeCertifiedGenesis() + initializeCertifiedGenesis(true) } else { frame, _, err := engine.clockStore.GetShardClockFrame( engine.appAddress, @@ -535,8 +598,10 @@ func NewAppConsensusEngine( if err != nil { panic(err) } - parentFrame, err := engine.clockStore.GetGlobalClockFrame( - qc.GetFrameNumber() - 1, + parentFrame, _, err := engine.clockStore.GetShardClockFrame( + engine.appAddress, + qc.GetFrameNumber()-1, + false, ) if err != nil { panic(err) @@ -656,6 +721,48 @@ func NewAppConsensusEngine( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, ) { + // If we need network genesis, await it before starting consensus + if needsNetworkGenesis { + engine.logger.Info( + "awaiting network data for genesis initialization", + zap.String("shard_address", engine.appAddressHex), + ) + + // Wait for a global frame from pubsub + globalFrame, err := engine.awaitFirstGlobalFrame(ctx) + if err != nil { + engine.logger.Error( + "failed to await global frame for genesis", + zap.Error(err), + ) + ctx.Throw(err) + return + } + + // Fetch shard info from bootstrap peers + shardInfo, err := engine.fetchShardInfoFromBootstrap(ctx) + if err != nil { + engine.logger.Warn( + "failed to fetch shard info from bootstrap peers", + zap.Error(err), + ) + // Continue anyway - we at least have the global frame + } + + engine.logger.Info( + "received network genesis data", + zap.Uint64("global_frame_number", globalFrame.Header.FrameNumber), + zap.Uint32("difficulty", globalFrame.Header.Difficulty), + zap.Int("shard_info_count", len(shardInfo)), + ) + + // Re-initialize genesis with the correct network data + initializeCertifiedGenesisFromNetwork( + globalFrame.Header.Difficulty, + shardInfo, + ) + } + if err := engine.waitForProverRegistration(ctx); err != nil { engine.logger.Error("prover unavailable", zap.Error(err)) ctx.Throw(err) @@ -1401,6 +1508,16 @@ func (e *AppConsensusEngine) updateMetricsLoop( func (e *AppConsensusEngine) initializeGenesis() ( *protobufs.AppShardFrame, *protobufs.QuorumCertificate, +) { + return e.initializeGenesisWithParams(e.config.Engine.Difficulty, nil) +} + +func (e *AppConsensusEngine) initializeGenesisWithParams( + difficulty uint32, + shardInfo []*protobufs.AppShardInfo, +) ( + *protobufs.AppShardFrame, + *protobufs.QuorumCertificate, ) { // Initialize state roots for hypergraph stateRoots := make([][]byte, 4) @@ -1408,9 +1525,14 @@ func (e *AppConsensusEngine) initializeGenesis() ( stateRoots[i] = make([]byte, 64) } + // Use provided difficulty or fall back to config + if difficulty == 0 { + difficulty = e.config.Engine.Difficulty + } + genesisHeader, err := e.frameProver.ProveFrameHeaderGenesis( e.appAddress, - 80000, + difficulty, 
make([]byte, 516), 100, ) @@ -1560,6 +1682,127 @@ func (e *AppConsensusEngine) ensureAppGenesis() error { return nil } +// fetchShardInfoFromBootstrap connects to bootstrap peers and fetches shard info +// for this app's address using the GetAppShards RPC. +func (e *AppConsensusEngine) fetchShardInfoFromBootstrap( + ctx context.Context, +) ([]*protobufs.AppShardInfo, error) { + bootstrapPeers := e.config.P2P.BootstrapPeers + if len(bootstrapPeers) == 0 { + return nil, errors.New("no bootstrap peers configured") + } + + for _, peerAddr := range bootstrapPeers { + shardInfo, err := e.tryFetchShardInfoFromPeer(ctx, peerAddr) + if err != nil { + e.logger.Debug( + "failed to fetch shard info from peer", + zap.String("peer", peerAddr), + zap.Error(err), + ) + continue + } + if len(shardInfo) > 0 { + e.logger.Info( + "fetched shard info from bootstrap peer", + zap.String("peer", peerAddr), + zap.Int("shard_count", len(shardInfo)), + ) + return shardInfo, nil + } + } + + return nil, errors.New("failed to fetch shard info from any bootstrap peer") +} + +func (e *AppConsensusEngine) tryFetchShardInfoFromPeer( + ctx context.Context, + peerAddr string, +) ([]*protobufs.AppShardInfo, error) { + // Parse multiaddr to extract peer ID and address + ma, err := multiaddr.StringCast(peerAddr) + if err != nil { + return nil, errors.Wrap(err, "parse multiaddr") + } + + // Extract peer ID from the multiaddr + peerIDStr, err := ma.ValueForProtocol(multiaddr.P_P2P) + if err != nil { + return nil, errors.Wrap(err, "extract peer id") + } + + peerID, err := peer.Decode(peerIDStr) + if err != nil { + return nil, errors.Wrap(err, "decode peer id") + } + + // Create gRPC connection to the peer + mga, err := mn.ToNetAddr(ma) + if err != nil { + return nil, errors.Wrap(err, "convert multiaddr") + } + + creds, err := p2p.NewPeerAuthenticator( + e.logger, + e.config.P2P, + nil, + nil, + nil, + nil, + [][]byte{[]byte(peerID)}, + map[string]channel.AllowedPeerPolicyType{}, + map[string]channel.AllowedPeerPolicyType{}, + ).CreateClientTLSCredentials([]byte(peerID)) + if err != nil { + return nil, errors.Wrap(err, "create credentials") + } + + conn, err := grpc.NewClient( + mga.String(), + grpc.WithTransportCredentials(creds), + ) + if err != nil { + return nil, errors.Wrap(err, "dial peer") + } + defer conn.Close() + + client := protobufs.NewGlobalServiceClient(conn) + + reqCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + resp, err := client.GetAppShards(reqCtx, &protobufs.GetAppShardsRequest{ + ShardKey: e.appAddress, + Prefix: []uint32{}, + }) + if err != nil { + return nil, errors.Wrap(err, "get app shards") + } + + return resp.GetInfo(), nil +} + +// awaitFirstGlobalFrame waits for a global frame to arrive via pubsub and +// returns it. This is used during genesis initialization to get the correct +// difficulty from the network. 
+func (e *AppConsensusEngine) awaitFirstGlobalFrame( + ctx context.Context, +) (*protobufs.GlobalFrame, error) { + e.logger.Info("awaiting first global frame from network for genesis initialization") + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case frame := <-e.genesisInitChan: + e.logger.Info( + "received global frame for genesis initialization", + zap.Uint64("frame_number", frame.Header.FrameNumber), + zap.Uint32("difficulty", frame.Header.Difficulty), + ) + return frame, nil + } +} + func (e *AppConsensusEngine) waitForProverRegistration( ctx lifecycle.SignalerContext, ) error { diff --git a/node/consensus/app/consensus_dynamic_committee.go b/node/consensus/app/consensus_dynamic_committee.go index 0053230..feee074 100644 --- a/node/consensus/app/consensus_dynamic_committee.go +++ b/node/consensus/app/consensus_dynamic_committee.go @@ -123,6 +123,10 @@ func (e *AppConsensusEngine) LeaderForRank(rank uint64) ( return "", errors.Wrap(err, "leader for rank") } + if e.config.P2P.Network == 0 && len(proverSet) < 3 { + return models.Identity(make([]byte, 32)), nil + } + // Handle condition where prover cannot be yet known due to lack of sync: if len(proverSet) == 0 { return models.Identity(make([]byte, 32)), nil diff --git a/node/consensus/app/message_processors.go b/node/consensus/app/message_processors.go index 62d0310..398e955 100644 --- a/node/consensus/app/message_processors.go +++ b/node/consensus/app/message_processors.go @@ -1368,6 +1368,20 @@ func (e *AppConsensusEngine) handleGlobalFrameMessage(message *pb.Message) { return } + // If genesis hasn't been initialized yet, send this frame to the + // genesis init channel (non-blocking) + if !e.genesisInitialized.Load() { + select { + case e.genesisInitChan <- frame: + e.logger.Debug( + "sent global frame to genesis init channel", + zap.Uint64("frame_number", frame.Header.FrameNumber), + ) + default: + // Channel already has a frame, skip + } + } + if err := e.globalTimeReel.Insert(frame); err != nil { // Success metric recorded at the end of processing globalFramesProcessedTotal.WithLabelValues("error").Inc() diff --git a/node/consensus/global/coverage_events.go b/node/consensus/global/coverage_events.go index c0bd224..3d565df 100644 --- a/node/consensus/global/coverage_events.go +++ b/node/consensus/global/coverage_events.go @@ -116,16 +116,21 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error { remaining = int(haltGraceFrames - streak.Count) } if remaining <= 0 && e.config.P2P.Network == 0 { - e.logger.Error( - "CRITICAL: Shard has insufficient coverage - triggering network halt", - zap.String("shard_address", hex.EncodeToString([]byte(shardAddress))), - zap.Uint64("prover_count", proverCount), - zap.Uint64("halt_threshold", haltThreshold), - ) + // Instead of halting, enter prover-only mode at the global level + // This allows prover messages to continue while blocking other messages + if !e.proverOnlyMode.Load() { + e.logger.Warn( + "CRITICAL: Shard has insufficient coverage - entering prover-only mode (non-prover messages will be dropped)", + zap.String("shard_address", hex.EncodeToString([]byte(shardAddress))), + zap.Uint64("prover_count", proverCount), + zap.Uint64("halt_threshold", haltThreshold), + ) + e.proverOnlyMode.Store(true) + } - // Emit halt event + // Emit warning event (not halt) so monitoring knows we're in degraded state e.emitCoverageEvent( - typesconsensus.ControlEventCoverageHalt, + typesconsensus.ControlEventCoverageWarn, &typesconsensus.CoverageEventData{ 
ShardAddress: []byte(shardAddress), ProverCount: int(proverCount), @@ -133,7 +138,7 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error { AttestedStorage: attestedStorage, TreeMetadata: coverage.TreeMetadata, Message: fmt.Sprintf( - "Shard has only %d provers, network halt required", + "Shard has only %d provers, prover-only mode active (non-prover messages dropped)", proverCount, ), }, @@ -170,6 +175,16 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error { // Not in critical state — clear any ongoing streak e.clearStreak(shardAddress) + // If we were in prover-only mode and coverage is restored, exit prover-only mode + if e.proverOnlyMode.Load() { + e.logger.Info( + "Coverage restored - exiting prover-only mode", + zap.String("shard_address", hex.EncodeToString([]byte(shardAddress))), + zap.Uint64("prover_count", proverCount), + ) + e.proverOnlyMode.Store(false) + } + // Check for low coverage if proverCount < minProvers { e.handleLowCoverage([]byte(shardAddress), coverage, minProvers) diff --git a/node/consensus/global/genesis.go b/node/consensus/global/genesis.go index d25d10c..cf1f103 100644 --- a/node/consensus/global/genesis.go +++ b/node/consensus/global/genesis.go @@ -531,6 +531,85 @@ func (e *GlobalConsensusEngine) createStubGenesis() *protobufs.GlobalFrame { commitments[i] = &tries.VectorCommitmentTree{} } + e.establishTestnetGenesisProvers() + + roots, err := e.hypergraph.Commit(0) + if err != nil { + e.logger.Error("could not commit", zap.Error(err)) + return nil + } + + // Parse and set initial commitments from JSON + for shardKey, commits := range roots { + for i := 0; i < 3; i++ { + commitments[shardKey.L1[i]].Insert( + shardKey.L2[:], + commits[0], + nil, + big.NewInt(int64(len(commits[0]))), + ) + commitments[shardKey.L1[i]].Commit(e.inclusionProver, false) + } + } + + proverRoots := roots[tries.ShardKey{ + L1: [3]byte{}, + L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS, + }] + + proverRoot := proverRoots[0] + + genesisHeader.ProverTreeCommitment = proverRoot + + for i := 0; i < 256; i++ { + genesisHeader.GlobalCommitments[i] = commitments[i].Commit( + e.inclusionProver, + false, + ) + } + + // Establish an empty signature payload – this avoids panics on broken + // header readers + genesisHeader.PublicKeySignatureBls48581 = + &protobufs.BLS48581AggregateSignature{ + Signature: make([]byte, 0), + PublicKey: &protobufs.BLS48581G2PublicKey{ + KeyValue: make([]byte, 0), + }, + Bitmask: make([]byte, 0), + } + + genesisFrame := &protobufs.GlobalFrame{ + Header: genesisHeader, + } + + // Compute frame ID and store the full frame + frameIDBI, _ := poseidon.HashBytes(genesisHeader.Output) + frameID := frameIDBI.FillBytes(make([]byte, 32)) + e.frameStoreMu.Lock() + e.frameStore[string(frameID)] = genesisFrame + e.frameStoreMu.Unlock() + + // Add to time reel + txn, err := e.clockStore.NewTransaction(false) + if err != nil { + panic(err) + } + if err := e.clockStore.PutGlobalClockFrame(genesisFrame, txn); err != nil { + txn.Abort() + e.logger.Error("could not add frame", zap.Error(err)) + return nil + } + if err := txn.Commit(); err != nil { + txn.Abort() + e.logger.Error("could not add frame", zap.Error(err)) + return nil + } + + return genesisFrame +} + +func (e *GlobalConsensusEngine) establishTestnetGenesisProvers() error { var proverPubKeys [][]byte var err error if e.config.P2P.Network != 99 && e.config.Engine != nil && @@ -647,83 +726,9 @@ func (e *GlobalConsensusEngine) createStubGenesis() *protobufs.GlobalFrame { err = 
state.Commit() if err != nil { e.logger.Error("failed to commit", zap.Error(err)) - return nil } - roots, err := e.hypergraph.Commit(0) - if err != nil { - e.logger.Error("could not commit", zap.Error(err)) - return nil - } - - // Parse and set initial commitments from JSON - for shardKey, commits := range roots { - for i := 0; i < 3; i++ { - commitments[shardKey.L1[i]].Insert( - shardKey.L2[:], - commits[0], - nil, - big.NewInt(int64(len(commits[0]))), - ) - commitments[shardKey.L1[i]].Commit(e.inclusionProver, false) - } - } - - proverRoots := roots[tries.ShardKey{ - L1: [3]byte{}, - L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS, - }] - - proverRoot := proverRoots[0] - - genesisHeader.ProverTreeCommitment = proverRoot - - for i := 0; i < 256; i++ { - genesisHeader.GlobalCommitments[i] = commitments[i].Commit( - e.inclusionProver, - false, - ) - } - - // Establish an empty signature payload – this avoids panics on broken - // header readers - genesisHeader.PublicKeySignatureBls48581 = - &protobufs.BLS48581AggregateSignature{ - Signature: make([]byte, 0), - PublicKey: &protobufs.BLS48581G2PublicKey{ - KeyValue: make([]byte, 0), - }, - Bitmask: make([]byte, 0), - } - - genesisFrame := &protobufs.GlobalFrame{ - Header: genesisHeader, - } - - // Compute frame ID and store the full frame - frameIDBI, _ := poseidon.HashBytes(genesisHeader.Output) - frameID := frameIDBI.FillBytes(make([]byte, 32)) - e.frameStoreMu.Lock() - e.frameStore[string(frameID)] = genesisFrame - e.frameStoreMu.Unlock() - - // Add to time reel - txn, err := e.clockStore.NewTransaction(false) - if err != nil { - panic(err) - } - if err := e.clockStore.PutGlobalClockFrame(genesisFrame, txn); err != nil { - txn.Abort() - e.logger.Error("could not add frame", zap.Error(err)) - return nil - } - if err := txn.Commit(); err != nil { - txn.Abort() - e.logger.Error("could not add frame", zap.Error(err)) - return nil - } - - return genesisFrame + return nil } // InitializeGenesisState ensures the global genesis frame and QC exist using the diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index fbb0607..1040f84 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -37,6 +37,7 @@ import ( "source.quilibrium.com/quilibrium/monorepo/consensus/validator" "source.quilibrium.com/quilibrium/monorepo/consensus/verification" "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" + hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph" "source.quilibrium.com/quilibrium/monorepo/lifecycle" "source.quilibrium.com/quilibrium/monorepo/node/consensus/aggregator" "source.quilibrium.com/quilibrium/monorepo/node/consensus/provers" @@ -199,6 +200,7 @@ type GlobalConsensusEngine struct { appFrameStore map[string]*protobufs.AppShardFrame appFrameStoreMu sync.RWMutex lowCoverageStreak map[string]*coverageStreak + proverOnlyMode atomic.Bool peerInfoDigestCache map[string]struct{} peerInfoDigestCacheMu sync.Mutex keyRegistryDigestCache map[string]struct{} @@ -595,6 +597,42 @@ func NewGlobalConsensusEngine( if err != nil { establishGenesis() } else { + adds := engine.hypergraph.(*hgcrdt.HypergraphCRDT).GetVertexAddsSet( + tries.ShardKey{ + L1: [3]byte{}, + L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)), + }, + ) + + if lc, _ := adds.GetTree().GetMetadata(); lc == 0 { + if config.P2P.Network == 0 { + genesisData := engine.getMainnetGenesisJSON() + if genesisData == nil { + panic("no genesis data") + } + + state := 
hgstate.NewHypergraphState(engine.hypergraph) + + err = engine.establishMainnetGenesisProvers(state, genesisData) + if err != nil { + engine.logger.Error("failed to establish provers", zap.Error(err)) + panic(err) + } + + err = state.Commit() + if err != nil { + engine.logger.Error("failed to commit", zap.Error(err)) + panic(err) + } + } else { + engine.establishTestnetGenesisProvers() + } + + err := engine.proverRegistry.Refresh() + if err != nil { + panic(err) + } + } if latest.LatestTimeout != nil { logger.Info( "obtained latest consensus state", diff --git a/node/consensus/global/message_collector.go b/node/consensus/global/message_collector.go index 338e1d3..164fa61 100644 --- a/node/consensus/global/message_collector.go +++ b/node/consensus/global/message_collector.go @@ -234,6 +234,15 @@ func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) { return } + // In prover-only mode, filter out non-prover messages + if e.proverOnlyMode.Load() { + bundle.Requests = e.filterProverOnlyRequests(bundle.Requests) + if len(bundle.Requests) == 0 { + // All requests were filtered out + return + } + } + if len(bundle.Requests) > maxGlobalMessagesPerFrame { if e.logger != nil { e.logger.Debug( @@ -265,6 +274,49 @@ func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) { e.messageAggregator.Add(record) } +// filterProverOnlyRequests filters a list of message requests to only include +// prover-related messages. This is used when in prover-only mode due to +// insufficient coverage. +func (e *GlobalConsensusEngine) filterProverOnlyRequests( + requests []*protobufs.MessageRequest, +) []*protobufs.MessageRequest { + filtered := make([]*protobufs.MessageRequest, 0, len(requests)) + droppedCount := 0 + + for _, req := range requests { + if req == nil || req.GetRequest() == nil { + continue + } + + // Only allow prover-related message types + switch req.GetRequest().(type) { + case *protobufs.MessageRequest_Join, + *protobufs.MessageRequest_Leave, + *protobufs.MessageRequest_Pause, + *protobufs.MessageRequest_Resume, + *protobufs.MessageRequest_Confirm, + *protobufs.MessageRequest_Reject, + *protobufs.MessageRequest_Kick, + *protobufs.MessageRequest_Update: + // Prover messages are allowed + filtered = append(filtered, req) + default: + // All other messages are dropped in prover-only mode + droppedCount++ + } + } + + if droppedCount > 0 && e.logger != nil { + e.logger.Debug( + "dropped non-prover messages in prover-only mode", + zap.Int("dropped_count", droppedCount), + zap.Int("allowed_count", len(filtered)), + ) + } + + return filtered +} + func (e *GlobalConsensusEngine) logBundleRequestTypes( bundle *protobufs.MessageBundle, ) { diff --git a/node/consensus/provers/prover_registry.go b/node/consensus/provers/prover_registry.go index d5ffbc8..5d64e76 100644 --- a/node/consensus/provers/prover_registry.go +++ b/node/consensus/provers/prover_registry.go @@ -12,7 +12,6 @@ import ( "github.com/iden3/go-iden3-crypto/poseidon" "github.com/pkg/errors" "go.uber.org/zap" - hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph" "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global" hgstate "source.quilibrium.com/quilibrium/monorepo/node/execution/state/hypergraph" "source.quilibrium.com/quilibrium/monorepo/types/consensus" @@ -414,19 +413,30 @@ func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error { } cutoff := frameNumber - 760 - var pruned int + var prunedAllocations int + var prunedProvers int - set := 
r.hypergraph.(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(tries.ShardKey{ + shardKey := tries.ShardKey{ L1: [3]byte{0x00, 0x00, 0x00}, L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)), - }) + } - txn, err := set.GetTree().Store.NewTransaction(false) + txn, err := r.hypergraph.NewTransaction(false) if err != nil { return errors.Wrap(err, "prune orphan joins") } - for _, info := range r.proverCache { + // Track provers to remove from cache after pruning + proversToRemove := []string{} + + r.logger.Debug( + "starting prune orphan joins scan", + zap.Uint64("frame_number", frameNumber), + zap.Uint64("cutoff", cutoff), + zap.Int("prover_cache_size", len(r.proverCache)), + ) + + for addr, info := range r.proverCache { if info == nil || len(info.Allocations) == 0 { continue } @@ -435,7 +445,20 @@ func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error { var removedFilters map[string]struct{} for _, allocation := range info.Allocations { - if allocation.Status == consensus.ProverStatusJoining && + // Log each allocation being evaluated + r.logger.Debug( + "evaluating allocation for prune", + zap.String("prover_address", hex.EncodeToString(info.Address)), + zap.Int("status", int(allocation.Status)), + zap.Uint64("join_frame", allocation.JoinFrameNumber), + zap.Uint64("cutoff", cutoff), + zap.Bool("is_joining", allocation.Status == consensus.ProverStatusJoining), + zap.Bool("is_rejected", allocation.Status == consensus.ProverStatusRejected), + zap.Bool("is_old_enough", allocation.JoinFrameNumber < cutoff), + ) + + if (allocation.Status == consensus.ProverStatusJoining || + allocation.Status == consensus.ProverStatusRejected) && allocation.JoinFrameNumber < cutoff { if err := r.pruneAllocationVertex(txn, info, allocation); err != nil { txn.Abort() @@ -446,7 +469,7 @@ func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error { removedFilters = make(map[string]struct{}) } removedFilters[string(allocation.ConfirmationFilter)] = struct{}{} - pruned++ + prunedAllocations++ continue } @@ -456,17 +479,33 @@ func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error { if len(updated) != len(info.Allocations) { info.Allocations = updated r.cleanupFilterCache(info, removedFilters) + + // If no allocations remain, prune the prover record as well + if len(updated) == 0 { + if err := r.pruneProverRecord(txn, shardKey, info); err != nil { + txn.Abort() + return errors.Wrap(err, "prune orphan joins") + } + proversToRemove = append(proversToRemove, addr) + prunedProvers++ + } } } - if pruned > 0 { + // Remove pruned provers from cache + for _, addr := range proversToRemove { + delete(r.proverCache, addr) + } + + if prunedAllocations > 0 || prunedProvers > 0 { if err := txn.Commit(); err != nil { return errors.Wrap(err, "prune orphan joins") } r.logger.Info( "pruned orphan prover allocations", - zap.Int("allocations_pruned", pruned), + zap.Int("allocations_pruned", prunedAllocations), + zap.Int("provers_pruned", prunedProvers), zap.Uint64("frame_cutoff", cutoff), ) } else { @@ -510,33 +549,96 @@ func (r *ProverRegistry) pruneAllocationVertex( allocationHash.FillBytes(make([]byte, 32)), ) - _, err = r.hypergraph.GetVertex(vertexID) - if err != nil { - if errors.Cause(err) == hypergraph.ErrRemoved { - return nil - } + shardKey := tries.ShardKey{ + L1: [3]byte{0x00, 0x00, 0x00}, + L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)), + } + + // Use DeleteVertexAdd which properly handles locking and deletes both + // the tree entry and the vertex data atomically + if err := 
r.hypergraph.DeleteVertexAdd(txn, shardKey, vertexID); err != nil { r.logger.Debug( - "allocation vertex missing during prune", + "could not delete allocation vertex during prune", zap.String("address", hex.EncodeToString(info.Address)), zap.String( "filter", hex.EncodeToString(allocation.ConfirmationFilter), ), + zap.String("vertex_id", hex.EncodeToString(vertexID[:])), zap.Error(err), ) - return nil + // Don't return error - the vertex may already be deleted + } else { + r.logger.Debug( + "deleted allocation vertex during prune", + zap.String("address", hex.EncodeToString(info.Address)), + zap.String( + "filter", + hex.EncodeToString(allocation.ConfirmationFilter), + ), + zap.String("vertex_id", hex.EncodeToString(vertexID[:])), + ) } - set := r.hypergraph.(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(tries.ShardKey{ - L1: [3]byte{0x00, 0x00, 0x00}, - L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)), - }) + return nil +} - vtree := set.GetTree() - if err := vtree.Delete(txn, vertexID[:]); err != nil { - return errors.Wrap(err, "prune allocation remove vertex") +// pruneProverRecord removes a prover's vertex, hyperedge, and associated data +// when all of its allocations have been pruned. +func (r *ProverRegistry) pruneProverRecord( + txn tries.TreeBackingStoreTransaction, + shardKey tries.ShardKey, + info *consensus.ProverInfo, +) error { + if info == nil || len(info.Address) == 0 { + return errors.New("missing prover info") } + // Construct the prover vertex ID + var proverVertexID [64]byte + copy(proverVertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) + copy(proverVertexID[32:], info.Address) + + // Delete prover vertex using DeleteVertexAdd which properly handles locking + // and deletes both the tree entry and vertex data atomically + if err := r.hypergraph.DeleteVertexAdd(txn, shardKey, proverVertexID); err != nil { + r.logger.Debug( + "could not delete prover vertex during prune", + zap.String("address", hex.EncodeToString(info.Address)), + zap.String("vertex_id", hex.EncodeToString(proverVertexID[:])), + zap.Error(err), + ) + // Don't return error - the vertex may already be deleted + } else { + r.logger.Debug( + "deleted prover vertex during prune", + zap.String("address", hex.EncodeToString(info.Address)), + zap.String("vertex_id", hex.EncodeToString(proverVertexID[:])), + ) + } + + // Delete prover hyperedge using DeleteHyperedgeAdd + if err := r.hypergraph.DeleteHyperedgeAdd(txn, shardKey, proverVertexID); err != nil { + r.logger.Debug( + "could not delete prover hyperedge during prune", + zap.String("address", hex.EncodeToString(info.Address)), + zap.String("hyperedge_id", hex.EncodeToString(proverVertexID[:])), + zap.Error(err), + ) + // Don't return error - the hyperedge may already be deleted + } else { + r.logger.Debug( + "deleted prover hyperedge during prune", + zap.String("address", hex.EncodeToString(info.Address)), + zap.String("hyperedge_id", hex.EncodeToString(proverVertexID[:])), + ) + } + + r.logger.Debug( + "pruned prover record", + zap.String("address", hex.EncodeToString(info.Address)), + ) + return nil } diff --git a/node/consensus/provers/prover_registry_test.go b/node/consensus/provers/prover_registry_test.go index 9cb9e3c..8c0d5f7 100644 --- a/node/consensus/provers/prover_registry_test.go +++ b/node/consensus/provers/prover_registry_test.go @@ -2,6 +2,9 @@ package provers import ( "bytes" + "encoding/binary" + "encoding/hex" + "fmt" "math/big" "slices" "testing" @@ -11,10 +14,18 @@ import ( "github.com/stretchr/testify/mock" 
"github.com/stretchr/testify/require" "go.uber.org/zap" + "source.quilibrium.com/quilibrium/monorepo/bls48581" + "source.quilibrium.com/quilibrium/monorepo/config" + hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph" + "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global" + "source.quilibrium.com/quilibrium/monorepo/node/store" "source.quilibrium.com/quilibrium/monorepo/node/tests" "source.quilibrium.com/quilibrium/monorepo/types/consensus" + "source.quilibrium.com/quilibrium/monorepo/types/execution/intrinsics" "source.quilibrium.com/quilibrium/monorepo/types/mocks" + "source.quilibrium.com/quilibrium/monorepo/types/schema" "source.quilibrium.com/quilibrium/monorepo/types/tries" + "source.quilibrium.com/quilibrium/monorepo/verenc" ) type mockIterator struct { @@ -171,3 +182,1220 @@ func TestProverRegistryWithShards(t *testing.T) { assert.Equal(t, 0, count) }) } + +// TestPruneOrphanJoins_Comprehensive tests the pruning of orphan prover joins +// with a comprehensive scenario covering global provers, app shard provers, +// and mixed allocation states. +func TestPruneOrphanJoins_Comprehensive(t *testing.T) { + logger := zap.NewNop() + + // Create stores with in-memory pebble DB + pebbleDB := store.NewPebbleDB( + logger, + &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_orphan"}, + 0, + ) + defer pebbleDB.Close() + + // Create inclusion prover and verifiable encryptor + inclusionProver := bls48581.NewKZGInclusionProver(logger) + verifiableEncryptor := verenc.NewMPCitHVerifiableEncryptor(1) + + // Create hypergraph store and hypergraph + hypergraphStore := store.NewPebbleHypergraphStore( + &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_orphan"}, + pebbleDB, + logger, + verifiableEncryptor, + inclusionProver, + ) + hg, err := hypergraphStore.LoadHypergraph(&tests.Nopthenticator{}, 1) + require.NoError(t, err) + + // Create RDF multiprover for setting up test data + rdfMultiprover := schema.NewRDFMultiprover( + &schema.TurtleRDFParser{}, + inclusionProver, + ) + + // Current frame for testing - pruning will use cutoff = currentFrame - 760 + // For currentFrame=1000, cutoff = 240 + // Allocations with JoinFrameNumber < 240 will be pruned + const currentFrame = uint64(1000) + const oldJoinFrame = uint64(100) // 100 < 240, will be pruned + const recentJoinFrame = currentFrame - 10 // 990 > 240, will NOT be pruned + + type allocationSpec struct { + filter []byte + joinFrame uint64 + status byte // 0=Joining, 1=Active + } + + // Helper to create a prover with specific allocations + createProverWithAllocations := func( + publicKey []byte, + proverStatus byte, + allocations []allocationSpec, + ) ([]byte, error) { + proverAddressBI, err := poseidon.HashBytes(publicKey) + if err != nil { + return nil, err + } + proverAddress := proverAddressBI.FillBytes(make([]byte, 32)) + + hgCRDT := hg.(*hgcrdt.HypergraphCRDT) + txn, err := hgCRDT.NewTransaction(false) + if err != nil { + return nil, err + } + + // Create prover vertex + proverTree := &tries.VectorCommitmentTree{} + err = rdfMultiprover.Set( + global.GLOBAL_RDF_SCHEMA, + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "prover:Prover", + "PublicKey", + publicKey, + proverTree, + ) + if err != nil { + txn.Abort() + return nil, err + } + + err = rdfMultiprover.Set( + global.GLOBAL_RDF_SCHEMA, + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "prover:Prover", + "Status", + []byte{proverStatus}, + proverTree, + ) + if err != nil { + txn.Abort() + return nil, err + } + + availableStorageBytes := make([]byte, 8) 
+ err = rdfMultiprover.Set( + global.GLOBAL_RDF_SCHEMA, + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "prover:Prover", + "AvailableStorage", + availableStorageBytes, + proverTree, + ) + if err != nil { + txn.Abort() + return nil, err + } + + seniorityBytes := make([]byte, 8) + err = rdfMultiprover.Set( + global.GLOBAL_RDF_SCHEMA, + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "prover:Prover", + "Seniority", + seniorityBytes, + proverTree, + ) + if err != nil { + txn.Abort() + return nil, err + } + + // Add prover vertex to hypergraph + proverVertex := hgcrdt.NewVertex( + intrinsics.GLOBAL_INTRINSIC_ADDRESS, + [32]byte(proverAddress), + proverTree.Commit(inclusionProver, false), + big.NewInt(0), + ) + err = hg.AddVertex(txn, proverVertex) + if err != nil { + txn.Abort() + return nil, err + } + + // Save prover vertex data + var proverVertexID [64]byte + copy(proverVertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) + copy(proverVertexID[32:], proverAddress) + err = hg.SetVertexData(txn, proverVertexID, proverTree) + if err != nil { + txn.Abort() + return nil, err + } + + // Create hyperedge for prover + hyperedge := hgcrdt.NewHyperedge( + intrinsics.GLOBAL_INTRINSIC_ADDRESS, + [32]byte(proverAddress), + ) + + // Create allocation vertices for each allocation spec + for _, alloc := range allocations { + allocationAddressBI, err := poseidon.HashBytes( + slices.Concat([]byte("PROVER_ALLOCATION"), publicKey, alloc.filter), + ) + if err != nil { + txn.Abort() + return nil, err + } + allocationAddress := allocationAddressBI.FillBytes(make([]byte, 32)) + + allocationTree := &tries.VectorCommitmentTree{} + err = rdfMultiprover.Set( + global.GLOBAL_RDF_SCHEMA, + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "allocation:ProverAllocation", + "Prover", + proverAddress, + allocationTree, + ) + if err != nil { + txn.Abort() + return nil, err + } + + err = rdfMultiprover.Set( + global.GLOBAL_RDF_SCHEMA, + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "allocation:ProverAllocation", + "Status", + []byte{alloc.status}, + allocationTree, + ) + if err != nil { + txn.Abort() + return nil, err + } + + err = rdfMultiprover.Set( + global.GLOBAL_RDF_SCHEMA, + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "allocation:ProverAllocation", + "ConfirmationFilter", + alloc.filter, + allocationTree, + ) + if err != nil { + txn.Abort() + return nil, err + } + + frameNumberBytes := make([]byte, 8) + binary.BigEndian.PutUint64(frameNumberBytes, alloc.joinFrame) + err = rdfMultiprover.Set( + global.GLOBAL_RDF_SCHEMA, + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "allocation:ProverAllocation", + "JoinFrameNumber", + frameNumberBytes, + allocationTree, + ) + if err != nil { + txn.Abort() + return nil, err + } + + // Add allocation vertex + allocationVertex := hgcrdt.NewVertex( + intrinsics.GLOBAL_INTRINSIC_ADDRESS, + [32]byte(allocationAddress), + allocationTree.Commit(inclusionProver, false), + big.NewInt(0), + ) + err = hg.AddVertex(txn, allocationVertex) + if err != nil { + txn.Abort() + return nil, err + } + + // Save allocation vertex data + var allocationVertexID [64]byte + copy(allocationVertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) + copy(allocationVertexID[32:], allocationAddress) + err = hg.SetVertexData(txn, allocationVertexID, allocationTree) + if err != nil { + txn.Abort() + return nil, err + } + + // Add allocation to hyperedge + hyperedge.AddExtrinsic(allocationVertex) + } + + // Add hyperedge + err = hg.AddHyperedge(txn, hyperedge) + if err != nil { + txn.Abort() + return nil, err + } + + err = txn.Commit() + if err != 
nil { + return nil, err + } + + return proverAddress, nil + } + + // Helper to check if vertex exists + vertexExists := func(vertexID [64]byte) bool { + _, err := hg.GetVertex(vertexID) + return err == nil + } + + // Helper to check if vertex data exists + vertexDataExists := func(vertexID [64]byte) bool { + data, err := hg.GetVertexData(vertexID) + return err == nil && data != nil + } + + // Helper to check if hyperedge exists + hyperedgeExists := func(hyperedgeID [64]byte) bool { + _, err := hg.GetHyperedge(hyperedgeID) + return err == nil + } + + // Helper to compute prover vertex ID + getProverVertexID := func(proverAddress []byte) [64]byte { + var id [64]byte + copy(id[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) + copy(id[32:], proverAddress) + return id + } + + // Helper to compute allocation vertex ID + getAllocationVertexID := func(publicKey, filter []byte) [64]byte { + allocationHash, _ := poseidon.HashBytes( + slices.Concat([]byte("PROVER_ALLOCATION"), publicKey, filter), + ) + var id [64]byte + copy(id[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) + copy(id[32:], allocationHash.FillBytes(make([]byte, 32))) + return id + } + + // ===== CREATE TEST DATA ===== + + // 1. Create 6 global provers with single allocation each, filter=nil, status=active + globalProverAddrs := make([][]byte, 6) + globalProverKeys := make([][]byte, 6) + for i := 0; i < 6; i++ { + publicKey := bytes.Repeat([]byte{byte(0x10 + i)}, 585) + globalProverKeys[i] = publicKey + + proverAddr, err := createProverWithAllocations( + publicKey, + 1, // Active prover status + []allocationSpec{ + {filter: nil, joinFrame: recentJoinFrame, status: 1}, // Active global allocation + }, + ) + require.NoError(t, err) + globalProverAddrs[i] = proverAddr + t.Logf("Created global prover %d at address: %s", + i, hex.EncodeToString(proverAddr)) + } + + // 2. Create 5 app shard provers with 100 allocations each, ALL with old join frame + // These should be completely pruned (prover and all allocations) + allOldProverAddrs := make([][]byte, 5) + allOldProverKeys := make([][]byte, 5) + allOldFilters := make([][][]byte, 5) // Store filters for each prover + for i := 0; i < 5; i++ { + publicKey := bytes.Repeat([]byte{byte(0x20 + i)}, 585) + allOldProverKeys[i] = publicKey + + allocations := make([]allocationSpec, 100) + filters := make([][]byte, 100) + for j := 0; j < 100; j++ { + filter := []byte(fmt.Sprintf("shard_%d_%d", i, j)) + filters[j] = filter + allocations[j] = allocationSpec{ + filter: filter, + joinFrame: oldJoinFrame, // All old - will be pruned + status: 0, // Joining + } + } + allOldFilters[i] = filters + + proverAddr, err := createProverWithAllocations( + publicKey, + 0, // Joining prover status + allocations, + ) + require.NoError(t, err) + allOldProverAddrs[i] = proverAddr + t.Logf("Created all-old app shard prover %d at address: %s", + i, hex.EncodeToString(proverAddr)) + } + + // 3. 
Create 5 app shard provers with 100 allocations each: + // - 50 with old join frame (will be pruned) + // - 50 with recent join frame (will remain) + // Prover should remain but with only 50 allocations + mixedProverAddrs := make([][]byte, 5) + mixedProverKeys := make([][]byte, 5) + mixedOldFilters := make([][][]byte, 5) // Old filters that should be pruned + mixedNewFilters := make([][][]byte, 5) // Recent filters that should remain + for i := 0; i < 5; i++ { + publicKey := bytes.Repeat([]byte{byte(0x30 + i)}, 585) + mixedProverKeys[i] = publicKey + + allocations := make([]allocationSpec, 100) + oldFilters := make([][]byte, 50) + newFilters := make([][]byte, 50) + + for j := 0; j < 100; j++ { + filter := []byte(fmt.Sprintf("mixed_shard_%d_%d", i, j)) + if j < 50 { + // First 50: old join frame - will be pruned + oldFilters[j] = filter + allocations[j] = allocationSpec{ + filter: filter, + joinFrame: oldJoinFrame, + status: 0, // Joining + } + } else { + // Last 50: recent join frame - will remain + newFilters[j-50] = filter + allocations[j] = allocationSpec{ + filter: filter, + joinFrame: recentJoinFrame, + status: 0, // Joining + } + } + } + mixedOldFilters[i] = oldFilters + mixedNewFilters[i] = newFilters + + proverAddr, err := createProverWithAllocations( + publicKey, + 0, // Joining prover status + allocations, + ) + require.NoError(t, err) + mixedProverAddrs[i] = proverAddr + t.Logf("Created mixed app shard prover %d at address: %s", + i, hex.EncodeToString(proverAddr)) + } + + // 4. Create 3 provers with rejected allocations (old join frame) + // These should be completely pruned like joining allocations + rejectedProverAddrs := make([][]byte, 3) + rejectedProverKeys := make([][]byte, 3) + rejectedFilters := make([][][]byte, 3) + for i := 0; i < 3; i++ { + publicKey := bytes.Repeat([]byte{byte(0x40 + i)}, 585) + rejectedProverKeys[i] = publicKey + + allocations := make([]allocationSpec, 10) + filters := make([][]byte, 10) + for j := 0; j < 10; j++ { + filter := []byte(fmt.Sprintf("rejected_shard_%d_%d", i, j)) + filters[j] = filter + allocations[j] = allocationSpec{ + filter: filter, + joinFrame: oldJoinFrame, + status: 4, // Rejected + } + } + rejectedFilters[i] = filters + + proverAddr, err := createProverWithAllocations( + publicKey, + 0, // Joining prover status + allocations, + ) + require.NoError(t, err) + rejectedProverAddrs[i] = proverAddr + t.Logf("Created rejected prover %d at address: %s", + i, hex.EncodeToString(proverAddr)) + } + + // 5. 
Create 3 provers with MIXED Active and Joining allocations (all with old join frame) + // Active allocations should NOT be pruned even with old join frame + // Joining allocations should be pruned + // Prover should remain because it has Active allocations + mixedActiveJoiningProverAddrs := make([][]byte, 3) + mixedActiveJoiningProverKeys := make([][]byte, 3) + mixedActiveFilters := make([][][]byte, 3) // Active filters that should remain + mixedJoiningFilters := make([][][]byte, 3) // Joining filters that should be pruned + for i := 0; i < 3; i++ { + publicKey := bytes.Repeat([]byte{byte(0x50 + i)}, 585) + mixedActiveJoiningProverKeys[i] = publicKey + + allocations := make([]allocationSpec, 20) + activeFilters := make([][]byte, 10) + joiningFilters := make([][]byte, 10) + + for j := 0; j < 20; j++ { + filter := []byte(fmt.Sprintf("mixed_active_joining_%d_%d", i, j)) + if j < 10 { + // First 10: Active status with old join frame - should NOT be pruned + activeFilters[j] = filter + allocations[j] = allocationSpec{ + filter: filter, + joinFrame: oldJoinFrame, // Old, but Active so should remain + status: 1, // Active + } + } else { + // Last 10: Joining status with old join frame - should be pruned + joiningFilters[j-10] = filter + allocations[j] = allocationSpec{ + filter: filter, + joinFrame: oldJoinFrame, // Old and Joining, so should be pruned + status: 0, // Joining + } + } + } + mixedActiveFilters[i] = activeFilters + mixedJoiningFilters[i] = joiningFilters + + proverAddr, err := createProverWithAllocations( + publicKey, + 1, // Active prover status + allocations, + ) + require.NoError(t, err) + mixedActiveJoiningProverAddrs[i] = proverAddr + t.Logf("Created mixed active/joining prover %d at address: %s", + i, hex.EncodeToString(proverAddr)) + } + + // 6. 
Create 2 provers with ALL Active allocations (old join frame) + // None of these should be pruned - Active status protects them + allActiveProverAddrs := make([][]byte, 2) + allActiveProverKeys := make([][]byte, 2) + allActiveFilters := make([][][]byte, 2) + for i := 0; i < 2; i++ { + publicKey := bytes.Repeat([]byte{byte(0x60 + i)}, 585) + allActiveProverKeys[i] = publicKey + + allocations := make([]allocationSpec, 50) + filters := make([][]byte, 50) + for j := 0; j < 50; j++ { + filter := []byte(fmt.Sprintf("all_active_%d_%d", i, j)) + filters[j] = filter + allocations[j] = allocationSpec{ + filter: filter, + joinFrame: oldJoinFrame, // Old, but Active so should remain + status: 1, // Active + } + } + allActiveFilters[i] = filters + + proverAddr, err := createProverWithAllocations( + publicKey, + 1, // Active prover status + allocations, + ) + require.NoError(t, err) + allActiveProverAddrs[i] = proverAddr + t.Logf("Created all-active prover %d at address: %s", + i, hex.EncodeToString(proverAddr)) + } + + // ===== VERIFY INITIAL STATE ===== + + // Verify all global provers exist + for i := 0; i < 6; i++ { + proverID := getProverVertexID(globalProverAddrs[i]) + allocID := getAllocationVertexID(globalProverKeys[i], nil) + + assert.True(t, vertexExists(proverID), + "Global prover %d vertex should exist before prune", i) + assert.True(t, vertexDataExists(proverID), + "Global prover %d vertex data should exist before prune", i) + assert.True(t, vertexExists(allocID), + "Global prover %d allocation should exist before prune", i) + assert.True(t, hyperedgeExists(proverID), + "Global prover %d hyperedge should exist before prune", i) + } + + // Verify all-old provers exist + for i := 0; i < 5; i++ { + proverID := getProverVertexID(allOldProverAddrs[i]) + assert.True(t, vertexExists(proverID), + "All-old prover %d vertex should exist before prune", i) + assert.True(t, vertexDataExists(proverID), + "All-old prover %d vertex data should exist before prune", i) + assert.True(t, hyperedgeExists(proverID), + "All-old prover %d hyperedge should exist before prune", i) + + // Verify all 100 allocations exist + for j := 0; j < 100; j++ { + allocID := getAllocationVertexID(allOldProverKeys[i], allOldFilters[i][j]) + assert.True(t, vertexExists(allocID), + "All-old prover %d allocation %d should exist before prune", i, j) + } + } + + // Verify mixed provers exist + for i := 0; i < 5; i++ { + proverID := getProverVertexID(mixedProverAddrs[i]) + assert.True(t, vertexExists(proverID), + "Mixed prover %d vertex should exist before prune", i) + assert.True(t, vertexDataExists(proverID), + "Mixed prover %d vertex data should exist before prune", i) + assert.True(t, hyperedgeExists(proverID), + "Mixed prover %d hyperedge should exist before prune", i) + + // Verify all 100 allocations exist + for j := 0; j < 50; j++ { + oldAllocID := getAllocationVertexID(mixedProverKeys[i], mixedOldFilters[i][j]) + assert.True(t, vertexExists(oldAllocID), + "Mixed prover %d old allocation %d should exist before prune", i, j) + + newAllocID := getAllocationVertexID(mixedProverKeys[i], mixedNewFilters[i][j]) + assert.True(t, vertexExists(newAllocID), + "Mixed prover %d new allocation %d should exist before prune", i, j) + } + } + + // Verify rejected provers exist + for i := 0; i < 3; i++ { + proverID := getProverVertexID(rejectedProverAddrs[i]) + assert.True(t, vertexExists(proverID), + "Rejected prover %d vertex should exist before prune", i) + assert.True(t, vertexDataExists(proverID), + "Rejected prover %d vertex data should 
exist before prune", i) + assert.True(t, hyperedgeExists(proverID), + "Rejected prover %d hyperedge should exist before prune", i) + + // Verify all 10 rejected allocations exist + for j := 0; j < 10; j++ { + allocID := getAllocationVertexID(rejectedProverKeys[i], rejectedFilters[i][j]) + assert.True(t, vertexExists(allocID), + "Rejected prover %d allocation %d should exist before prune", i, j) + } + } + + // Verify mixed active/joining provers exist + for i := 0; i < 3; i++ { + proverID := getProverVertexID(mixedActiveJoiningProverAddrs[i]) + assert.True(t, vertexExists(proverID), + "Mixed active/joining prover %d vertex should exist before prune", i) + assert.True(t, vertexDataExists(proverID), + "Mixed active/joining prover %d vertex data should exist before prune", i) + assert.True(t, hyperedgeExists(proverID), + "Mixed active/joining prover %d hyperedge should exist before prune", i) + + // Verify all 10 active allocations exist + for j := 0; j < 10; j++ { + allocID := getAllocationVertexID(mixedActiveJoiningProverKeys[i], mixedActiveFilters[i][j]) + assert.True(t, vertexExists(allocID), + "Mixed active/joining prover %d active allocation %d should exist before prune", i, j) + } + // Verify all 10 joining allocations exist + for j := 0; j < 10; j++ { + allocID := getAllocationVertexID(mixedActiveJoiningProverKeys[i], mixedJoiningFilters[i][j]) + assert.True(t, vertexExists(allocID), + "Mixed active/joining prover %d joining allocation %d should exist before prune", i, j) + } + } + + // Verify all-active provers exist + for i := 0; i < 2; i++ { + proverID := getProverVertexID(allActiveProverAddrs[i]) + assert.True(t, vertexExists(proverID), + "All-active prover %d vertex should exist before prune", i) + assert.True(t, vertexDataExists(proverID), + "All-active prover %d vertex data should exist before prune", i) + assert.True(t, hyperedgeExists(proverID), + "All-active prover %d hyperedge should exist before prune", i) + + // Verify all 50 active allocations exist + for j := 0; j < 50; j++ { + allocID := getAllocationVertexID(allActiveProverKeys[i], allActiveFilters[i][j]) + assert.True(t, vertexExists(allocID), + "All-active prover %d allocation %d should exist before prune", i, j) + } + } + + // ===== CREATE REGISTRY AND PRUNE ===== + + registry, err := NewProverRegistry(logger, hg) + require.NoError(t, err) + + // Run pruning + err = registry.PruneOrphanJoins(currentFrame) + require.NoError(t, err) + + // ===== VERIFY POST-PRUNE STATE ===== + + // 1. Verify global provers are COMPLETELY UNTOUCHED + for i := 0; i < 6; i++ { + proverID := getProverVertexID(globalProverAddrs[i]) + allocID := getAllocationVertexID(globalProverKeys[i], nil) + + assert.True(t, vertexExists(proverID), + "Global prover %d vertex should STILL exist after prune", i) + assert.True(t, vertexDataExists(proverID), + "Global prover %d vertex data should STILL exist after prune", i) + assert.True(t, vertexExists(allocID), + "Global prover %d allocation should STILL exist after prune", i) + assert.True(t, vertexDataExists(allocID), + "Global prover %d allocation data should STILL exist after prune", i) + assert.True(t, hyperedgeExists(proverID), + "Global prover %d hyperedge should STILL exist after prune", i) + } + + // 2. 
Verify all-old provers are COMPLETELY PRUNED (prover vertex gone too) + for i := 0; i < 5; i++ { + proverID := getProverVertexID(allOldProverAddrs[i]) + + assert.False(t, vertexExists(proverID), + "All-old prover %d vertex should be DELETED after prune", i) + assert.False(t, vertexDataExists(proverID), + "All-old prover %d vertex data should be DELETED after prune", i) + assert.False(t, hyperedgeExists(proverID), + "All-old prover %d hyperedge should be DELETED after prune", i) + + // Verify all 100 allocations are deleted + for j := 0; j < 100; j++ { + allocID := getAllocationVertexID(allOldProverKeys[i], allOldFilters[i][j]) + assert.False(t, vertexExists(allocID), + "All-old prover %d allocation %d should be DELETED after prune", i, j) + assert.False(t, vertexDataExists(allocID), + "All-old prover %d allocation %d data should be DELETED after prune", i, j) + } + } + + // 3. Verify mixed provers: prover remains, old allocations pruned, new allocations remain + for i := 0; i < 5; i++ { + proverID := getProverVertexID(mixedProverAddrs[i]) + + assert.True(t, vertexExists(proverID), + "Mixed prover %d vertex should STILL exist after prune", i) + assert.True(t, vertexDataExists(proverID), + "Mixed prover %d vertex data should STILL exist after prune", i) + assert.True(t, hyperedgeExists(proverID), + "Mixed prover %d hyperedge should STILL exist after prune", i) + + // Verify old allocations are deleted + for j := 0; j < 50; j++ { + oldAllocID := getAllocationVertexID(mixedProverKeys[i], mixedOldFilters[i][j]) + assert.False(t, vertexExists(oldAllocID), + "Mixed prover %d old allocation %d should be DELETED after prune", i, j) + assert.False(t, vertexDataExists(oldAllocID), + "Mixed prover %d old allocation %d data should be DELETED after prune", i, j) + } + + // Verify new allocations remain + for j := 0; j < 50; j++ { + newAllocID := getAllocationVertexID(mixedProverKeys[i], mixedNewFilters[i][j]) + assert.True(t, vertexExists(newAllocID), + "Mixed prover %d new allocation %d should STILL exist after prune", i, j) + assert.True(t, vertexDataExists(newAllocID), + "Mixed prover %d new allocation %d data should STILL exist after prune", i, j) + } + } + + // 4. Verify rejected provers are COMPLETELY PRUNED (prover vertex gone too) + for i := 0; i < 3; i++ { + proverID := getProverVertexID(rejectedProverAddrs[i]) + + assert.False(t, vertexExists(proverID), + "Rejected prover %d vertex should be DELETED after prune", i) + assert.False(t, vertexDataExists(proverID), + "Rejected prover %d vertex data should be DELETED after prune", i) + assert.False(t, hyperedgeExists(proverID), + "Rejected prover %d hyperedge should be DELETED after prune", i) + + // Verify all 10 rejected allocations are deleted + for j := 0; j < 10; j++ { + allocID := getAllocationVertexID(rejectedProverKeys[i], rejectedFilters[i][j]) + assert.False(t, vertexExists(allocID), + "Rejected prover %d allocation %d should be DELETED after prune", i, j) + assert.False(t, vertexDataExists(allocID), + "Rejected prover %d allocation %d data should be DELETED after prune", i, j) + } + } + + // 5. 
Verify mixed active/joining provers: prover remains, Active allocations remain, + // Joining allocations are pruned + for i := 0; i < 3; i++ { + proverID := getProverVertexID(mixedActiveJoiningProverAddrs[i]) + + assert.True(t, vertexExists(proverID), + "Mixed active/joining prover %d vertex should STILL exist after prune", i) + assert.True(t, vertexDataExists(proverID), + "Mixed active/joining prover %d vertex data should STILL exist after prune", i) + assert.True(t, hyperedgeExists(proverID), + "Mixed active/joining prover %d hyperedge should STILL exist after prune", i) + + // Verify Active allocations REMAIN (not pruned despite old join frame) + for j := 0; j < 10; j++ { + activeAllocID := getAllocationVertexID(mixedActiveJoiningProverKeys[i], mixedActiveFilters[i][j]) + assert.True(t, vertexExists(activeAllocID), + "Mixed active/joining prover %d ACTIVE allocation %d should STILL exist after prune", i, j) + assert.True(t, vertexDataExists(activeAllocID), + "Mixed active/joining prover %d ACTIVE allocation %d data should STILL exist after prune", i, j) + } + + // Verify Joining allocations are DELETED + for j := 0; j < 10; j++ { + joiningAllocID := getAllocationVertexID(mixedActiveJoiningProverKeys[i], mixedJoiningFilters[i][j]) + assert.False(t, vertexExists(joiningAllocID), + "Mixed active/joining prover %d JOINING allocation %d should be DELETED after prune", i, j) + assert.False(t, vertexDataExists(joiningAllocID), + "Mixed active/joining prover %d JOINING allocation %d data should be DELETED after prune", i, j) + } + } + + // 6. Verify all-active provers are COMPLETELY UNTOUCHED + // Active allocations with old join frame should NOT be pruned + for i := 0; i < 2; i++ { + proverID := getProverVertexID(allActiveProverAddrs[i]) + + assert.True(t, vertexExists(proverID), + "All-active prover %d vertex should STILL exist after prune", i) + assert.True(t, vertexDataExists(proverID), + "All-active prover %d vertex data should STILL exist after prune", i) + assert.True(t, hyperedgeExists(proverID), + "All-active prover %d hyperedge should STILL exist after prune", i) + + // Verify all 50 Active allocations REMAIN + for j := 0; j < 50; j++ { + allocID := getAllocationVertexID(allActiveProverKeys[i], allActiveFilters[i][j]) + assert.True(t, vertexExists(allocID), + "All-active prover %d allocation %d should STILL exist after prune", i, j) + assert.True(t, vertexDataExists(allocID), + "All-active prover %d allocation %d data should STILL exist after prune", i, j) + } + } + + // ===== VERIFY REGISTRY CACHE STATE ===== + + // Global provers should still be in cache + for i := 0; i < 6; i++ { + info, err := registry.GetProverInfo(globalProverAddrs[i]) + require.NoError(t, err) + assert.NotNil(t, info, "Global prover %d should still be in registry cache", i) + assert.Len(t, info.Allocations, 1, "Global prover %d should have 1 allocation", i) + } + + // All-old provers should be removed from cache + for i := 0; i < 5; i++ { + info, err := registry.GetProverInfo(allOldProverAddrs[i]) + require.NoError(t, err) + assert.Nil(t, info, "All-old prover %d should be removed from registry cache", i) + } + + // Mixed provers should still be in cache with only 50 allocations + for i := 0; i < 5; i++ { + info, err := registry.GetProverInfo(mixedProverAddrs[i]) + require.NoError(t, err) + assert.NotNil(t, info, "Mixed prover %d should still be in registry cache", i) + assert.Len(t, info.Allocations, 50, + "Mixed prover %d should have 50 allocations after prune", i) + } + + // Rejected provers should be 
removed from cache + for i := 0; i < 3; i++ { + info, err := registry.GetProverInfo(rejectedProverAddrs[i]) + require.NoError(t, err) + assert.Nil(t, info, "Rejected prover %d should be removed from registry cache", i) + } + + // Mixed active/joining provers should still be in cache with only 10 allocations (the Active ones) + for i := 0; i < 3; i++ { + info, err := registry.GetProverInfo(mixedActiveJoiningProverAddrs[i]) + require.NoError(t, err) + assert.NotNil(t, info, "Mixed active/joining prover %d should still be in registry cache", i) + assert.Len(t, info.Allocations, 10, + "Mixed active/joining prover %d should have 10 allocations (Active ones) after prune", i) + + // Verify all remaining allocations are Active status + for _, alloc := range info.Allocations { + assert.Equal(t, consensus.ProverStatusActive, alloc.Status, + "Mixed active/joining prover %d should only have Active allocations remaining", i) + } + } + + // All-active provers should still be in cache with all 50 allocations + for i := 0; i < 2; i++ { + info, err := registry.GetProverInfo(allActiveProverAddrs[i]) + require.NoError(t, err) + assert.NotNil(t, info, "All-active prover %d should still be in registry cache", i) + assert.Len(t, info.Allocations, 50, + "All-active prover %d should still have all 50 allocations after prune", i) + + // Verify all allocations are Active status + for _, alloc := range info.Allocations { + assert.Equal(t, consensus.ProverStatusActive, alloc.Status, + "All-active prover %d should only have Active allocations", i) + } + } + + // ===== VERIFY THROUGH ADDITIONAL REGISTRY METHODS ===== + + // Verify all-old provers don't appear in GetProversByStatus(Joining) + joiningProvers, err := registry.GetProversByStatus(nil, consensus.ProverStatusJoining) + require.NoError(t, err) + for _, prover := range joiningProvers { + for i, addr := range allOldProverAddrs { + assert.NotEqual(t, hex.EncodeToString(addr), hex.EncodeToString(prover.Address), + "All-old prover %d should not appear in GetProversByStatus(Joining)", i) + } + for i, addr := range rejectedProverAddrs { + assert.NotEqual(t, hex.EncodeToString(addr), hex.EncodeToString(prover.Address), + "Rejected prover %d should not appear in GetProversByStatus(Joining)", i) + } + } + + // Verify all-old provers don't appear in GetProvers for their filters + for i := 0; i < 5; i++ { + for j := 0; j < 100; j++ { + filter := allOldFilters[i][j] + provers, err := registry.GetProvers(filter) + require.NoError(t, err) + for _, p := range provers { + assert.NotEqual(t, hex.EncodeToString(allOldProverAddrs[i]), hex.EncodeToString(p.Address), + "All-old prover %d should not appear in GetProvers for filter %d", i, j) + } + } + } + + // Verify all-active provers appear in GetProversByStatus(Active) for their specific filters + // Note: GetProversByStatus(nil, ...) 
only returns global provers (filter=nil) + // The all-active provers are on app shards, so we need to check their specific filters + for i := 0; i < 2; i++ { + for j := 0; j < 50; j++ { + filter := allActiveFilters[i][j] + activeProvers, err := registry.GetProversByStatus(filter, consensus.ProverStatusActive) + require.NoError(t, err) + found := false + for _, prover := range activeProvers { + if hex.EncodeToString(allActiveProverAddrs[i]) == hex.EncodeToString(prover.Address) { + found = true + break + } + } + assert.True(t, found, + "All-active prover %d should appear in GetProversByStatus(Active) for filter %d", i, j) + } + } + + t.Logf("Prune test completed successfully:") + t.Logf(" - 6 global provers: untouched") + t.Logf(" - 5 all-old provers: completely pruned (500 allocations)") + t.Logf(" - 5 mixed provers: 250 old allocations pruned, 250 recent allocations remain") + t.Logf(" - 3 rejected provers: completely pruned (30 rejected allocations)") + t.Logf(" - 3 mixed active/joining provers: 30 Joining allocations pruned, 30 Active allocations remain") + t.Logf(" - 2 all-active provers: untouched (100 Active allocations remain)") +} + +// TestPruneOrphanJoins_IncompleteState tests the scenario where a previous prune +// deleted the vertex ID set entry but not the vertex data (simulating the original bug). +// The registry should still be able to prune these allocations by cleaning up the +// orphaned vertex data. +func TestPruneOrphanJoins_IncompleteState(t *testing.T) { + logger := zap.NewNop() + + // Create stores with in-memory pebble DB + pebbleDB := store.NewPebbleDB( + logger, + &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_incomplete"}, + 0, + ) + defer pebbleDB.Close() + + // Create inclusion prover and verifiable encryptor + inclusionProver := bls48581.NewKZGInclusionProver(logger) + verifiableEncryptor := verenc.NewMPCitHVerifiableEncryptor(1) + + // Create hypergraph store and hypergraph + hypergraphStore := store.NewPebbleHypergraphStore( + &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_incomplete"}, + pebbleDB, + logger, + verifiableEncryptor, + inclusionProver, + ) + hg, err := hypergraphStore.LoadHypergraph(&tests.Nopthenticator{}, 1) + require.NoError(t, err) + + // Create RDF multiprover for setting up test data + rdfMultiprover := schema.NewRDFMultiprover( + &schema.TurtleRDFParser{}, + inclusionProver, + ) + + const currentFrame = uint64(1000) + const oldJoinFrame = uint64(100) // Will be pruned + + // Helper to create a prover with allocations, returning the prover address + createProverWithAllocations := func( + publicKey []byte, + filters [][]byte, + joinFrame uint64, + ) ([]byte, error) { + proverAddressBI, err := poseidon.HashBytes(publicKey) + if err != nil { + return nil, err + } + proverAddress := proverAddressBI.FillBytes(make([]byte, 32)) + + hgCRDT := hg.(*hgcrdt.HypergraphCRDT) + txn, err := hgCRDT.NewTransaction(false) + if err != nil { + return nil, err + } + + // Create prover vertex + proverTree := &tries.VectorCommitmentTree{} + _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "prover:Prover", "PublicKey", publicKey, proverTree) + _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "prover:Prover", "Status", []byte{0}, proverTree) + _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "prover:Prover", "AvailableStorage", make([]byte, 8), proverTree) + _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, 
intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "prover:Prover", "Seniority", make([]byte, 8), proverTree) + + proverVertex := hgcrdt.NewVertex( + intrinsics.GLOBAL_INTRINSIC_ADDRESS, + [32]byte(proverAddress), + proverTree.Commit(inclusionProver, false), + big.NewInt(0), + ) + if err := hg.AddVertex(txn, proverVertex); err != nil { + txn.Abort() + return nil, err + } + + var proverVertexID [64]byte + copy(proverVertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) + copy(proverVertexID[32:], proverAddress) + if err := hg.SetVertexData(txn, proverVertexID, proverTree); err != nil { + txn.Abort() + return nil, err + } + + hyperedge := hgcrdt.NewHyperedge( + intrinsics.GLOBAL_INTRINSIC_ADDRESS, + [32]byte(proverAddress), + ) + + // Create allocation vertices + for _, filter := range filters { + allocationAddressBI, _ := poseidon.HashBytes( + slices.Concat([]byte("PROVER_ALLOCATION"), publicKey, filter), + ) + allocationAddress := allocationAddressBI.FillBytes(make([]byte, 32)) + + allocationTree := &tries.VectorCommitmentTree{} + _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "allocation:ProverAllocation", "Prover", proverAddress, allocationTree) + _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "allocation:ProverAllocation", "Status", []byte{0}, allocationTree) + _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "allocation:ProverAllocation", "ConfirmationFilter", filter, allocationTree) + + frameNumberBytes := make([]byte, 8) + binary.BigEndian.PutUint64(frameNumberBytes, joinFrame) + _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "allocation:ProverAllocation", "JoinFrameNumber", frameNumberBytes, allocationTree) + + allocationVertex := hgcrdt.NewVertex( + intrinsics.GLOBAL_INTRINSIC_ADDRESS, + [32]byte(allocationAddress), + allocationTree.Commit(inclusionProver, false), + big.NewInt(0), + ) + if err := hg.AddVertex(txn, allocationVertex); err != nil { + txn.Abort() + return nil, err + } + + var allocationVertexID [64]byte + copy(allocationVertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) + copy(allocationVertexID[32:], allocationAddress) + if err := hg.SetVertexData(txn, allocationVertexID, allocationTree); err != nil { + txn.Abort() + return nil, err + } + + hyperedge.AddExtrinsic(allocationVertex) + } + + if err := hg.AddHyperedge(txn, hyperedge); err != nil { + txn.Abort() + return nil, err + } + + if err := txn.Commit(); err != nil { + return nil, err + } + + return proverAddress, nil + } + + // Helper to delete ONLY the vertex ID set entry (not the vertex data) + // This simulates the state after a previous incomplete prune + deleteVertexIDSetOnly := func(publicKey []byte, filter []byte) error { + allocationAddressBI, _ := poseidon.HashBytes( + slices.Concat([]byte("PROVER_ALLOCATION"), publicKey, filter), + ) + allocationAddress := allocationAddressBI.FillBytes(make([]byte, 32)) + + var vertexID [64]byte + copy(vertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) + copy(vertexID[32:], allocationAddress) + + hgCRDT := hg.(*hgcrdt.HypergraphCRDT) + txn, err := hgCRDT.NewTransaction(false) + if err != nil { + return err + } + + shardKey := tries.ShardKey{ + L1: [3]byte{0x00, 0x00, 0x00}, + L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)), + } + vtree := hgCRDT.GetVertexAddsSet(shardKey).GetTree() + + // Delete from ID set only + if err := vtree.Delete(txn, vertexID[:]); err != nil { + txn.Abort() + return err + } + + 
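+		// Note: the vertex data record is intentionally left in place here; only
+		// the ID set entry is removed. This reproduces the orphaned state that
+		// PruneOrphanJoins is expected to detect and clean up.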
return txn.Commit() + } + + // Helper to check if vertex exists in ID set + vertexExistsInIDSet := func(vertexID [64]byte) bool { + _, err := hg.GetVertex(vertexID) + return err == nil + } + + // Helper to check if vertex data exists + vertexDataExists := func(vertexID [64]byte) bool { + data, err := hg.GetVertexData(vertexID) + return err == nil && data != nil + } + + // Helper to compute allocation vertex ID + getAllocationVertexID := func(publicKey, filter []byte) [64]byte { + allocationHash, _ := poseidon.HashBytes( + slices.Concat([]byte("PROVER_ALLOCATION"), publicKey, filter), + ) + var id [64]byte + copy(id[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) + copy(id[32:], allocationHash.FillBytes(make([]byte, 32))) + return id + } + + // Create a prover with 5 allocations + publicKey := bytes.Repeat([]byte{0x50}, 585) + filters := [][]byte{ + []byte("incomplete_filter_0"), + []byte("incomplete_filter_1"), + []byte("incomplete_filter_2"), + []byte("incomplete_filter_3"), + []byte("incomplete_filter_4"), + } + + proverAddr, err := createProverWithAllocations(publicKey, filters, oldJoinFrame) + require.NoError(t, err) + t.Logf("Created prover at address: %s", hex.EncodeToString(proverAddr)) + + // Verify all allocations exist (both ID set and data) + for i, filter := range filters { + allocID := getAllocationVertexID(publicKey, filter) + assert.True(t, vertexExistsInIDSet(allocID), + "Allocation %d should exist in ID set before manipulation", i) + assert.True(t, vertexDataExists(allocID), + "Allocation %d data should exist before manipulation", i) + } + + // Now delete the ID set entries for allocations 0, 1, and 2, but leave their vertex data + // This simulates the state after an incomplete prune (the original bug) + for i := 0; i < 3; i++ { + err := deleteVertexIDSetOnly(publicKey, filters[i]) + require.NoError(t, err) + t.Logf("Deleted ID set entry for allocation %d (leaving vertex data)", i) + } + + // Verify the incomplete state: ID set entries gone, but data remains + for i := 0; i < 3; i++ { + allocID := getAllocationVertexID(publicKey, filters[i]) + assert.False(t, vertexExistsInIDSet(allocID), + "Allocation %d should NOT exist in ID set after manipulation", i) + assert.True(t, vertexDataExists(allocID), + "Allocation %d data should STILL exist after manipulation (orphaned)", i) + } + + // Allocations 3 and 4 should still be complete + for i := 3; i < 5; i++ { + allocID := getAllocationVertexID(publicKey, filters[i]) + assert.True(t, vertexExistsInIDSet(allocID), + "Allocation %d should still exist in ID set", i) + assert.True(t, vertexDataExists(allocID), + "Allocation %d data should still exist", i) + } + + // Create registry - this will load the prover from vertex data iterator + // The allocations with missing ID set entries will still be in the cache + // because extractGlobalState reads from vertex DATA, not ID set + registry, err := NewProverRegistry(logger, hg) + require.NoError(t, err) + + // Verify the prover is in cache with all 5 allocations + // (because extractGlobalState reads from vertex data which still exists) + info, err := registry.GetProverInfo(proverAddr) + require.NoError(t, err) + require.NotNil(t, info) + t.Logf("Prover in cache has %d allocations", len(info.Allocations)) + + // Run pruning - this should handle the incomplete state gracefully + err = registry.PruneOrphanJoins(currentFrame) + require.NoError(t, err) + + // After pruning: + // - All allocation vertex DATA should be deleted (both orphaned and complete ones) + // - The prover should be 
removed since all allocations are gone + for i, filter := range filters { + allocID := getAllocationVertexID(publicKey, filter) + assert.False(t, vertexExistsInIDSet(allocID), + "Allocation %d should not exist in ID set after prune", i) + assert.False(t, vertexDataExists(allocID), + "Allocation %d data should be DELETED after prune", i) + } + + // Prover should be removed from cache via GetProverInfo + info, err = registry.GetProverInfo(proverAddr) + require.NoError(t, err) + assert.Nil(t, info, "Prover should be removed from cache after all allocations pruned") + + // Also verify through GetProvers that the prover is gone from all filters + for _, filter := range filters { + provers, err := registry.GetProvers(filter) + require.NoError(t, err) + for _, p := range provers { + assert.NotEqual(t, hex.EncodeToString(proverAddr), hex.EncodeToString(p.Address), + "Prover should not appear in GetProvers for filter %s", string(filter)) + } + } + + // Verify through GetProversByStatus that the prover is gone + joiningProvers, err := registry.GetProversByStatus(nil, consensus.ProverStatusJoining) + require.NoError(t, err) + for _, p := range joiningProvers { + assert.NotEqual(t, hex.EncodeToString(proverAddr), hex.EncodeToString(p.Address), + "Prover should not appear in GetProversByStatus(Joining)") + } + + t.Logf("Incomplete state prune test completed successfully") + t.Logf(" - 3 allocations with missing ID set entries: vertex data cleaned up") + t.Logf(" - 2 allocations with complete state: fully pruned") + t.Logf(" - Prover removed after all allocations pruned") + t.Logf(" - Registry methods confirm prover is gone") +} diff --git a/node/crypto/proof_tree_test.go b/node/crypto/proof_tree_test.go index 7c62a09..7763c5a 100644 --- a/node/crypto/proof_tree_test.go +++ b/node/crypto/proof_tree_test.go @@ -588,6 +588,349 @@ func TestTreeLongestBranchNoBLS(t *testing.T) { } } +// TestTreeNoStaleNodesAfterDelete tests that deleting nodes does not leave +// orphaned/stale tree nodes in the database. This specifically tests the case +// where branch merging occurs during deletion. +func TestTreeNoStaleNodesAfterDeleteNoBLS(t *testing.T) { + l, _ := zap.NewProduction() + db := store.NewPebbleDB(l, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0) + s := store.NewPebbleHypergraphStore(&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, db, l, verencr, nil) + shardKey := tries.ShardKey{} + tree := &tries.LazyVectorCommitmentTree{Store: s, SetType: "vertex", PhaseType: "adds", ShardKey: shardKey} + + // Create keys that will force branch creation and then merging on deletion. + // We want keys that share a prefix so that: + // 1. Inserting them creates branch nodes + // 2. 
Deleting some of them causes branch merging (childCount == 1 case) + + // Create 3 keys with same first 8 bytes, different after + commonPrefix := make([]byte, 8) + rand.Read(commonPrefix) + + keys := make([][]byte, 3) + values := make([][]byte, 3) + + for i := 0; i < 3; i++ { + key := make([]byte, 64) + copy(key[:8], commonPrefix) + // Vary byte 8 to create branching + key[8] = byte(i * 64) // 0x00, 0x40, 0x80 + rand.Read(key[9:]) + keys[i] = key + + val := make([]byte, 32) + rand.Read(val) + values[i] = val + + err := tree.Insert(nil, key, val, nil, big.NewInt(1)) + if err != nil { + t.Fatalf("Failed to insert key %d: %v", i, err) + } + } + + // Verify all 3 keys exist + for i, key := range keys { + _, err := tree.Get(key) + if err != nil { + t.Fatalf("Key %d not found after insert: %v", i, err) + } + } + + // Count tree nodes before deletion + nodesBefore := countTreeNodes(t, s, shardKey) + t.Logf("Tree nodes before deletion: %d", nodesBefore) + + // Delete one key - this should trigger branch merging + err := tree.Delete(nil, keys[1]) + if err != nil { + t.Fatalf("Failed to delete key 1: %v", err) + } + + // Verify key 1 is gone + _, err = tree.Get(keys[1]) + if err == nil { + t.Fatal("Key 1 should not exist after deletion") + } + + // Verify keys 0 and 2 still exist + for _, i := range []int{0, 2} { + _, err := tree.Get(keys[i]) + if err != nil { + t.Fatalf("Key %d not found after deleting key 1: %v", i, err) + } + } + + // Count tree nodes after deletion + nodesAfter := countTreeNodes(t, s, shardKey) + t.Logf("Tree nodes after deletion: %d", nodesAfter) + + // Now verify that all stored nodes are actually reachable from the root. + // This is the critical check - any unreachable nodes are "stale". + reachableNodes := countReachableNodes(t, tree) + t.Logf("Reachable nodes from root: %d", reachableNodes) + + if nodesAfter != reachableNodes { + t.Errorf("STALE NODES DETECTED: stored=%d, reachable=%d, stale=%d", + nodesAfter, reachableNodes, nodesAfter-reachableNodes) + } + + // More aggressive test: delete all but one key + err = tree.Delete(nil, keys[2]) + if err != nil { + t.Fatalf("Failed to delete key 2: %v", err) + } + + nodesAfterSecondDelete := countTreeNodes(t, s, shardKey) + reachableAfterSecondDelete := countReachableNodes(t, tree) + t.Logf("After second delete: stored=%d, reachable=%d", nodesAfterSecondDelete, reachableAfterSecondDelete) + + if nodesAfterSecondDelete != reachableAfterSecondDelete { + t.Errorf("STALE NODES DETECTED after second delete: stored=%d, reachable=%d, stale=%d", + nodesAfterSecondDelete, reachableAfterSecondDelete, nodesAfterSecondDelete-reachableAfterSecondDelete) + } + + // Delete the last key - tree should be empty + err = tree.Delete(nil, keys[0]) + if err != nil { + t.Fatalf("Failed to delete key 0: %v", err) + } + + nodesAfterAllDeleted := countTreeNodes(t, s, shardKey) + t.Logf("After all deleted: stored=%d", nodesAfterAllDeleted) + + // There should be no tree nodes left (except possibly the root marker) + if nodesAfterAllDeleted > 1 { + t.Errorf("STALE NODES DETECTED after all deleted: stored=%d (expected 0 or 1)", + nodesAfterAllDeleted) + } +} + +// TestTreeNoStaleNodesAfterBranchMerge specifically tests the case where +// deleting a node causes a branch to merge with its only remaining child branch. +// This tests the FullPrefix update bug hypothesis. 
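+// The keys below are chosen so the root has one leaf child (first 6-bit nibble
+// 0) and one branch child (first nibble 1) holding two leaves; deleting the
+// lone leaf forces the root to merge with the surviving branch, which is the
+// exact code path this test exercises.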
+func TestTreeNoStaleNodesAfterBranchMergeNoBLS(t *testing.T) { + l, _ := zap.NewProduction() + db := store.NewPebbleDB(l, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0) + s := store.NewPebbleHypergraphStore(&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, db, l, verencr, nil) + shardKey := tries.ShardKey{} + tree := &tries.LazyVectorCommitmentTree{Store: s, SetType: "vertex", PhaseType: "adds", ShardKey: shardKey} + + // To trigger a branch-to-branch merge during deletion, we need: + // 1. A parent branch with exactly 2 children + // 2. One child is a leaf (to be deleted) + // 3. The other child is a branch node + // + // Structure we want to create: + // Root (branch) + // ├── Child A (leaf) - will be deleted + // └── Child B (branch) + // ├── Grandchild 1 (leaf) + // └── Grandchild 2 (leaf) + // + // After deleting Child A, Root should merge with Child B. + + // Keys with controlled nibbles to create specific tree structure + // Nibble = 6 bits, so first nibble is bits 0-5 of first byte + + // Key A: first nibble = 0 (byte[0] bits 7-2 = 000000) + keyA := make([]byte, 64) + keyA[0] = 0x00 // First nibble = 0 + rand.Read(keyA[1:]) + + // Keys for branch B children: first nibble = 1 (byte[0] bits 7-2 = 000001) + // They share first nibble (1) but differ at second nibble + keyB1 := make([]byte, 64) + keyB1[0] = 0x04 // First nibble = 1 (binary: 000001 << 2 = 00000100) + keyB1[1] = 0x00 // Second nibble differs + rand.Read(keyB1[2:]) + + keyB2 := make([]byte, 64) + keyB2[0] = 0x04 // First nibble = 1 + keyB2[1] = 0x40 // Second nibble differs (binary: 010000...) + rand.Read(keyB2[2:]) + + // Insert all keys + val := make([]byte, 32) + rand.Read(val) + if err := tree.Insert(nil, keyA, val, nil, big.NewInt(1)); err != nil { + t.Fatalf("Failed to insert keyA: %v", err) + } + + rand.Read(val) + if err := tree.Insert(nil, keyB1, val, nil, big.NewInt(1)); err != nil { + t.Fatalf("Failed to insert keyB1: %v", err) + } + + rand.Read(val) + if err := tree.Insert(nil, keyB2, val, nil, big.NewInt(1)); err != nil { + t.Fatalf("Failed to insert keyB2: %v", err) + } + + // Verify structure + nodesBefore := countTreeNodes(t, s, shardKey) + reachableBefore := countReachableNodes(t, tree) + t.Logf("Before deletion: stored=%d, reachable=%d", nodesBefore, reachableBefore) + + if nodesBefore != reachableBefore { + t.Errorf("STALE NODES before deletion: stored=%d, reachable=%d", nodesBefore, reachableBefore) + } + + // Delete keyA - this should trigger the merge of root with child B + if err := tree.Delete(nil, keyA); err != nil { + t.Fatalf("Failed to delete keyA: %v", err) + } + + // Verify keyA is gone + if _, err := tree.Get(keyA); err == nil { + t.Fatal("keyA should not exist after deletion") + } + + // Verify B keys still exist + if _, err := tree.Get(keyB1); err != nil { + t.Fatalf("keyB1 not found after deletion: %v", err) + } + if _, err := tree.Get(keyB2); err != nil { + t.Fatalf("keyB2 not found after deletion: %v", err) + } + + // Check for stale nodes + nodesAfter := countTreeNodes(t, s, shardKey) + reachableAfter := countReachableNodes(t, tree) + t.Logf("After deletion: stored=%d, reachable=%d", nodesAfter, reachableAfter) + + if nodesAfter != reachableAfter { + t.Errorf("STALE NODES DETECTED after branch merge: stored=%d, reachable=%d, stale=%d", + nodesAfter, reachableAfter, nodesAfter-reachableAfter) + } +} + +// TestTreeNoStaleNodesAfterMassDelete tests stale node detection with many keys +func TestTreeNoStaleNodesAfterMassDeleteNoBLS(t *testing.T) { 
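+	// The test inserts 1000 random 64-byte keys, deletes half of them in a
+	// shuffled order, and compares the number of stored tree nodes against the
+	// number reachable from the root before and after deletion; any difference
+	// indicates stale (leaked) nodes.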
+ l, _ := zap.NewProduction() + db := store.NewPebbleDB(l, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0) + s := store.NewPebbleHypergraphStore(&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, db, l, verencr, nil) + shardKey := tries.ShardKey{} + tree := &tries.LazyVectorCommitmentTree{Store: s, SetType: "vertex", PhaseType: "adds", ShardKey: shardKey} + + // Insert 1000 random keys + numKeys := 1000 + keys := make([][]byte, numKeys) + + for i := 0; i < numKeys; i++ { + key := make([]byte, 64) + rand.Read(key) + keys[i] = key + + val := make([]byte, 32) + rand.Read(val) + + err := tree.Insert(nil, key, val, nil, big.NewInt(1)) + if err != nil { + t.Fatalf("Failed to insert key %d: %v", i, err) + } + } + + nodesBefore := countTreeNodes(t, s, shardKey) + reachableBefore := countReachableNodes(t, tree) + t.Logf("Before deletion: stored=%d, reachable=%d", nodesBefore, reachableBefore) + + if nodesBefore != reachableBefore { + t.Errorf("STALE NODES before deletion: stored=%d, reachable=%d", nodesBefore, reachableBefore) + } + + // Delete half the keys in random order + mrand.Shuffle(numKeys, func(i, j int) { + keys[i], keys[j] = keys[j], keys[i] + }) + + for i := 0; i < numKeys/2; i++ { + err := tree.Delete(nil, keys[i]) + if err != nil { + t.Fatalf("Failed to delete key %d: %v", i, err) + } + } + + nodesAfter := countTreeNodes(t, s, shardKey) + reachableAfter := countReachableNodes(t, tree) + t.Logf("After deleting half: stored=%d, reachable=%d", nodesAfter, reachableAfter) + + if nodesAfter != reachableAfter { + t.Errorf("STALE NODES DETECTED: stored=%d, reachable=%d, stale=%d", + nodesAfter, reachableAfter, nodesAfter-reachableAfter) + } + + // Verify remaining keys still accessible + for i := numKeys / 2; i < numKeys; i++ { + _, err := tree.Get(keys[i]) + if err != nil { + t.Errorf("Key %d not found after mass deletion: %v", i, err) + } + } +} + +// countTreeNodes counts all tree nodes stored in the database for a given shard +func countTreeNodes(t *testing.T, s *store.PebbleHypergraphStore, shardKey tries.ShardKey) int { + count := 0 + + // Use the store's iterator to count nodes + iter, err := s.IterateRawLeaves("vertex", "adds", shardKey) + if err != nil { + t.Fatalf("Failed to create iterator: %v", err) + } + defer iter.Close() + + // Count all entries (both branch and leaf nodes) + for valid := iter.First(); valid; valid = iter.Next() { + count++ + } + + return count +} + +// countReachableNodes counts nodes reachable from the tree root by walking the tree +func countReachableNodes(t *testing.T, tree *tries.LazyVectorCommitmentTree) int { + if tree.Root == nil { + return 0 + } + + count := 0 + var walk func(node tries.LazyVectorCommitmentNode) + walk = func(node tries.LazyVectorCommitmentNode) { + if node == nil { + return + } + count++ + + switch n := node.(type) { + case *tries.LazyVectorCommitmentBranchNode: + for i := 0; i < 64; i++ { + child := n.Children[i] + if child == nil { + // Try to load from store + var err error + child, err = tree.Store.GetNodeByPath( + tree.SetType, + tree.PhaseType, + tree.ShardKey, + append(n.FullPrefix, i), + ) + if err != nil { + continue + } + } + if child != nil { + walk(child) + } + } + case *tries.LazyVectorCommitmentLeafNode: + // Leaf node, nothing more to walk + } + } + + walk(tree.Root) + return count +} + // TestTreeBranchStructure tests that the tree structure is preserved after // adding and removing leaves that cause branch creation due to shared prefixes. 
func TestTreeBranchStructureNoBLS(t *testing.T) { diff --git a/node/crypto/rdf_multiprover_integration_test.go b/node/crypto/rdf_multiprover_integration_test.go index adf1773..4b76272 100644 --- a/node/crypto/rdf_multiprover_integration_test.go +++ b/node/crypto/rdf_multiprover_integration_test.go @@ -5,6 +5,7 @@ package crypto_test import ( "bytes" + "fmt" "math/big" "testing" @@ -16,6 +17,265 @@ import ( qcrypto "source.quilibrium.com/quilibrium/monorepo/types/tries" ) +func TestName(t *testing.T) { + rdfDoc := ` +@prefix qcl: . +@prefix rdfs: . +@prefix name: . + +name:NameRecord a rdfs:Class ; + rdfs:comment "Quilibrium name service record" . + +name:Name a rdfs:Property ; + rdfs:range name:NameRecord ; + rdfs:domain qcl:String ; + qcl:size 32 ; + qcl:order 1 . + +name:AuthorityKey a rdfs:Property ; + rdfs:range name:NameRecord ; + rdfs:domain qcl:ByteArray ; + qcl:size 57 ; + qcl:order 2 . + +name:Parent a rdfs:Property ; + rdfs:range name:NameRecord ; + rdfs:domain qcl:ByteArray ; + qcl:size 32 ; + qcl:order 3 . + +name:CreatedAt a rdfs:Property ; + rdfs:range name:NameRecord ; + rdfs:domain qcl:Uint ; + qcl:size 64 ; + qcl:order 4 . + +name:UpdatedAt a rdfs:Property ; + rdfs:range name:NameRecord ; + rdfs:domain qcl:Uint ; + qcl:size 64 ; + qcl:order 5 . + +name:RecordType a rdfs:Property ; + rdfs:range name:NameRecord ; + rdfs:domain qcl:Uint ; + qcl:size 8 ; + qcl:order 6 . + +name:Data a rdfs:Property ; + rdfs:range name:NameRecord ; + rdfs:domain qcl:String ; + qcl:size 64 ; + qcl:order 7 . + ` + // Create components + parser := &schema.TurtleRDFParser{} + log := zap.NewNop() + inclusionProver := bls48581.NewKZGInclusionProver(log) + multiprover := schema.NewRDFMultiprover(parser, inclusionProver) + + // Create a test tree + tree := &qcrypto.VectorCommitmentTree{} + + // Insert test data at the correct indices + // The tree needs to have data at indices that will be used in the polynomial + // Insert enough data to ensure polynomial has values at indices 1, 2, 3 + for i := 0; i < 63; i++ { + data := bytes.Repeat([]byte{byte(i + 1)}, 57) + err := tree.Insert([]byte{byte(i << 2)}, data, nil, big.NewInt(57)) + require.NoError(t, err) + } + tree.Commit(inclusionProver, false) + + t.Run("Prove", func(t *testing.T) { + fields := []string{"Name", "AuthorityKey"} + proof, err := multiprover.Prove(rdfDoc, fields, tree) + pb, _ := proof.ToBytes() + fmt.Printf("proof %x\n", pb) + require.NoError(t, err) + assert.NotNil(t, proof) + }) + + t.Run("ProveWithType", func(t *testing.T) { + fields := []string{"Name", "AuthorityKey"} + typeIndex := uint64(63) + proof, err := multiprover.ProveWithType(rdfDoc, fields, tree, &typeIndex) + require.NoError(t, err) + assert.NotNil(t, proof) + + pb, _ := proof.ToBytes() + fmt.Printf("proof with type %x\n", pb) + // Test without type index + proof, err = multiprover.ProveWithType(rdfDoc, fields, tree, nil) + require.NoError(t, err) + assert.NotNil(t, proof) + pb, _ = proof.ToBytes() + + fmt.Printf("proof with type but type is nil %x\n", pb) + }) + + t.Run("Get", func(t *testing.T) { + // Test getting name field (order 1, so key is 1<<2 = 4, data at index 1) + value, err := multiprover.Get(rdfDoc, "name:NameRecord", "Name", tree) + require.NoError(t, err) + assert.Equal(t, bytes.Repeat([]byte{2}, 57), value) // Index 1 has value 2 + + // Test getting one-time key field (order 2, so key is 2<<2 = 8, data at index 2) + value, err = multiprover.Get(rdfDoc, "name:NameRecord", "AuthorityKey", tree) + require.NoError(t, err) + assert.Equal(t, 
bytes.Repeat([]byte{3}, 57), value) // Index 2 has value 3 + }) + + t.Run("GetFieldOrder", func(t *testing.T) { + order, maxOrder, err := multiprover.GetFieldOrder(rdfDoc, "name:NameRecord", "Name") + require.NoError(t, err) + assert.Equal(t, 1, order) + assert.Equal(t, 7, maxOrder) + + order, maxOrder, err = multiprover.GetFieldOrder(rdfDoc, "name:NameRecord", "AuthorityKey") + require.NoError(t, err) + assert.Equal(t, 2, order) + assert.Equal(t, 7, maxOrder) + + order, maxOrder, err = multiprover.GetFieldOrder(rdfDoc, "name:NameRecord", "Parent") + require.NoError(t, err) + assert.Equal(t, 3, order) + assert.Equal(t, 7, maxOrder) + }) + + t.Run("GetFieldKey", func(t *testing.T) { + key, err := multiprover.GetFieldKey(rdfDoc, "name:NameRecord", "Name") + require.NoError(t, err) + assert.Equal(t, []byte{1 << 2}, key) + + key, err = multiprover.GetFieldKey(rdfDoc, "name:NameRecord", "AuthorityKey") + require.NoError(t, err) + assert.Equal(t, []byte{2 << 2}, key) + }) + + t.Run("Verify", func(t *testing.T) { + // Create proof without type index for simpler verification + fields := []string{"Name", "AuthorityKey", "Parent"} + proof, err := multiprover.ProveWithType(rdfDoc, fields, tree, nil) + require.NoError(t, err) + + // Get actual data from tree for verification + data := make([][]byte, len(fields)) + for i, field := range fields { + value, err := multiprover.Get(rdfDoc, "name:NameRecord", field, tree) + require.NoError(t, err) + data[i] = value + } + + // Create commit + commit := tree.Commit(inclusionProver, false) + proofBytes, _ := proof.ToBytes() + + // Verify should pass with correct data + // keys parameter is nil to use default index-based keys + valid, err := multiprover.Verify(rdfDoc, fields, nil, commit, proofBytes, data) + require.NoError(t, err) + assert.True(t, valid) + + // Verify should fail with wrong data + wrongData := make([][]byte, len(fields)) + for i := range wrongData { + wrongData[i] = []byte("wrong data") + } + valid, err = multiprover.Verify(rdfDoc, fields, nil, commit, proofBytes, wrongData) + require.NoError(t, err) + assert.False(t, valid) + + // Verify should error with non-existent field + invalidFields := []string{"Name", "NonExistent"} + _, err = multiprover.Verify(rdfDoc, invalidFields, nil, commit, proofBytes, data[:2]) + assert.Error(t, err) + assert.Contains(t, err.Error(), "not found") + }) + + t.Run("VerifyWithType", func(t *testing.T) { + // Add type marker to tree + typeData := bytes.Repeat([]byte{0xff}, 32) + typeIndex := uint64(63) + err := tree.Insert(typeData, typeData, nil, big.NewInt(32)) + require.NoError(t, err) + + // Get commit after all data is inserted + commit := tree.Commit(inclusionProver, false) + + // Create proof with type + fields := []string{"Name", "AuthorityKey"} + proof, err := multiprover.ProveWithType(rdfDoc, fields, tree, &typeIndex) + require.NoError(t, err) + + // Get actual data from tree + data := make([][]byte, len(fields)) + for i, field := range fields { + value, err := multiprover.Get(rdfDoc, "name:NameRecord", field, tree) + require.NoError(t, err) + data[i] = value + } + + proofBytes, _ := proof.ToBytes() + + // Verify with type should pass + valid, err := multiprover.VerifyWithType(rdfDoc, fields, nil, commit, proofBytes, data, &typeIndex, typeData) + require.NoError(t, err) + assert.True(t, valid) + + // Verify without type when proof was created with type should fail + valid, err = multiprover.VerifyWithType(rdfDoc, fields, nil, commit, proofBytes, data, nil, nil) + require.NoError(t, err) + assert.False(t, 
valid) // Should fail due to missing type data + + // Create proof without type + proofNoType, err := multiprover.ProveWithType(rdfDoc, fields, tree, nil) + require.NoError(t, err) + proofNoTypeBytes, _ := proofNoType.ToBytes() + + // Verify without type should pass + valid, err = multiprover.VerifyWithType(rdfDoc, fields, nil, commit, proofNoTypeBytes, data, nil, nil) + require.NoError(t, err) + assert.True(t, valid) + + // Verify with wrong type data should fail + wrongTypeData := []byte("wrong type data") + valid, err = multiprover.VerifyWithType(rdfDoc, fields, nil, commit, proofBytes, data, &typeIndex, wrongTypeData) + require.NoError(t, err) + assert.False(t, valid) + + // Verify with different type index but same data should still fail + // because the hash uses the fixed key bytes.Repeat([]byte{0xff}, 32) + differentTypeIndex := uint64(50) + valid, err = multiprover.VerifyWithType(rdfDoc, fields, nil, commit, proofBytes, data, &differentTypeIndex, typeData) + require.NoError(t, err) + assert.False(t, valid) // Should fail because proof was created with index 63 + }) + + t.Run("ErrorCases", func(t *testing.T) { + // Test non-existent field + _, err := multiprover.Get(rdfDoc, "name:NameRecord", "NonExistent", tree) + assert.Error(t, err) + assert.Contains(t, err.Error(), "not found") + + // Test invalid document + _, err = multiprover.Get("invalid rdf", "name:NameRecord", "name", tree) + assert.Error(t, err) + + // Test verify with mismatched data count + fields := []string{"Name", "AuthorityKey"} + proof, err := multiprover.ProveWithType(rdfDoc, fields, tree, nil) + require.NoError(t, err) + + // Wrong number of data elements + wrongData := [][]byte{[]byte("data1")} + commit := tree.Commit(inclusionProver, false) + _, err = multiprover.Verify(rdfDoc, fields, nil, commit, proof.GetProof(), wrongData) + assert.Error(t, err) + assert.Contains(t, err.Error(), "fields and data length mismatch") + }) +} + func TestRDFMultiprover(t *testing.T) { // Create test RDF document rdfDoc := ` diff --git a/node/dbscan/main.go b/node/dbscan/main.go index 7766a7c..8d9dea3 100644 --- a/node/dbscan/main.go +++ b/node/dbscan/main.go @@ -35,8 +35,28 @@ var ( ) configDirectory2 = flag.String( "config2", - filepath.Join(".", ".config"), - "the configuration directory", + "", + "the second configuration directory (optional, enables comparison mode)", + ) + keyPrefix = flag.String( + "prefix", + "", + "hex-encoded key prefix to filter results (e.g., '09' for hypergraph keys)", + ) + searchKey = flag.String( + "search", + "", + "hex-encoded key substring to search for in keys", + ) + searchValue = flag.String( + "search-value", + "", + "hex-encoded substring to search for in values", + ) + maxResults = flag.Int( + "max", + 0, + "maximum number of results to display (0 = unlimited)", ) // *char flags @@ -50,6 +70,34 @@ func main() { config.Flags(&char, &ver) flag.Parse() + // Parse filter options + var prefixFilter []byte + if *keyPrefix != "" { + var err error + prefixFilter, err = hex.DecodeString(*keyPrefix) + if err != nil { + log.Fatalf("invalid prefix hex: %v", err) + } + } + + var keySearchPattern []byte + if *searchKey != "" { + var err error + keySearchPattern, err = hex.DecodeString(*searchKey) + if err != nil { + log.Fatalf("invalid search hex: %v", err) + } + } + + var valueSearchPattern []byte + if *searchValue != "" { + var err error + valueSearchPattern, err = hex.DecodeString(*searchValue) + if err != nil { + log.Fatalf("invalid search-value hex: %v", err) + } + } + nodeConfig1, err := 
config.LoadConfig(*configDirectory1, "", false) if err != nil { log.Fatal("failed to load config", err) @@ -64,9 +112,102 @@ func main() { db1 := store.NewPebbleDB(logger, nodeConfig1.DB, uint(0)) defer db1.Close() - iter1, err := db1.NewIter([]byte{0x00}, []byte{0xff}) + // Determine iteration bounds based on prefix filter + lowerBound := []byte{0x00} + upperBound := []byte{0xff} + if len(prefixFilter) > 0 { + lowerBound = prefixFilter + // Create upper bound by incrementing the last byte of the prefix + upperBound = make([]byte, len(prefixFilter)) + copy(upperBound, prefixFilter) + for i := len(upperBound) - 1; i >= 0; i-- { + if upperBound[i] < 0xff { + upperBound[i]++ + break + } + upperBound[i] = 0x00 + if i == 0 { + // Prefix is all 0xff, scan to end + upperBound = []byte{0xff} + } + } + } + + // Single database mode (read-only dump) + if *configDirectory2 == "" { + runSingleDBMode(db1, lowerBound, upperBound, prefixFilter, keySearchPattern, valueSearchPattern, logger) + return + } + + // Comparison mode (two databases) + runCompareMode(db1, lowerBound, upperBound, prefixFilter, keySearchPattern, valueSearchPattern, logger) +} + +func runSingleDBMode( + db1 *store.PebbleDB, + lowerBound, upperBound []byte, + prefixFilter, keySearchPattern, valueSearchPattern []byte, + logger *zap.Logger, +) { + iter1, err := db1.NewIter(lowerBound, upperBound) if err != nil { logger.Error("failed to create iterator", zap.Error(err)) + return + } + defer iter1.Close() + + count := 0 + matched := 0 + + for iter1.First(); iter1.Valid(); iter1.Next() { + key := iter1.Key() + value := iter1.Value() + + // Apply prefix filter + if len(prefixFilter) > 0 && !bytes.HasPrefix(key, prefixFilter) { + continue + } + + // Apply key search pattern + if len(keySearchPattern) > 0 && !bytes.Contains(key, keySearchPattern) { + continue + } + + // Apply value search pattern + if len(valueSearchPattern) > 0 && !bytes.Contains(value, valueSearchPattern) { + continue + } + + count++ + matched++ + + decoded := decodeValue(key, value) + fmt.Printf( + "key: %s\nsemantic: %s\nvalue:\n%s\n\n", + hex.EncodeToString(key), + describeKey(key), + indent(decoded), + ) + + if *maxResults > 0 && matched >= *maxResults { + fmt.Printf("... 
(stopped after %d results, use -max to change limit)\n", *maxResults) + break + } + } + + fmt.Printf("\nsummary: %d keys displayed from %s\n", matched, *configDirectory1) +} + +func runCompareMode( + db1 *store.PebbleDB, + lowerBound, upperBound []byte, + prefixFilter, keySearchPattern, valueSearchPattern []byte, + logger *zap.Logger, +) { + iter1, err := db1.NewIter(lowerBound, upperBound) + if err != nil { + logger.Error("failed to create iterator", zap.Error(err)) + return } defer iter1.Close() @@ -78,9 +219,10 @@ func main() { db2 := store.NewPebbleDB(logger, nodeConfig2.DB, uint(0)) defer db2.Close() - iter2, err := db2.NewIter([]byte{0x00}, []byte{0xff}) + iter2, err := db2.NewIter(lowerBound, upperBound) if err != nil { logger.Error("failed to create iterator", zap.Error(err)) + return } defer iter2.Close() @@ -90,8 +232,22 @@ func main() { onlyDB1 := 0 onlyDB2 := 0 valueDiff := 0 + matched := 0 keyPresenceMap := make(map[string]*keyPresence) + shouldInclude := func(key, value []byte) bool { + if len(prefixFilter) > 0 && !bytes.HasPrefix(key, prefixFilter) { + return false + } + if len(keySearchPattern) > 0 && !bytes.Contains(key, keySearchPattern) { + return false + } + if len(valueSearchPattern) > 0 && !bytes.Contains(value, valueSearchPattern) { + return false + } + return true + } + for iter1Valid || iter2Valid { var key1, key2 []byte var value1, value2 []byte @@ -114,34 +270,86 @@ func main() { case iter1Valid && iter2Valid: comparison := bytes.Compare(key1, key2) if comparison == 0 { - if bytes.Equal(value1, value2) { - fmt.Printf( - "key: %s\nsemantic: %s\nvalues identical in %s and %s\nvalue:\n%s\n\n", - shortHex(key1), - describeKey(key1), - *configDirectory1, - *configDirectory2, - indent(decoded1), - ) - } else { - valueDiff++ - fmt.Printf( - "key: %s\nsemantic: %s\nvalue (%s):\n%s\nvalue (%s):\n%s\n", - shortHex(key1), - describeKey(key1), - *configDirectory1, - indent(decoded1), - *configDirectory2, - indent(decoded2), - ) - if diff := diffStrings(decoded1, decoded2); diff != "" { - fmt.Printf("diff:\n%s\n", indent(diff)) + if shouldInclude(key1, value1) || shouldInclude(key2, value2) { + matched++ + if *maxResults > 0 && matched > *maxResults { + fmt.Printf("... (stopped after %d results)\n", *maxResults) + goto done + } + + if bytes.Equal(value1, value2) { + fmt.Printf( + "key: %s\nsemantic: %s\nvalues identical in %s and %s\nvalue:\n%s\n\n", + shortHex(key1), + describeKey(key1), + *configDirectory1, + *configDirectory2, + indent(decoded1), + ) + } else { + valueDiff++ + fmt.Printf( + "key: %s\nsemantic: %s\nvalue (%s):\n%s\nvalue (%s):\n%s\n", + shortHex(key1), + describeKey(key1), + *configDirectory1, + indent(decoded1), + *configDirectory2, + indent(decoded2), + ) + if diff := diffStrings(decoded1, decoded2); diff != "" { + fmt.Printf("diff:\n%s\n", indent(diff)) + } + fmt.Printf("\n") } - fmt.Printf("\n") } iter1Valid = iter1.Next() iter2Valid = iter2.Next() } else if comparison < 0 { + if shouldInclude(key1, value1) { + matched++ + if *maxResults > 0 && matched > *maxResults { + fmt.Printf("... (stopped after %d results)\n", *maxResults) + goto done + } + + onlyDB1++ + fmt.Printf( + "key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n", + *configDirectory1, + shortHex(key1), + describeKey(key1), + indent(decoded1), + ) + } + iter1Valid = iter1.Next() + } else { + if shouldInclude(key2, value2) { + matched++ + if *maxResults > 0 && matched > *maxResults { + fmt.Printf("... 
(stopped after %d results)\n", *maxResults) + goto done + } + + onlyDB2++ + fmt.Printf( + "key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n", + *configDirectory2, + shortHex(key2), + describeKey(key2), + indent(decoded2), + ) + } + iter2Valid = iter2.Next() + } + case iter1Valid: + if shouldInclude(key1, value1) { + matched++ + if *maxResults > 0 && matched > *maxResults { + fmt.Printf("... (stopped after %d results)\n", *maxResults) + goto done + } + onlyDB1++ fmt.Printf( "key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n", @@ -150,8 +358,16 @@ func main() { describeKey(key1), indent(decoded1), ) - iter1Valid = iter1.Next() - } else { + } + iter1Valid = iter1.Next() + case iter2Valid: + if shouldInclude(key2, value2) { + matched++ + if *maxResults > 0 && matched > *maxResults { + fmt.Printf("... (stopped after %d results)\n", *maxResults) + goto done + } + onlyDB2++ fmt.Printf( "key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n", @@ -160,31 +376,12 @@ func main() { describeKey(key2), indent(decoded2), ) - iter2Valid = iter2.Next() } - case iter1Valid: - onlyDB1++ - fmt.Printf( - "key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n", - *configDirectory1, - shortHex(key1), - describeKey(key1), - indent(decoded1), - ) - iter1Valid = iter1.Next() - case iter2Valid: - onlyDB2++ - fmt.Printf( - "key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n", - *configDirectory2, - shortHex(key2), - describeKey(key2), - indent(decoded2), - ) iter2Valid = iter2.Next() } } +done: fmt.Printf( "summary: %d keys only in %s, %d keys only in %s, %d keys with differing values\n", onlyDB1, @@ -687,7 +884,7 @@ func decodeHypergraphValue(key []byte, value []byte) string { switch sub { case store.VERTEX_DATA: - return summarizeVectorCommitmentTree(value) + return summarizeVectorCommitmentTree(key, value) case store.VERTEX_TOMBSTONE: return shortHex(value) case store.VERTEX_ADDS_TREE_NODE, @@ -765,8 +962,11 @@ func decodeHypergraphProto(value []byte) (string, bool) { return output, matched } -func summarizeVectorCommitmentTree(value []byte) string { - _, err := tries.DeserializeNonLazyTree(value) +// Global intrinsic address (32 bytes of 0xff) +var globalIntrinsicAddress = bytes.Repeat([]byte{0xff}, 32) + +func summarizeVectorCommitmentTree(key []byte, value []byte) string { + tree, err := tries.DeserializeNonLazyTree(value) if err != nil { return fmt.Sprintf( "vector_commitment_tree decode_error=%v raw=%s", @@ -780,6 +980,24 @@ func summarizeVectorCommitmentTree(value []byte) string { "size_bytes": len(value), "sha256": shortHex(sum[:]), } + + // Check if this is a global intrinsic vertex (domain = 0xff*32) + // Key structure for vertex data: {0x09, 0xF0, domain[32], address[32]} + if len(key) >= 66 { + domain := key[2:34] + address := key[34:66] + + if bytes.Equal(domain, globalIntrinsicAddress) { + // This is a global intrinsic vertex - decode the fields + globalData := decodeGlobalIntrinsicVertex(tree, address) + if globalData != nil { + for k, v := range globalData { + summary[k] = v + } + } + } + } + jsonBytes, err := json.MarshalIndent(summary, "", " ") if err != nil { return fmt.Sprintf("vector_commitment_tree size_bytes=%d", len(value)) @@ -788,6 +1006,188 @@ func summarizeVectorCommitmentTree(value []byte) string { return string(jsonBytes) } +// decodeGlobalIntrinsicVertex attempts to decode the vertex as a global intrinsic type +// (prover, allocation, or reward) +func decodeGlobalIntrinsicVertex(tree *tries.VectorCommitmentTree, address []byte) map[string]any { + result := make(map[string]any) + 
result["vertex_address"] = hex.EncodeToString(address)
+
+	// Try to detect the type by examining which fields exist
+	// Prover has PublicKey at order 0 (key 0x00) with size 585
+	// Allocation has Prover reference at order 0 (key 0x00)
+	// Reward has DelegateAddress at order 0 (key 0x00) with size 32
+
+	// Check order 0 field
+	order0Value, err := tree.Get([]byte{0x00})
+	if err != nil || len(order0Value) == 0 {
+		result["type"] = "unknown (no order 0 field)"
+		return result
+	}
+
+	switch len(order0Value) {
+	case 585:
+		// Prover: PublicKey is 585 bytes
+		result["type"] = "prover:Prover"
+		result["public_key"] = shortHex(order0Value)
+		decodeProverFields(tree, result)
+	case 32:
+		// Could be Allocation (Prover reference) or Reward (DelegateAddress).
+		// An allocation always carries a JoinFrameNumber (order 4, key 0x10),
+		// so use its presence to tell the two apart.
+		joinFrame, _ := tree.Get([]byte{0x10})
+		if len(joinFrame) == 8 {
+			result["type"] = "allocation:ProverAllocation"
+			result["prover_reference"] = hex.EncodeToString(order0Value)
+			decodeAllocationFields(tree, result)
+		} else {
+			// Likely a reward vertex
+			result["type"] = "reward:ProverReward"
+			result["delegate_address"] = hex.EncodeToString(order0Value)
+			decodeRewardFields(tree, result)
+		}
+	default:
+		result["type"] = "unknown"
+		result["order_0_size"] = len(order0Value)
+	}
+
+	return result
+}
+
+func decodeProverFields(tree *tries.VectorCommitmentTree, result map[string]any) {
+	// Prover schema:
+	// order 0: PublicKey (585 bytes) - already decoded
+	// order 1: Status (1 byte) - key 0x04
+	// order 2: AvailableStorage (8 bytes) - key 0x08
+	// order 3: Seniority (8 bytes) - key 0x0c
+	// order 4: KickFrameNumber (8 bytes) - key 0x10
+
+	if status, err := tree.Get([]byte{0x04}); err == nil && len(status) == 1 {
+		result["status"] = decodeProverStatus(status[0])
+		result["status_raw"] = status[0]
+	}
+
+	if storage, err := tree.Get([]byte{0x08}); err == nil && len(storage) == 8 {
+		result["available_storage"] = binary.BigEndian.Uint64(storage)
+	}
+
+	if seniority, err := tree.Get([]byte{0x0c}); err == nil && len(seniority) == 8 {
+		result["seniority"] = binary.BigEndian.Uint64(seniority)
+	}
+
+	if kickFrame, err := tree.Get([]byte{0x10}); err == nil && len(kickFrame) == 8 {
+		result["kick_frame_number"] = binary.BigEndian.Uint64(kickFrame)
+	}
+}
+
+func decodeAllocationFields(tree *tries.VectorCommitmentTree, result map[string]any) {
+	// Allocation schema:
+	// order 0: Prover (32 bytes) - already decoded
+	// order 1: Status (1 byte) - key 0x04
+	// order 2: ConfirmationFilter (up to 64 bytes) - key 0x08
+	// order 3: RejectionFilter (up to 64 bytes) - key 0x0c
+	// order 4: JoinFrameNumber (8 bytes) - key 0x10
+	// order 5: LeaveFrameNumber (8 bytes) - key 0x14
+	// order 6: PauseFrameNumber (8 bytes) - key 0x18
+	// order 7: ResumeFrameNumber (8 bytes) - key 0x1c
+	// order 8: KickFrameNumber (8 bytes) - key 0x20
+	// order 9: JoinConfirmFrameNumber (8 bytes) - key 0x24
+	// order 10: JoinRejectFrameNumber (8 bytes) - key 0x28
+	// order 11: LeaveConfirmFrameNumber (8 bytes) - key 0x2c
+	// order 12: LeaveRejectFrameNumber (8 bytes) - key 0x30
+	// order 13: LastActiveFrameNumber (8 bytes) - key 0x34
+
+	if status, err := tree.Get([]byte{0x04}); err == nil && len(status) == 1 {
+		result["status"] = decodeAllocationStatus(status[0])
+		result["status_raw"] = status[0]
+	}
+
+	if confirmFilter, err := 
tree.Get([]byte{0x08}); err == nil && len(confirmFilter) > 0 { + result["confirmation_filter"] = hex.EncodeToString(confirmFilter) + if bytes.Equal(confirmFilter, make([]byte, len(confirmFilter))) { + result["is_global_prover"] = true + } + } else { + result["is_global_prover"] = true + } + + if rejFilter, err := tree.Get([]byte{0x0c}); err == nil && len(rejFilter) > 0 { + result["rejection_filter"] = hex.EncodeToString(rejFilter) + } + + if joinFrame, err := tree.Get([]byte{0x10}); err == nil && len(joinFrame) == 8 { + result["join_frame_number"] = binary.BigEndian.Uint64(joinFrame) + } + + if leaveFrame, err := tree.Get([]byte{0x14}); err == nil && len(leaveFrame) == 8 { + result["leave_frame_number"] = binary.BigEndian.Uint64(leaveFrame) + } + + if pauseFrame, err := tree.Get([]byte{0x18}); err == nil && len(pauseFrame) == 8 { + result["pause_frame_number"] = binary.BigEndian.Uint64(pauseFrame) + } + + if resumeFrame, err := tree.Get([]byte{0x1c}); err == nil && len(resumeFrame) == 8 { + result["resume_frame_number"] = binary.BigEndian.Uint64(resumeFrame) + } + + if kickFrame, err := tree.Get([]byte{0x20}); err == nil && len(kickFrame) == 8 { + result["kick_frame_number"] = binary.BigEndian.Uint64(kickFrame) + } + + if joinConfirm, err := tree.Get([]byte{0x24}); err == nil && len(joinConfirm) == 8 { + result["join_confirm_frame_number"] = binary.BigEndian.Uint64(joinConfirm) + } + + if joinReject, err := tree.Get([]byte{0x28}); err == nil && len(joinReject) == 8 { + result["join_reject_frame_number"] = binary.BigEndian.Uint64(joinReject) + } + + if leaveConfirm, err := tree.Get([]byte{0x2c}); err == nil && len(leaveConfirm) == 8 { + result["leave_confirm_frame_number"] = binary.BigEndian.Uint64(leaveConfirm) + } + + if leaveReject, err := tree.Get([]byte{0x30}); err == nil && len(leaveReject) == 8 { + result["leave_reject_frame_number"] = binary.BigEndian.Uint64(leaveReject) + } + + if lastActive, err := tree.Get([]byte{0x34}); err == nil && len(lastActive) == 8 { + result["last_active_frame_number"] = binary.BigEndian.Uint64(lastActive) + } +} + +func decodeRewardFields(tree *tries.VectorCommitmentTree, result map[string]any) { + // Reward schema - just has DelegateAddress at order 0 + // Nothing else to decode for now +} + +func decodeProverStatus(status byte) string { + // Prover status mapping (internal byte -> name) + switch status { + case 0: + return "Joining" + case 1: + return "Active" + case 2: + return "Paused" + case 3: + return "Leaving" + case 4: + return "Rejected" + case 5: + return "Kicked" + default: + return fmt.Sprintf("Unknown(%d)", status) + } +} + +func decodeAllocationStatus(status byte) string { + // Allocation status mapping (same as prover status) + return decodeProverStatus(status) +} + func summarizeHypergraphTreeNode(value []byte) string { if len(value) == 0 { return "hypergraph_tree_node " diff --git a/node/store/pebble.go b/node/store/pebble.go index 6cf4e83..7f72f7b 100644 --- a/node/store/pebble.go +++ b/node/store/pebble.go @@ -1,6 +1,7 @@ package store import ( + "bytes" "context" "encoding/binary" "encoding/hex" @@ -16,6 +17,7 @@ import ( "go.uber.org/zap" "source.quilibrium.com/quilibrium/monorepo/config" "source.quilibrium.com/quilibrium/monorepo/types/store" + "source.quilibrium.com/quilibrium/monorepo/types/tries" ) type PebbleDB struct { @@ -55,6 +57,11 @@ var pebbleMigrations = []func(*pebble.Batch) error{ migration_2_1_0_153, migration_2_1_0_154, migration_2_1_0_155, + migration_2_1_0_156, + migration_2_1_0_157, + migration_2_1_0_158, + 
diff --git a/node/store/pebble_test.go b/node/store/pebble_test.go
index 76f4d49..36ea602 100644
--- a/node/store/pebble_test.go
+++ b/node/store/pebble_test.go
@@ -1,10 +1,13 @@
 package store
 
 import (
+    "encoding/hex"
+    "fmt"
     "os"
     "path/filepath"
     "testing"
 
+    "github.com/iden3/go-iden3-crypto/poseidon"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "go.uber.org/zap"
@@ -12,6 +15,13 @@ import (
     "source.quilibrium.com/quilibrium/monorepo/config"
 )
 
+func TestPoseidon(t *testing.T) {
+    bi, err := poseidon.HashBytes([]byte("testvector"))
+    require.NoError(t, err)
+    fmt.Println(hex.EncodeToString(bi.FillBytes(make([]byte, 32))))
+    assert.FailNow(t, "")
+}
+
 func TestNewPebbleDB_ExistingDirectory(t *testing.T) {
     testDir, err := os.MkdirTemp("", "pebble-test-existing-*")
     require.NoError(t, err)
diff --git a/node/worker/manager.go b/node/worker/manager.go
index 862a938..10ada64 100644
--- a/node/worker/manager.go
+++ b/node/worker/manager.go
@@ -819,9 +819,6 @@ func (w *WorkerManager) getMultiaddrOfWorker(coreId uint) (
     rpcMultiaddr = strings.Replace(rpcMultiaddr, "/0.0.0.0/", "/127.0.0.1/", 1)
     rpcMultiaddr = strings.Replace(rpcMultiaddr, "/0:0:0:0:0:0:0:0/", "/::1/", 1)
     rpcMultiaddr = strings.Replace(rpcMultiaddr, "/::/", "/::1/", 1)
-    // force TCP as stream is not supported over UDP/QUIC
-    rpcMultiaddr = strings.Replace(rpcMultiaddr, "/quic-v1", "", 1)
-    rpcMultiaddr = strings.Replace(rpcMultiaddr, "udp", "tcp", 1)
 
     ma, err := multiaddr.StringCast(rpcMultiaddr)
     return ma, errors.Wrap(err, "get multiaddr of worker")
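A small sketch (not part of the patch) of what the remaining loopback rewrites do now that the UDP/QUIC-to-TCP rewrite has been removed; the listen address shown is a hypothetical example, and only plain string manipulation is used.

package main

import (
	"fmt"
	"strings"
)

func main() {
	rpcMultiaddr := "/ip4/0.0.0.0/udp/8340/quic-v1" // hypothetical worker listen address
	rpcMultiaddr = strings.Replace(rpcMultiaddr, "/0.0.0.0/", "/127.0.0.1/", 1)
	rpcMultiaddr = strings.Replace(rpcMultiaddr, "/0:0:0:0:0:0:0:0/", "/::1/", 1)
	rpcMultiaddr = strings.Replace(rpcMultiaddr, "/::/", "/::1/", 1)

	// The transport components are now left untouched, so QUIC addresses survive.
	fmt.Println(rpcMultiaddr) // /ip4/127.0.0.1/udp/8340/quic-v1
}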
diff --git a/types/hypergraph/hypergraph.go b/types/hypergraph/hypergraph.go
index f2ad133..c922124 100644
--- a/types/hypergraph/hypergraph.go
+++ b/types/hypergraph/hypergraph.go
@@ -194,6 +194,42 @@ type Hypergraph interface {
         frameNumber uint64,
     ) error
 
+    // Hard delete operations - these bypass CRDT semantics for pruning
+
+    // DeleteVertexAdd performs a hard delete of a vertex from the VertexAdds
+    // set. Unlike RemoveVertex (which adds to VertexRemoves for CRDT semantics),
+    // this actually removes the entry from VertexAdds and deletes the associated
+    // vertex data. This is used for pruning stale/orphaned data.
+    DeleteVertexAdd(
+        txn tries.TreeBackingStoreTransaction,
+        shardKey tries.ShardKey,
+        vertexID [64]byte,
+    ) error
+
+    // DeleteVertexRemove performs a hard delete of a vertex from the
+    // VertexRemoves set. This is used for pruning stale data.
+    DeleteVertexRemove(
+        txn tries.TreeBackingStoreTransaction,
+        shardKey tries.ShardKey,
+        vertexID [64]byte,
+    ) error
+
+    // DeleteHyperedgeAdd performs a hard delete of a hyperedge from the
+    // HyperedgeAdds set. This is used for pruning stale/orphaned data.
+    DeleteHyperedgeAdd(
+        txn tries.TreeBackingStoreTransaction,
+        shardKey tries.ShardKey,
+        hyperedgeID [64]byte,
+    ) error
+
+    // DeleteHyperedgeRemove performs a hard delete of a hyperedge from the
+    // HyperedgeRemoves set. This is used for pruning stale data.
+    DeleteHyperedgeRemove(
+        txn tries.TreeBackingStoreTransaction,
+        shardKey tries.ShardKey,
+        hyperedgeID [64]byte,
+    ) error
+
     // Hyperedge data operations
 
     // GetHyperedgeExtrinsics retrieves the extrinsic tree of a hyperedge, which
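A minimal sketch (not part of the patch) of one way a caller could combine these interface methods in a pruning pass. The package name, the import paths (inferred from other imports in the patch), and the choice to clear both the add set and the remove set are assumptions; the transaction is caller-provided, as the method signatures require.

package prune

import (
	"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
	"source.quilibrium.com/quilibrium/monorepo/types/tries"
)

// pruneVertex hard-deletes a stale vertex from both CRDT sets so that neither
// the entry nor its tombstone is synced again. All deletions share one
// caller-provided transaction for atomicity.
func pruneVertex(
	hg hypergraph.Hypergraph,
	txn tries.TreeBackingStoreTransaction,
	shardKey tries.ShardKey,
	vertexID [64]byte,
) error {
	if err := hg.DeleteVertexAdd(txn, shardKey, vertexID); err != nil {
		return err
	}
	// Also drop any tombstone so the entry does not linger in VertexRemoves.
	return hg.DeleteVertexRemove(txn, shardKey, vertexID)
}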
diff --git a/types/mocks/hypergraph.go b/types/mocks/hypergraph.go
index 87c2e2e..541d406 100644
--- a/types/mocks/hypergraph.go
+++ b/types/mocks/hypergraph.go
@@ -555,6 +555,46 @@ func (h *MockHypergraph) ImportTree(
     return args.Error(0)
 }
 
+// DeleteVertexAdd implements hypergraph.Hypergraph.
+func (h *MockHypergraph) DeleteVertexAdd(
+    txn tries.TreeBackingStoreTransaction,
+    shardKey tries.ShardKey,
+    vertexID [64]byte,
+) error {
+    args := h.Called(txn, shardKey, vertexID)
+    return args.Error(0)
+}
+
+// DeleteVertexRemove implements hypergraph.Hypergraph.
+func (h *MockHypergraph) DeleteVertexRemove(
+    txn tries.TreeBackingStoreTransaction,
+    shardKey tries.ShardKey,
+    vertexID [64]byte,
+) error {
+    args := h.Called(txn, shardKey, vertexID)
+    return args.Error(0)
+}
+
+// DeleteHyperedgeAdd implements hypergraph.Hypergraph.
+func (h *MockHypergraph) DeleteHyperedgeAdd(
+    txn tries.TreeBackingStoreTransaction,
+    shardKey tries.ShardKey,
+    hyperedgeID [64]byte,
+) error {
+    args := h.Called(txn, shardKey, hyperedgeID)
+    return args.Error(0)
+}
+
+// DeleteHyperedgeRemove implements hypergraph.Hypergraph.
+func (h *MockHypergraph) DeleteHyperedgeRemove(
+    txn tries.TreeBackingStoreTransaction,
+    shardKey tries.ShardKey,
+    hyperedgeID [64]byte,
+) error {
+    args := h.Called(txn, shardKey, hyperedgeID)
+    return args.Error(0)
+}
+
 // Ensure MockHypergraph implements Hypergraph
 var _ hg.Hypergraph = (*MockHypergraph)(nil)
 
diff --git a/types/tries/lazy_proof_tree.go b/types/tries/lazy_proof_tree.go
index 6885c39..b47b729 100644
--- a/types/tries/lazy_proof_tree.go
+++ b/types/tries/lazy_proof_tree.go
@@ -569,6 +569,10 @@ type TreeBackingStore interface {
         shardKey ShardKey,
         leaf *RawLeafData,
     ) error
+    DeleteVertexTree(
+        txn TreeBackingStoreTransaction,
+        id []byte,
+    ) error
 }
 
 // LazyVectorCommitmentTree is a lazy-loaded (from a TreeBackingStore based