diff --git a/Cargo.lock b/Cargo.lock
index f886603..c7a415e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -307,7 +307,7 @@ checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae"
[[package]]
name = "bls48581"
-version = "0.1.0"
+version = "2.1.0"
dependencies = [
"criterion 0.4.0",
"hex 0.4.3",
diff --git a/RELEASE-NOTES b/RELEASE-NOTES
index e889ed6..95f236a 100644
--- a/RELEASE-NOTES
+++ b/RELEASE-NOTES
@@ -1,3 +1,10 @@
+# 2.1.0.17
+- resolve sync race condition with prover registry pruning
+- update hypergraph to directly manage raw deletions
+- migration to resolve the records issue resulting from the above
+- resolve early snapshot termination issue
+- global halts now only halt processing of non-global ops
+
# 2.1.0.16
- build_utils – static code analysis checker for underlying slice assignment
- hypergraph snapshot manager now uses in memory snapshot instead of pebble snapshot
diff --git a/config/version.go b/config/version.go
index c2e15f3..fc36a50 100644
--- a/config/version.go
+++ b/config/version.go
@@ -43,7 +43,7 @@ func FormatVersion(version []byte) string {
}
func GetPatchNumber() byte {
- return 0x10
+ return 0x11
}
func GetRCNumber() byte {
diff --git a/hypergraph/go.mod b/hypergraph/go.mod
index 42815be..d913c92 100644
--- a/hypergraph/go.mod
+++ b/hypergraph/go.mod
@@ -24,7 +24,6 @@ replace source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub => ../go-
require (
github.com/prometheus/client_golang v1.22.0
- google.golang.org/grpc v1.72.0
source.quilibrium.com/quilibrium/monorepo/protobufs v0.0.0-00010101000000-000000000000
source.quilibrium.com/quilibrium/monorepo/types v0.0.0-00010101000000-000000000000
source.quilibrium.com/quilibrium/monorepo/utils v0.0.0-00010101000000-000000000000
@@ -70,7 +69,7 @@ require (
golang.org/x/text v0.26.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect
-
+ google.golang.org/grpc v1.72.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
lukechampine.com/blake3 v1.4.1 // indirect
)
diff --git a/hypergraph/hypergraph.go b/hypergraph/hypergraph.go
index 7e64dd3..1973ec2 100644
--- a/hypergraph/hypergraph.go
+++ b/hypergraph/hypergraph.go
@@ -762,3 +762,105 @@ func boolToString(b bool) string {
}
return "false"
}
+
+// DeleteVertexAdd performs a hard delete of a vertex from the VertexAdds set.
+// Unlike RemoveVertex (which adds to VertexRemoves for CRDT semantics), this
+// actually removes the entry from VertexAdds and deletes the associated vertex
+// data. This is used for pruning stale/orphaned data that should not be synced.
+//
+// The caller must provide a transaction for atomic deletion of both the tree
+// entry and the vertex data.
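+//
+// A minimal usage sketch (illustrative only; assumes the caller has already
+// resolved shardKey and vertexID, and obtains the transaction from the
+// hypergraph's backing store):
+//
+//    txn, err := hg.NewTransaction(false)
+//    if err != nil {
+//        return err
+//    }
+//    if err := hg.DeleteVertexAdd(txn, shardKey, vertexID); err != nil {
+//        txn.Abort()
+//        return err
+//    }
+//    return txn.Commit()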
+func (hg *HypergraphCRDT) DeleteVertexAdd(
+ txn tries.TreeBackingStoreTransaction,
+ shardKey tries.ShardKey,
+ vertexID [64]byte,
+) error {
+ hg.mu.Lock()
+ defer hg.mu.Unlock()
+
+ set := hg.getVertexAddsSet(shardKey)
+ tree := set.GetTree()
+
+ // Delete from the tree (removes tree node and path index)
+ if err := tree.Delete(txn, vertexID[:]); err != nil {
+ return errors.Wrap(err, "delete vertex add: tree delete")
+ }
+
+ // Delete the vertex data
+ if err := tree.Store.DeleteVertexTree(txn, vertexID[:]); err != nil {
+ // Log but don't fail - vertex data may already be gone
+ hg.logger.Debug(
+ "delete vertex add: vertex data not found",
+ zap.String("vertex_id", string(vertexID[:])),
+ zap.Error(err),
+ )
+ }
+
+ return nil
+}
+
+// DeleteVertexRemove performs a hard delete of a vertex from the VertexRemoves
+// set. This removes the entry from VertexRemoves, effectively "un-removing" the
+// vertex if it still exists in VertexAdds. This is used for pruning stale data.
+func (hg *HypergraphCRDT) DeleteVertexRemove(
+ txn tries.TreeBackingStoreTransaction,
+ shardKey tries.ShardKey,
+ vertexID [64]byte,
+) error {
+ hg.mu.Lock()
+ defer hg.mu.Unlock()
+
+ set := hg.getVertexRemovesSet(shardKey)
+ tree := set.GetTree()
+
+ // Delete from the tree
+ if err := tree.Delete(txn, vertexID[:]); err != nil {
+ return errors.Wrap(err, "delete vertex remove: tree delete")
+ }
+
+ return nil
+}
+
+// DeleteHyperedgeAdd performs a hard delete of a hyperedge from the
+// HyperedgeAdds set. Unlike RemoveHyperedge (which adds to HyperedgeRemoves for
+// CRDT semantics), this actually removes the entry from HyperedgeAdds. This is
+// used for pruning stale/orphaned data.
+func (hg *HypergraphCRDT) DeleteHyperedgeAdd(
+ txn tries.TreeBackingStoreTransaction,
+ shardKey tries.ShardKey,
+ hyperedgeID [64]byte,
+) error {
+ hg.mu.Lock()
+ defer hg.mu.Unlock()
+
+ set := hg.getHyperedgeAddsSet(shardKey)
+ tree := set.GetTree()
+
+ // Delete from the tree
+ if err := tree.Delete(txn, hyperedgeID[:]); err != nil {
+ return errors.Wrap(err, "delete hyperedge add: tree delete")
+ }
+
+ return nil
+}
+
+// DeleteHyperedgeRemove performs a hard delete of a hyperedge from the
+// HyperedgeRemoves set. This is used for pruning stale data.
+func (hg *HypergraphCRDT) DeleteHyperedgeRemove(
+ txn tries.TreeBackingStoreTransaction,
+ shardKey tries.ShardKey,
+ hyperedgeID [64]byte,
+) error {
+ hg.mu.Lock()
+ defer hg.mu.Unlock()
+
+ set := hg.getHyperedgeRemovesSet(shardKey)
+ tree := set.GetTree()
+
+ // Delete from the tree
+ if err := tree.Delete(txn, hyperedgeID[:]); err != nil {
+ return errors.Wrap(err, "delete hyperedge remove: tree delete")
+ }
+
+ return nil
+}
diff --git a/hypergraph/snapshot_manager.go b/hypergraph/snapshot_manager.go
index 3ecd191..90a2f6e 100644
--- a/hypergraph/snapshot_manager.go
+++ b/hypergraph/snapshot_manager.go
@@ -175,11 +175,17 @@ func (m *snapshotManager) publish(root []byte) {
m.mu.Lock()
defer m.mu.Unlock()
+ // Remove all handles from the map so new syncs get new handles.
+ // Handles with active refs will be released when their last user calls release().
+ // Handles with no active refs (only the initial ref from creation) are released now.
for key, handle := range m.handles {
+ delete(m.handles, key)
if handle != nil {
+ // releaseRef decrements the ref count. If this was the last ref
+ // (i.e., no active sync sessions), the underlying DB is released.
+ // If there are active sync sessions, they will release it when done.
handle.releaseRef(m.logger)
}
- delete(m.handles, key)
}
m.root = nil
@@ -221,6 +227,11 @@ func (m *snapshotManager) acquire(
}
handle := newSnapshotHandle(key, storeSnapshot, release, m.root)
+ // Acquire a ref for the caller. The handle is created with refs=1 (the owner ref
+ // held by the snapshot manager), and this adds another ref for the sync session.
+ // This ensures publish() can release the owner ref without closing the DB while
+ // a sync is still using it.
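+ // Concretely: newSnapshotHandle starts at refs=1, this acquire brings it to
+ // 2, publish() dropping the owner ref brings it back to 1, and the sync
+ // session's release brings it to 0, at which point the underlying DB is
+ // released.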
+ handle.acquire()
m.handles[key] = handle
return handle
}
diff --git a/hypergraph/sync.go b/hypergraph/sync.go
index 4dec1cf..3f659a4 100644
--- a/hypergraph/sync.go
+++ b/hypergraph/sync.go
@@ -451,7 +451,7 @@ func getChildSegments(
prefix,
)
if err != nil && !strings.Contains(err.Error(), "item not found") {
- panic(err)
+ return nodes, 0
}
if isPrefix(prefix, path) {
@@ -1345,7 +1345,7 @@ func getNodeAtPath(
slices.Concat(n.FullPrefix, []int{int(childIndex)}),
)
if err != nil && !strings.Contains(err.Error(), "item not found") {
- logger.Panic("failed to get node by path", zap.Error(err))
+ return nil
}
if child == nil {
@@ -1425,7 +1425,7 @@ func getBranchInfoFromTree(
slices.Concat(branch.FullPrefix, []int{i}),
)
if err != nil && !strings.Contains(err.Error(), "item not found") {
- logger.Panic("failed to get node by path", zap.Error(err))
+ return nil, err
}
}
@@ -1495,7 +1495,7 @@ func ensureCommittedNode(
path,
)
if err != nil && !strings.Contains(err.Error(), "item not found") {
- logger.Panic("failed to reload node by path", zap.Error(err))
+ return nil
}
if reloaded != nil {
return reloaded
diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go
index 0c400ab..923c9da 100644
--- a/node/consensus/app/app_consensus_engine.go
+++ b/node/consensus/app/app_consensus_engine.go
@@ -155,6 +155,10 @@ type AppConsensusEngine struct {
globalProverRootSynced atomic.Bool
globalProverSyncInProgress atomic.Bool
+ // Genesis initialization
+ genesisInitialized atomic.Bool
+ genesisInitChan chan *protobufs.GlobalFrame
+
// Message queues
consensusMessageQueue chan *pb.Message
proverMessageQueue chan *pb.Message
@@ -294,6 +298,7 @@ func NewAppConsensusEngine(
peerAuthCache: make(map[string]time.Time),
peerInfoDigestCache: make(map[string]struct{}),
keyRegistryDigestCache: make(map[string]struct{}),
+ genesisInitChan: make(chan *protobufs.GlobalFrame, 1),
}
engine.frameChainChecker = NewAppFrameChainChecker(clockStore, logger, appAddress)
@@ -504,7 +509,10 @@ func NewAppConsensusEngine(
*protobufs.AppShardFrame,
*protobufs.ProposalVote,
]
- initializeCertifiedGenesis := func() {
+
+ // Check if we need to await network data for genesis initialization
+ needsNetworkGenesis := false
+ initializeCertifiedGenesis := func(markInitialized bool) {
frame, qc := engine.initializeGenesis()
state = &models.CertifiedState[*protobufs.AppShardFrame]{
State: &models.State[*protobufs.AppShardFrame]{
@@ -515,17 +523,72 @@ func NewAppConsensusEngine(
CertifyingQuorumCertificate: qc,
}
pending = nil
+ if markInitialized {
+ engine.genesisInitialized.Store(true)
+ }
+ }
+
+ initializeCertifiedGenesisFromNetwork := func(
+ difficulty uint32,
+ shardInfo []*protobufs.AppShardInfo,
+ ) {
+ // Delete the temporary genesis frame first
+ if err := engine.clockStore.DeleteShardClockFrameRange(
+ engine.appAddress, 0, 1,
+ ); err != nil {
+ logger.Debug(
+ "could not delete temporary genesis frame",
+ zap.Error(err),
+ )
+ }
+
+ frame, qc := engine.initializeGenesisWithParams(difficulty, shardInfo)
+ state = &models.CertifiedState[*protobufs.AppShardFrame]{
+ State: &models.State[*protobufs.AppShardFrame]{
+ Rank: 0,
+ Identifier: frame.Identity(),
+ State: &frame,
+ },
+ CertifyingQuorumCertificate: qc,
+ }
+ pending = nil
+ engine.genesisInitialized.Store(true)
+
+ logger.Info(
+ "initialized genesis with network data",
+ zap.Uint32("difficulty", difficulty),
+ zap.Int("shard_info_count", len(shardInfo)),
+ )
}
if err != nil {
- initializeCertifiedGenesis()
+ // No consensus state exists - check if we have a genesis frame already
+ _, _, genesisErr := engine.clockStore.GetShardClockFrame(
+ engine.appAddress,
+ 0,
+ false,
+ )
+ if genesisErr != nil && errors.Is(genesisErr, store.ErrNotFound) {
+ // No genesis exists - we need to await network data
+ needsNetworkGenesis = true
+ logger.Warn(
+ "app genesis missing - will await network data",
+ zap.String("shard_address", hex.EncodeToString(appAddress)),
+ )
+ // Initialize with default values for now
+ // This will be re-done after receiving network data
+ // Pass false to NOT mark as initialized - we're waiting for network data
+ initializeCertifiedGenesis(false)
+ } else {
+ initializeCertifiedGenesis(true)
+ }
} else {
qc, err := engine.clockStore.GetQuorumCertificate(
engine.appAddress,
latest.FinalizedRank,
)
if err != nil || qc.GetFrameNumber() == 0 {
- initializeCertifiedGenesis()
+ initializeCertifiedGenesis(true)
} else {
frame, _, err := engine.clockStore.GetShardClockFrame(
engine.appAddress,
@@ -535,8 +598,10 @@ func NewAppConsensusEngine(
if err != nil {
panic(err)
}
- parentFrame, err := engine.clockStore.GetGlobalClockFrame(
- qc.GetFrameNumber() - 1,
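+ // The parent frame is read from this app shard's own clock rather than
+ // the global clock.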
+ parentFrame, _, err := engine.clockStore.GetShardClockFrame(
+ engine.appAddress,
+ qc.GetFrameNumber()-1,
+ false,
)
if err != nil {
panic(err)
@@ -656,6 +721,48 @@ func NewAppConsensusEngine(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
+ // If we need network genesis, await it before starting consensus
+ if needsNetworkGenesis {
+ engine.logger.Info(
+ "awaiting network data for genesis initialization",
+ zap.String("shard_address", engine.appAddressHex),
+ )
+
+ // Wait for a global frame from pubsub
+ globalFrame, err := engine.awaitFirstGlobalFrame(ctx)
+ if err != nil {
+ engine.logger.Error(
+ "failed to await global frame for genesis",
+ zap.Error(err),
+ )
+ ctx.Throw(err)
+ return
+ }
+
+ // Fetch shard info from bootstrap peers
+ shardInfo, err := engine.fetchShardInfoFromBootstrap(ctx)
+ if err != nil {
+ engine.logger.Warn(
+ "failed to fetch shard info from bootstrap peers",
+ zap.Error(err),
+ )
+ // Continue anyway - we at least have the global frame
+ }
+
+ engine.logger.Info(
+ "received network genesis data",
+ zap.Uint64("global_frame_number", globalFrame.Header.FrameNumber),
+ zap.Uint32("difficulty", globalFrame.Header.Difficulty),
+ zap.Int("shard_info_count", len(shardInfo)),
+ )
+
+ // Re-initialize genesis with the correct network data
+ initializeCertifiedGenesisFromNetwork(
+ globalFrame.Header.Difficulty,
+ shardInfo,
+ )
+ }
+
if err := engine.waitForProverRegistration(ctx); err != nil {
engine.logger.Error("prover unavailable", zap.Error(err))
ctx.Throw(err)
@@ -1401,6 +1508,16 @@ func (e *AppConsensusEngine) updateMetricsLoop(
func (e *AppConsensusEngine) initializeGenesis() (
*protobufs.AppShardFrame,
*protobufs.QuorumCertificate,
+) {
+ return e.initializeGenesisWithParams(e.config.Engine.Difficulty, nil)
+}
+
+func (e *AppConsensusEngine) initializeGenesisWithParams(
+ difficulty uint32,
+ shardInfo []*protobufs.AppShardInfo,
+) (
+ *protobufs.AppShardFrame,
+ *protobufs.QuorumCertificate,
) {
// Initialize state roots for hypergraph
stateRoots := make([][]byte, 4)
@@ -1408,9 +1525,14 @@ func (e *AppConsensusEngine) initializeGenesis() (
stateRoots[i] = make([]byte, 64)
}
+ // Use provided difficulty or fall back to config
+ if difficulty == 0 {
+ difficulty = e.config.Engine.Difficulty
+ }
+
genesisHeader, err := e.frameProver.ProveFrameHeaderGenesis(
e.appAddress,
- 80000,
+ difficulty,
make([]byte, 516),
100,
)
@@ -1560,6 +1682,127 @@ func (e *AppConsensusEngine) ensureAppGenesis() error {
return nil
}
+// fetchShardInfoFromBootstrap connects to bootstrap peers and fetches shard info
+// for this app's address using the GetAppShards RPC.
+func (e *AppConsensusEngine) fetchShardInfoFromBootstrap(
+ ctx context.Context,
+) ([]*protobufs.AppShardInfo, error) {
+ bootstrapPeers := e.config.P2P.BootstrapPeers
+ if len(bootstrapPeers) == 0 {
+ return nil, errors.New("no bootstrap peers configured")
+ }
+
+ for _, peerAddr := range bootstrapPeers {
+ shardInfo, err := e.tryFetchShardInfoFromPeer(ctx, peerAddr)
+ if err != nil {
+ e.logger.Debug(
+ "failed to fetch shard info from peer",
+ zap.String("peer", peerAddr),
+ zap.Error(err),
+ )
+ continue
+ }
+ if len(shardInfo) > 0 {
+ e.logger.Info(
+ "fetched shard info from bootstrap peer",
+ zap.String("peer", peerAddr),
+ zap.Int("shard_count", len(shardInfo)),
+ )
+ return shardInfo, nil
+ }
+ }
+
+ return nil, errors.New("failed to fetch shard info from any bootstrap peer")
+}
+
+func (e *AppConsensusEngine) tryFetchShardInfoFromPeer(
+ ctx context.Context,
+ peerAddr string,
+) ([]*protobufs.AppShardInfo, error) {
+ // Parse multiaddr to extract peer ID and address
+ ma, err := multiaddr.NewMultiaddr(peerAddr)
+ if err != nil {
+ return nil, errors.Wrap(err, "parse multiaddr")
+ }
+
+ // Extract peer ID from the multiaddr
+ peerIDStr, err := ma.ValueForProtocol(multiaddr.P_P2P)
+ if err != nil {
+ return nil, errors.Wrap(err, "extract peer id")
+ }
+
+ peerID, err := peer.Decode(peerIDStr)
+ if err != nil {
+ return nil, errors.Wrap(err, "decode peer id")
+ }
+
+ // Create gRPC connection to the peer
+ mga, err := mn.ToNetAddr(ma)
+ if err != nil {
+ return nil, errors.Wrap(err, "convert multiaddr")
+ }
+
+ creds, err := p2p.NewPeerAuthenticator(
+ e.logger,
+ e.config.P2P,
+ nil,
+ nil,
+ nil,
+ nil,
+ [][]byte{[]byte(peerID)},
+ map[string]channel.AllowedPeerPolicyType{},
+ map[string]channel.AllowedPeerPolicyType{},
+ ).CreateClientTLSCredentials([]byte(peerID))
+ if err != nil {
+ return nil, errors.Wrap(err, "create credentials")
+ }
+
+ conn, err := grpc.NewClient(
+ mga.String(),
+ grpc.WithTransportCredentials(creds),
+ )
+ if err != nil {
+ return nil, errors.Wrap(err, "dial peer")
+ }
+ defer conn.Close()
+
+ client := protobufs.NewGlobalServiceClient(conn)
+
+ reqCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+ defer cancel()
+
+ resp, err := client.GetAppShards(reqCtx, &protobufs.GetAppShardsRequest{
+ ShardKey: e.appAddress,
+ Prefix: []uint32{},
+ })
+ if err != nil {
+ return nil, errors.Wrap(err, "get app shards")
+ }
+
+ return resp.GetInfo(), nil
+}
+
+// awaitFirstGlobalFrame waits for a global frame to arrive via pubsub and
+// returns it. This is used during genesis initialization to get the correct
+// difficulty from the network.
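+// The frame is delivered by handleGlobalFrameMessage through the buffered
+// (capacity-1) genesisInitChan; frames that arrive while one is already
+// buffered are skipped.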
+func (e *AppConsensusEngine) awaitFirstGlobalFrame(
+ ctx context.Context,
+) (*protobufs.GlobalFrame, error) {
+ e.logger.Info("awaiting first global frame from network for genesis initialization")
+
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case frame := <-e.genesisInitChan:
+ e.logger.Info(
+ "received global frame for genesis initialization",
+ zap.Uint64("frame_number", frame.Header.FrameNumber),
+ zap.Uint32("difficulty", frame.Header.Difficulty),
+ )
+ return frame, nil
+ }
+}
+
func (e *AppConsensusEngine) waitForProverRegistration(
ctx lifecycle.SignalerContext,
) error {
diff --git a/node/consensus/app/consensus_dynamic_committee.go b/node/consensus/app/consensus_dynamic_committee.go
index 0053230..feee074 100644
--- a/node/consensus/app/consensus_dynamic_committee.go
+++ b/node/consensus/app/consensus_dynamic_committee.go
@@ -123,6 +123,10 @@ func (e *AppConsensusEngine) LeaderForRank(rank uint64) (
return "", errors.Wrap(err, "leader for rank")
}
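+ // With fewer than three known provers on network 0, fall back to the zero
+ // identity, as in the unsynced case below.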
+ if e.config.P2P.Network == 0 && len(proverSet) < 3 {
+ return models.Identity(make([]byte, 32)), nil
+ }
+
// Handle condition where prover cannot be yet known due to lack of sync:
if len(proverSet) == 0 {
return models.Identity(make([]byte, 32)), nil
diff --git a/node/consensus/app/message_processors.go b/node/consensus/app/message_processors.go
index 62d0310..398e955 100644
--- a/node/consensus/app/message_processors.go
+++ b/node/consensus/app/message_processors.go
@@ -1368,6 +1368,20 @@ func (e *AppConsensusEngine) handleGlobalFrameMessage(message *pb.Message) {
return
}
+ // If genesis hasn't been initialized yet, send this frame to the
+ // genesis init channel (non-blocking)
+ if !e.genesisInitialized.Load() {
+ select {
+ case e.genesisInitChan <- frame:
+ e.logger.Debug(
+ "sent global frame to genesis init channel",
+ zap.Uint64("frame_number", frame.Header.FrameNumber),
+ )
+ default:
+ // Channel already has a frame, skip
+ }
+ }
+
if err := e.globalTimeReel.Insert(frame); err != nil {
// Success metric recorded at the end of processing
globalFramesProcessedTotal.WithLabelValues("error").Inc()
diff --git a/node/consensus/global/coverage_events.go b/node/consensus/global/coverage_events.go
index c0bd224..3d565df 100644
--- a/node/consensus/global/coverage_events.go
+++ b/node/consensus/global/coverage_events.go
@@ -116,16 +116,21 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error {
remaining = int(haltGraceFrames - streak.Count)
}
if remaining <= 0 && e.config.P2P.Network == 0 {
- e.logger.Error(
- "CRITICAL: Shard has insufficient coverage - triggering network halt",
- zap.String("shard_address", hex.EncodeToString([]byte(shardAddress))),
- zap.Uint64("prover_count", proverCount),
- zap.Uint64("halt_threshold", haltThreshold),
- )
+ // Instead of halting, enter prover-only mode at the global level
+ // This allows prover messages to continue while blocking other messages
+ if !e.proverOnlyMode.Load() {
+ e.logger.Warn(
+ "CRITICAL: Shard has insufficient coverage - entering prover-only mode (non-prover messages will be dropped)",
+ zap.String("shard_address", hex.EncodeToString([]byte(shardAddress))),
+ zap.Uint64("prover_count", proverCount),
+ zap.Uint64("halt_threshold", haltThreshold),
+ )
+ e.proverOnlyMode.Store(true)
+ }
- // Emit halt event
+ // Emit warning event (not halt) so monitoring knows we're in a degraded state
e.emitCoverageEvent(
- typesconsensus.ControlEventCoverageHalt,
+ typesconsensus.ControlEventCoverageWarn,
&typesconsensus.CoverageEventData{
ShardAddress: []byte(shardAddress),
ProverCount: int(proverCount),
@@ -133,7 +138,7 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error {
AttestedStorage: attestedStorage,
TreeMetadata: coverage.TreeMetadata,
Message: fmt.Sprintf(
- "Shard has only %d provers, network halt required",
+ "Shard has only %d provers, prover-only mode active (non-prover messages dropped)",
proverCount,
),
},
@@ -170,6 +175,16 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error {
// Not in critical state — clear any ongoing streak
e.clearStreak(shardAddress)
+ // If we were in prover-only mode and coverage is restored, exit prover-only mode
+ if e.proverOnlyMode.Load() {
+ e.logger.Info(
+ "Coverage restored - exiting prover-only mode",
+ zap.String("shard_address", hex.EncodeToString([]byte(shardAddress))),
+ zap.Uint64("prover_count", proverCount),
+ )
+ e.proverOnlyMode.Store(false)
+ }
+
// Check for low coverage
if proverCount < minProvers {
e.handleLowCoverage([]byte(shardAddress), coverage, minProvers)
diff --git a/node/consensus/global/genesis.go b/node/consensus/global/genesis.go
index d25d10c..cf1f103 100644
--- a/node/consensus/global/genesis.go
+++ b/node/consensus/global/genesis.go
@@ -531,6 +531,85 @@ func (e *GlobalConsensusEngine) createStubGenesis() *protobufs.GlobalFrame {
commitments[i] = &tries.VectorCommitmentTree{}
}
+ e.establishTestnetGenesisProvers()
+
+ roots, err := e.hypergraph.Commit(0)
+ if err != nil {
+ e.logger.Error("could not commit", zap.Error(err))
+ return nil
+ }
+
+ // Parse and set initial commitments from JSON
+ for shardKey, commits := range roots {
+ for i := 0; i < 3; i++ {
+ commitments[shardKey.L1[i]].Insert(
+ shardKey.L2[:],
+ commits[0],
+ nil,
+ big.NewInt(int64(len(commits[0]))),
+ )
+ commitments[shardKey.L1[i]].Commit(e.inclusionProver, false)
+ }
+ }
+
+ proverRoots := roots[tries.ShardKey{
+ L1: [3]byte{},
+ L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS,
+ }]
+
+ proverRoot := proverRoots[0]
+
+ genesisHeader.ProverTreeCommitment = proverRoot
+
+ for i := 0; i < 256; i++ {
+ genesisHeader.GlobalCommitments[i] = commitments[i].Commit(
+ e.inclusionProver,
+ false,
+ )
+ }
+
+ // Establish an empty signature payload – this avoids panics on broken
+ // header readers
+ genesisHeader.PublicKeySignatureBls48581 =
+ &protobufs.BLS48581AggregateSignature{
+ Signature: make([]byte, 0),
+ PublicKey: &protobufs.BLS48581G2PublicKey{
+ KeyValue: make([]byte, 0),
+ },
+ Bitmask: make([]byte, 0),
+ }
+
+ genesisFrame := &protobufs.GlobalFrame{
+ Header: genesisHeader,
+ }
+
+ // Compute frame ID and store the full frame
+ frameIDBI, _ := poseidon.HashBytes(genesisHeader.Output)
+ frameID := frameIDBI.FillBytes(make([]byte, 32))
+ e.frameStoreMu.Lock()
+ e.frameStore[string(frameID)] = genesisFrame
+ e.frameStoreMu.Unlock()
+
+ // Add to time reel
+ txn, err := e.clockStore.NewTransaction(false)
+ if err != nil {
+ panic(err)
+ }
+ if err := e.clockStore.PutGlobalClockFrame(genesisFrame, txn); err != nil {
+ txn.Abort()
+ e.logger.Error("could not add frame", zap.Error(err))
+ return nil
+ }
+ if err := txn.Commit(); err != nil {
+ txn.Abort()
+ e.logger.Error("could not add frame", zap.Error(err))
+ return nil
+ }
+
+ return genesisFrame
+}
+
+func (e *GlobalConsensusEngine) establishTestnetGenesisProvers() error {
var proverPubKeys [][]byte
var err error
if e.config.P2P.Network != 99 && e.config.Engine != nil &&
@@ -647,83 +726,9 @@ func (e *GlobalConsensusEngine) createStubGenesis() *protobufs.GlobalFrame {
err = state.Commit()
if err != nil {
e.logger.Error("failed to commit", zap.Error(err))
- return nil
}
- roots, err := e.hypergraph.Commit(0)
- if err != nil {
- e.logger.Error("could not commit", zap.Error(err))
- return nil
- }
-
- // Parse and set initial commitments from JSON
- for shardKey, commits := range roots {
- for i := 0; i < 3; i++ {
- commitments[shardKey.L1[i]].Insert(
- shardKey.L2[:],
- commits[0],
- nil,
- big.NewInt(int64(len(commits[0]))),
- )
- commitments[shardKey.L1[i]].Commit(e.inclusionProver, false)
- }
- }
-
- proverRoots := roots[tries.ShardKey{
- L1: [3]byte{},
- L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS,
- }]
-
- proverRoot := proverRoots[0]
-
- genesisHeader.ProverTreeCommitment = proverRoot
-
- for i := 0; i < 256; i++ {
- genesisHeader.GlobalCommitments[i] = commitments[i].Commit(
- e.inclusionProver,
- false,
- )
- }
-
- // Establish an empty signature payload – this avoids panics on broken
- // header readers
- genesisHeader.PublicKeySignatureBls48581 =
- &protobufs.BLS48581AggregateSignature{
- Signature: make([]byte, 0),
- PublicKey: &protobufs.BLS48581G2PublicKey{
- KeyValue: make([]byte, 0),
- },
- Bitmask: make([]byte, 0),
- }
-
- genesisFrame := &protobufs.GlobalFrame{
- Header: genesisHeader,
- }
-
- // Compute frame ID and store the full frame
- frameIDBI, _ := poseidon.HashBytes(genesisHeader.Output)
- frameID := frameIDBI.FillBytes(make([]byte, 32))
- e.frameStoreMu.Lock()
- e.frameStore[string(frameID)] = genesisFrame
- e.frameStoreMu.Unlock()
-
- // Add to time reel
- txn, err := e.clockStore.NewTransaction(false)
- if err != nil {
- panic(err)
- }
- if err := e.clockStore.PutGlobalClockFrame(genesisFrame, txn); err != nil {
- txn.Abort()
- e.logger.Error("could not add frame", zap.Error(err))
- return nil
- }
- if err := txn.Commit(); err != nil {
- txn.Abort()
- e.logger.Error("could not add frame", zap.Error(err))
- return nil
- }
-
- return genesisFrame
+ return nil
}
// InitializeGenesisState ensures the global genesis frame and QC exist using the
diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go
index fbb0607..1040f84 100644
--- a/node/consensus/global/global_consensus_engine.go
+++ b/node/consensus/global/global_consensus_engine.go
@@ -37,6 +37,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/consensus/validator"
"source.quilibrium.com/quilibrium/monorepo/consensus/verification"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
+ hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/aggregator"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/provers"
@@ -199,6 +200,7 @@ type GlobalConsensusEngine struct {
appFrameStore map[string]*protobufs.AppShardFrame
appFrameStoreMu sync.RWMutex
lowCoverageStreak map[string]*coverageStreak
+ proverOnlyMode atomic.Bool
peerInfoDigestCache map[string]struct{}
peerInfoDigestCacheMu sync.Mutex
keyRegistryDigestCache map[string]struct{}
@@ -595,6 +597,42 @@ func NewGlobalConsensusEngine(
if err != nil {
establishGenesis()
} else {
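+ // If the global prover vertex-adds tree is empty even though consensus
+ // state exists, re-seed the genesis provers and refresh the prover
+ // registry before resuming.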
+ adds := engine.hypergraph.(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(
+ tries.ShardKey{
+ L1: [3]byte{},
+ L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
+ },
+ )
+
+ if lc, _ := adds.GetTree().GetMetadata(); lc == 0 {
+ if config.P2P.Network == 0 {
+ genesisData := engine.getMainnetGenesisJSON()
+ if genesisData == nil {
+ panic("no genesis data")
+ }
+
+ state := hgstate.NewHypergraphState(engine.hypergraph)
+
+ err = engine.establishMainnetGenesisProvers(state, genesisData)
+ if err != nil {
+ engine.logger.Error("failed to establish provers", zap.Error(err))
+ panic(err)
+ }
+
+ err = state.Commit()
+ if err != nil {
+ engine.logger.Error("failed to commit", zap.Error(err))
+ panic(err)
+ }
+ } else {
+ engine.establishTestnetGenesisProvers()
+ }
+
+ err := engine.proverRegistry.Refresh()
+ if err != nil {
+ panic(err)
+ }
+ }
if latest.LatestTimeout != nil {
logger.Info(
"obtained latest consensus state",
diff --git a/node/consensus/global/message_collector.go b/node/consensus/global/message_collector.go
index 338e1d3..164fa61 100644
--- a/node/consensus/global/message_collector.go
+++ b/node/consensus/global/message_collector.go
@@ -234,6 +234,15 @@ func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) {
return
}
+ // In prover-only mode, filter out non-prover messages
+ if e.proverOnlyMode.Load() {
+ bundle.Requests = e.filterProverOnlyRequests(bundle.Requests)
+ if len(bundle.Requests) == 0 {
+ // All requests were filtered out
+ return
+ }
+ }
+
if len(bundle.Requests) > maxGlobalMessagesPerFrame {
if e.logger != nil {
e.logger.Debug(
@@ -265,6 +274,49 @@ func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) {
e.messageAggregator.Add(record)
}
+// filterProverOnlyRequests filters a list of message requests to only include
+// prover-related messages. This is used when in prover-only mode due to
+// insufficient coverage.
+func (e *GlobalConsensusEngine) filterProverOnlyRequests(
+ requests []*protobufs.MessageRequest,
+) []*protobufs.MessageRequest {
+ filtered := make([]*protobufs.MessageRequest, 0, len(requests))
+ droppedCount := 0
+
+ for _, req := range requests {
+ if req == nil || req.GetRequest() == nil {
+ continue
+ }
+
+ // Only allow prover-related message types
+ switch req.GetRequest().(type) {
+ case *protobufs.MessageRequest_Join,
+ *protobufs.MessageRequest_Leave,
+ *protobufs.MessageRequest_Pause,
+ *protobufs.MessageRequest_Resume,
+ *protobufs.MessageRequest_Confirm,
+ *protobufs.MessageRequest_Reject,
+ *protobufs.MessageRequest_Kick,
+ *protobufs.MessageRequest_Update:
+ // Prover messages are allowed
+ filtered = append(filtered, req)
+ default:
+ // All other messages are dropped in prover-only mode
+ droppedCount++
+ }
+ }
+
+ if droppedCount > 0 && e.logger != nil {
+ e.logger.Debug(
+ "dropped non-prover messages in prover-only mode",
+ zap.Int("dropped_count", droppedCount),
+ zap.Int("allowed_count", len(filtered)),
+ )
+ }
+
+ return filtered
+}
+
func (e *GlobalConsensusEngine) logBundleRequestTypes(
bundle *protobufs.MessageBundle,
) {
diff --git a/node/consensus/provers/prover_registry.go b/node/consensus/provers/prover_registry.go
index d5ffbc8..5d64e76 100644
--- a/node/consensus/provers/prover_registry.go
+++ b/node/consensus/provers/prover_registry.go
@@ -12,7 +12,6 @@ import (
"github.com/iden3/go-iden3-crypto/poseidon"
"github.com/pkg/errors"
"go.uber.org/zap"
- hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global"
hgstate "source.quilibrium.com/quilibrium/monorepo/node/execution/state/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/types/consensus"
@@ -414,19 +413,30 @@ func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error {
}
cutoff := frameNumber - 760
- var pruned int
+ var prunedAllocations int
+ var prunedProvers int
- set := r.hypergraph.(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(tries.ShardKey{
+ shardKey := tries.ShardKey{
L1: [3]byte{0x00, 0x00, 0x00},
L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
- })
+ }
- txn, err := set.GetTree().Store.NewTransaction(false)
+ txn, err := r.hypergraph.NewTransaction(false)
if err != nil {
return errors.Wrap(err, "prune orphan joins")
}
- for _, info := range r.proverCache {
+ // Track provers to remove from cache after pruning
+ proversToRemove := []string{}
+
+ r.logger.Debug(
+ "starting prune orphan joins scan",
+ zap.Uint64("frame_number", frameNumber),
+ zap.Uint64("cutoff", cutoff),
+ zap.Int("prover_cache_size", len(r.proverCache)),
+ )
+
+ for addr, info := range r.proverCache {
if info == nil || len(info.Allocations) == 0 {
continue
}
@@ -435,7 +445,20 @@ func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error {
var removedFilters map[string]struct{}
for _, allocation := range info.Allocations {
- if allocation.Status == consensus.ProverStatusJoining &&
+ // Log each allocation being evaluated
+ r.logger.Debug(
+ "evaluating allocation for prune",
+ zap.String("prover_address", hex.EncodeToString(info.Address)),
+ zap.Int("status", int(allocation.Status)),
+ zap.Uint64("join_frame", allocation.JoinFrameNumber),
+ zap.Uint64("cutoff", cutoff),
+ zap.Bool("is_joining", allocation.Status == consensus.ProverStatusJoining),
+ zap.Bool("is_rejected", allocation.Status == consensus.ProverStatusRejected),
+ zap.Bool("is_old_enough", allocation.JoinFrameNumber < cutoff),
+ )
+
+ if (allocation.Status == consensus.ProverStatusJoining ||
+ allocation.Status == consensus.ProverStatusRejected) &&
allocation.JoinFrameNumber < cutoff {
if err := r.pruneAllocationVertex(txn, info, allocation); err != nil {
txn.Abort()
@@ -446,7 +469,7 @@ func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error {
removedFilters = make(map[string]struct{})
}
removedFilters[string(allocation.ConfirmationFilter)] = struct{}{}
- pruned++
+ prunedAllocations++
continue
}
@@ -456,17 +479,33 @@ func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error {
if len(updated) != len(info.Allocations) {
info.Allocations = updated
r.cleanupFilterCache(info, removedFilters)
+
+ // If no allocations remain, prune the prover record as well
+ if len(updated) == 0 {
+ if err := r.pruneProverRecord(txn, shardKey, info); err != nil {
+ txn.Abort()
+ return errors.Wrap(err, "prune orphan joins")
+ }
+ proversToRemove = append(proversToRemove, addr)
+ prunedProvers++
+ }
}
}
- if pruned > 0 {
+ // Remove pruned provers from cache
+ for _, addr := range proversToRemove {
+ delete(r.proverCache, addr)
+ }
+
+ if prunedAllocations > 0 || prunedProvers > 0 {
if err := txn.Commit(); err != nil {
return errors.Wrap(err, "prune orphan joins")
}
r.logger.Info(
"pruned orphan prover allocations",
- zap.Int("allocations_pruned", pruned),
+ zap.Int("allocations_pruned", prunedAllocations),
+ zap.Int("provers_pruned", prunedProvers),
zap.Uint64("frame_cutoff", cutoff),
)
} else {
@@ -510,33 +549,96 @@ func (r *ProverRegistry) pruneAllocationVertex(
allocationHash.FillBytes(make([]byte, 32)),
)
- _, err = r.hypergraph.GetVertex(vertexID)
- if err != nil {
- if errors.Cause(err) == hypergraph.ErrRemoved {
- return nil
- }
+ shardKey := tries.ShardKey{
+ L1: [3]byte{0x00, 0x00, 0x00},
+ L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
+ }
+
+ // Use DeleteVertexAdd which properly handles locking and deletes both
+ // the tree entry and the vertex data atomically
+ if err := r.hypergraph.DeleteVertexAdd(txn, shardKey, vertexID); err != nil {
r.logger.Debug(
- "allocation vertex missing during prune",
+ "could not delete allocation vertex during prune",
zap.String("address", hex.EncodeToString(info.Address)),
zap.String(
"filter",
hex.EncodeToString(allocation.ConfirmationFilter),
),
+ zap.String("vertex_id", hex.EncodeToString(vertexID[:])),
zap.Error(err),
)
- return nil
+ // Don't return error - the vertex may already be deleted
+ } else {
+ r.logger.Debug(
+ "deleted allocation vertex during prune",
+ zap.String("address", hex.EncodeToString(info.Address)),
+ zap.String(
+ "filter",
+ hex.EncodeToString(allocation.ConfirmationFilter),
+ ),
+ zap.String("vertex_id", hex.EncodeToString(vertexID[:])),
+ )
}
- set := r.hypergraph.(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(tries.ShardKey{
- L1: [3]byte{0x00, 0x00, 0x00},
- L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
- })
+ return nil
+}
- vtree := set.GetTree()
- if err := vtree.Delete(txn, vertexID[:]); err != nil {
- return errors.Wrap(err, "prune allocation remove vertex")
+// pruneProverRecord removes a prover's vertex, hyperedge, and associated data
+// when all of its allocations have been pruned.
+func (r *ProverRegistry) pruneProverRecord(
+ txn tries.TreeBackingStoreTransaction,
+ shardKey tries.ShardKey,
+ info *consensus.ProverInfo,
+) error {
+ if info == nil || len(info.Address) == 0 {
+ return errors.New("missing prover info")
}
+ // Construct the prover vertex ID
+ var proverVertexID [64]byte
+ copy(proverVertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
+ copy(proverVertexID[32:], info.Address)
+
+ // Delete prover vertex using DeleteVertexAdd which properly handles locking
+ // and deletes both the tree entry and vertex data atomically
+ if err := r.hypergraph.DeleteVertexAdd(txn, shardKey, proverVertexID); err != nil {
+ r.logger.Debug(
+ "could not delete prover vertex during prune",
+ zap.String("address", hex.EncodeToString(info.Address)),
+ zap.String("vertex_id", hex.EncodeToString(proverVertexID[:])),
+ zap.Error(err),
+ )
+ // Don't return error - the vertex may already be deleted
+ } else {
+ r.logger.Debug(
+ "deleted prover vertex during prune",
+ zap.String("address", hex.EncodeToString(info.Address)),
+ zap.String("vertex_id", hex.EncodeToString(proverVertexID[:])),
+ )
+ }
+
+ // Delete prover hyperedge using DeleteHyperedgeAdd
+ if err := r.hypergraph.DeleteHyperedgeAdd(txn, shardKey, proverVertexID); err != nil {
+ r.logger.Debug(
+ "could not delete prover hyperedge during prune",
+ zap.String("address", hex.EncodeToString(info.Address)),
+ zap.String("hyperedge_id", hex.EncodeToString(proverVertexID[:])),
+ zap.Error(err),
+ )
+ // Don't return error - the hyperedge may already be deleted
+ } else {
+ r.logger.Debug(
+ "deleted prover hyperedge during prune",
+ zap.String("address", hex.EncodeToString(info.Address)),
+ zap.String("hyperedge_id", hex.EncodeToString(proverVertexID[:])),
+ )
+ }
+
+ r.logger.Debug(
+ "pruned prover record",
+ zap.String("address", hex.EncodeToString(info.Address)),
+ )
+
return nil
}
diff --git a/node/consensus/provers/prover_registry_test.go b/node/consensus/provers/prover_registry_test.go
index 9cb9e3c..8c0d5f7 100644
--- a/node/consensus/provers/prover_registry_test.go
+++ b/node/consensus/provers/prover_registry_test.go
@@ -2,6 +2,9 @@ package provers
import (
"bytes"
+ "encoding/binary"
+ "encoding/hex"
+ "fmt"
"math/big"
"slices"
"testing"
@@ -11,10 +14,18 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
+ "source.quilibrium.com/quilibrium/monorepo/bls48581"
+ "source.quilibrium.com/quilibrium/monorepo/config"
+ hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph"
+ "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global"
+ "source.quilibrium.com/quilibrium/monorepo/node/store"
"source.quilibrium.com/quilibrium/monorepo/node/tests"
"source.quilibrium.com/quilibrium/monorepo/types/consensus"
+ "source.quilibrium.com/quilibrium/monorepo/types/execution/intrinsics"
"source.quilibrium.com/quilibrium/monorepo/types/mocks"
+ "source.quilibrium.com/quilibrium/monorepo/types/schema"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
+ "source.quilibrium.com/quilibrium/monorepo/verenc"
)
type mockIterator struct {
@@ -171,3 +182,1220 @@ func TestProverRegistryWithShards(t *testing.T) {
assert.Equal(t, 0, count)
})
}
+
+// TestPruneOrphanJoins_Comprehensive tests the pruning of orphan prover joins
+// with a comprehensive scenario covering global provers, app shard provers,
+// and mixed allocation states.
+func TestPruneOrphanJoins_Comprehensive(t *testing.T) {
+ logger := zap.NewNop()
+
+ // Create stores with in-memory pebble DB
+ pebbleDB := store.NewPebbleDB(
+ logger,
+ &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_orphan"},
+ 0,
+ )
+ defer pebbleDB.Close()
+
+ // Create inclusion prover and verifiable encryptor
+ inclusionProver := bls48581.NewKZGInclusionProver(logger)
+ verifiableEncryptor := verenc.NewMPCitHVerifiableEncryptor(1)
+
+ // Create hypergraph store and hypergraph
+ hypergraphStore := store.NewPebbleHypergraphStore(
+ &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_orphan"},
+ pebbleDB,
+ logger,
+ verifiableEncryptor,
+ inclusionProver,
+ )
+ hg, err := hypergraphStore.LoadHypergraph(&tests.Nopthenticator{}, 1)
+ require.NoError(t, err)
+
+ // Create RDF multiprover for setting up test data
+ rdfMultiprover := schema.NewRDFMultiprover(
+ &schema.TurtleRDFParser{},
+ inclusionProver,
+ )
+
+ // Current frame for testing - pruning will use cutoff = currentFrame - 760
+ // For currentFrame=1000, cutoff = 240
+ // Allocations with JoinFrameNumber < 240 will be pruned
+ const currentFrame = uint64(1000)
+ const oldJoinFrame = uint64(100) // 100 < 240, will be pruned
+ const recentJoinFrame = currentFrame - 10 // 990 > 240, will NOT be pruned
+
+ type allocationSpec struct {
+ filter []byte
+ joinFrame uint64
+ status byte // 0=Joining, 1=Active, 4=Rejected
+ }
+
+ // Helper to create a prover with specific allocations
+ createProverWithAllocations := func(
+ publicKey []byte,
+ proverStatus byte,
+ allocations []allocationSpec,
+ ) ([]byte, error) {
+ proverAddressBI, err := poseidon.HashBytes(publicKey)
+ if err != nil {
+ return nil, err
+ }
+ proverAddress := proverAddressBI.FillBytes(make([]byte, 32))
+
+ hgCRDT := hg.(*hgcrdt.HypergraphCRDT)
+ txn, err := hgCRDT.NewTransaction(false)
+ if err != nil {
+ return nil, err
+ }
+
+ // Create prover vertex
+ proverTree := &tries.VectorCommitmentTree{}
+ err = rdfMultiprover.Set(
+ global.GLOBAL_RDF_SCHEMA,
+ intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "prover:Prover",
+ "PublicKey",
+ publicKey,
+ proverTree,
+ )
+ if err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ err = rdfMultiprover.Set(
+ global.GLOBAL_RDF_SCHEMA,
+ intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "prover:Prover",
+ "Status",
+ []byte{proverStatus},
+ proverTree,
+ )
+ if err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ availableStorageBytes := make([]byte, 8)
+ err = rdfMultiprover.Set(
+ global.GLOBAL_RDF_SCHEMA,
+ intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "prover:Prover",
+ "AvailableStorage",
+ availableStorageBytes,
+ proverTree,
+ )
+ if err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ seniorityBytes := make([]byte, 8)
+ err = rdfMultiprover.Set(
+ global.GLOBAL_RDF_SCHEMA,
+ intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "prover:Prover",
+ "Seniority",
+ seniorityBytes,
+ proverTree,
+ )
+ if err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ // Add prover vertex to hypergraph
+ proverVertex := hgcrdt.NewVertex(
+ intrinsics.GLOBAL_INTRINSIC_ADDRESS,
+ [32]byte(proverAddress),
+ proverTree.Commit(inclusionProver, false),
+ big.NewInt(0),
+ )
+ err = hg.AddVertex(txn, proverVertex)
+ if err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ // Save prover vertex data
+ var proverVertexID [64]byte
+ copy(proverVertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
+ copy(proverVertexID[32:], proverAddress)
+ err = hg.SetVertexData(txn, proverVertexID, proverTree)
+ if err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ // Create hyperedge for prover
+ hyperedge := hgcrdt.NewHyperedge(
+ intrinsics.GLOBAL_INTRINSIC_ADDRESS,
+ [32]byte(proverAddress),
+ )
+
+ // Create allocation vertices for each allocation spec
+ for _, alloc := range allocations {
+ allocationAddressBI, err := poseidon.HashBytes(
+ slices.Concat([]byte("PROVER_ALLOCATION"), publicKey, alloc.filter),
+ )
+ if err != nil {
+ txn.Abort()
+ return nil, err
+ }
+ allocationAddress := allocationAddressBI.FillBytes(make([]byte, 32))
+
+ allocationTree := &tries.VectorCommitmentTree{}
+ err = rdfMultiprover.Set(
+ global.GLOBAL_RDF_SCHEMA,
+ intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "allocation:ProverAllocation",
+ "Prover",
+ proverAddress,
+ allocationTree,
+ )
+ if err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ err = rdfMultiprover.Set(
+ global.GLOBAL_RDF_SCHEMA,
+ intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "allocation:ProverAllocation",
+ "Status",
+ []byte{alloc.status},
+ allocationTree,
+ )
+ if err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ err = rdfMultiprover.Set(
+ global.GLOBAL_RDF_SCHEMA,
+ intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "allocation:ProverAllocation",
+ "ConfirmationFilter",
+ alloc.filter,
+ allocationTree,
+ )
+ if err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ frameNumberBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(frameNumberBytes, alloc.joinFrame)
+ err = rdfMultiprover.Set(
+ global.GLOBAL_RDF_SCHEMA,
+ intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "allocation:ProverAllocation",
+ "JoinFrameNumber",
+ frameNumberBytes,
+ allocationTree,
+ )
+ if err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ // Add allocation vertex
+ allocationVertex := hgcrdt.NewVertex(
+ intrinsics.GLOBAL_INTRINSIC_ADDRESS,
+ [32]byte(allocationAddress),
+ allocationTree.Commit(inclusionProver, false),
+ big.NewInt(0),
+ )
+ err = hg.AddVertex(txn, allocationVertex)
+ if err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ // Save allocation vertex data
+ var allocationVertexID [64]byte
+ copy(allocationVertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
+ copy(allocationVertexID[32:], allocationAddress)
+ err = hg.SetVertexData(txn, allocationVertexID, allocationTree)
+ if err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ // Add allocation to hyperedge
+ hyperedge.AddExtrinsic(allocationVertex)
+ }
+
+ // Add hyperedge
+ err = hg.AddHyperedge(txn, hyperedge)
+ if err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ err = txn.Commit()
+ if err != nil {
+ return nil, err
+ }
+
+ return proverAddress, nil
+ }
+
+ // Helper to check if vertex exists
+ vertexExists := func(vertexID [64]byte) bool {
+ _, err := hg.GetVertex(vertexID)
+ return err == nil
+ }
+
+ // Helper to check if vertex data exists
+ vertexDataExists := func(vertexID [64]byte) bool {
+ data, err := hg.GetVertexData(vertexID)
+ return err == nil && data != nil
+ }
+
+ // Helper to check if hyperedge exists
+ hyperedgeExists := func(hyperedgeID [64]byte) bool {
+ _, err := hg.GetHyperedge(hyperedgeID)
+ return err == nil
+ }
+
+ // Helper to compute prover vertex ID
+ getProverVertexID := func(proverAddress []byte) [64]byte {
+ var id [64]byte
+ copy(id[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
+ copy(id[32:], proverAddress)
+ return id
+ }
+
+ // Helper to compute allocation vertex ID
+ getAllocationVertexID := func(publicKey, filter []byte) [64]byte {
+ allocationHash, _ := poseidon.HashBytes(
+ slices.Concat([]byte("PROVER_ALLOCATION"), publicKey, filter),
+ )
+ var id [64]byte
+ copy(id[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
+ copy(id[32:], allocationHash.FillBytes(make([]byte, 32)))
+ return id
+ }
+
+ // ===== CREATE TEST DATA =====
+
+ // 1. Create 6 global provers with single allocation each, filter=nil, status=active
+ globalProverAddrs := make([][]byte, 6)
+ globalProverKeys := make([][]byte, 6)
+ for i := 0; i < 6; i++ {
+ publicKey := bytes.Repeat([]byte{byte(0x10 + i)}, 585)
+ globalProverKeys[i] = publicKey
+
+ proverAddr, err := createProverWithAllocations(
+ publicKey,
+ 1, // Active prover status
+ []allocationSpec{
+ {filter: nil, joinFrame: recentJoinFrame, status: 1}, // Active global allocation
+ },
+ )
+ require.NoError(t, err)
+ globalProverAddrs[i] = proverAddr
+ t.Logf("Created global prover %d at address: %s",
+ i, hex.EncodeToString(proverAddr))
+ }
+
+ // 2. Create 5 app shard provers with 100 allocations each, ALL with old join frame
+ // These should be completely pruned (prover and all allocations)
+ allOldProverAddrs := make([][]byte, 5)
+ allOldProverKeys := make([][]byte, 5)
+ allOldFilters := make([][][]byte, 5) // Store filters for each prover
+ for i := 0; i < 5; i++ {
+ publicKey := bytes.Repeat([]byte{byte(0x20 + i)}, 585)
+ allOldProverKeys[i] = publicKey
+
+ allocations := make([]allocationSpec, 100)
+ filters := make([][]byte, 100)
+ for j := 0; j < 100; j++ {
+ filter := []byte(fmt.Sprintf("shard_%d_%d", i, j))
+ filters[j] = filter
+ allocations[j] = allocationSpec{
+ filter: filter,
+ joinFrame: oldJoinFrame, // All old - will be pruned
+ status: 0, // Joining
+ }
+ }
+ allOldFilters[i] = filters
+
+ proverAddr, err := createProverWithAllocations(
+ publicKey,
+ 0, // Joining prover status
+ allocations,
+ )
+ require.NoError(t, err)
+ allOldProverAddrs[i] = proverAddr
+ t.Logf("Created all-old app shard prover %d at address: %s",
+ i, hex.EncodeToString(proverAddr))
+ }
+
+ // 3. Create 5 app shard provers with 100 allocations each:
+ // - 50 with old join frame (will be pruned)
+ // - 50 with recent join frame (will remain)
+ // Prover should remain but with only 50 allocations
+ mixedProverAddrs := make([][]byte, 5)
+ mixedProverKeys := make([][]byte, 5)
+ mixedOldFilters := make([][][]byte, 5) // Old filters that should be pruned
+ mixedNewFilters := make([][][]byte, 5) // Recent filters that should remain
+ for i := 0; i < 5; i++ {
+ publicKey := bytes.Repeat([]byte{byte(0x30 + i)}, 585)
+ mixedProverKeys[i] = publicKey
+
+ allocations := make([]allocationSpec, 100)
+ oldFilters := make([][]byte, 50)
+ newFilters := make([][]byte, 50)
+
+ for j := 0; j < 100; j++ {
+ filter := []byte(fmt.Sprintf("mixed_shard_%d_%d", i, j))
+ if j < 50 {
+ // First 50: old join frame - will be pruned
+ oldFilters[j] = filter
+ allocations[j] = allocationSpec{
+ filter: filter,
+ joinFrame: oldJoinFrame,
+ status: 0, // Joining
+ }
+ } else {
+ // Last 50: recent join frame - will remain
+ newFilters[j-50] = filter
+ allocations[j] = allocationSpec{
+ filter: filter,
+ joinFrame: recentJoinFrame,
+ status: 0, // Joining
+ }
+ }
+ }
+ mixedOldFilters[i] = oldFilters
+ mixedNewFilters[i] = newFilters
+
+ proverAddr, err := createProverWithAllocations(
+ publicKey,
+ 0, // Joining prover status
+ allocations,
+ )
+ require.NoError(t, err)
+ mixedProverAddrs[i] = proverAddr
+ t.Logf("Created mixed app shard prover %d at address: %s",
+ i, hex.EncodeToString(proverAddr))
+ }
+
+ // 4. Create 3 provers with rejected allocations (old join frame)
+ // These should be completely pruned like joining allocations
+ rejectedProverAddrs := make([][]byte, 3)
+ rejectedProverKeys := make([][]byte, 3)
+ rejectedFilters := make([][][]byte, 3)
+ for i := 0; i < 3; i++ {
+ publicKey := bytes.Repeat([]byte{byte(0x40 + i)}, 585)
+ rejectedProverKeys[i] = publicKey
+
+ allocations := make([]allocationSpec, 10)
+ filters := make([][]byte, 10)
+ for j := 0; j < 10; j++ {
+ filter := []byte(fmt.Sprintf("rejected_shard_%d_%d", i, j))
+ filters[j] = filter
+ allocations[j] = allocationSpec{
+ filter: filter,
+ joinFrame: oldJoinFrame,
+ status: 4, // Rejected
+ }
+ }
+ rejectedFilters[i] = filters
+
+ proverAddr, err := createProverWithAllocations(
+ publicKey,
+ 0, // Joining prover status
+ allocations,
+ )
+ require.NoError(t, err)
+ rejectedProverAddrs[i] = proverAddr
+ t.Logf("Created rejected prover %d at address: %s",
+ i, hex.EncodeToString(proverAddr))
+ }
+
+ // 5. Create 3 provers with MIXED Active and Joining allocations (all with old join frame)
+ // Active allocations should NOT be pruned even with old join frame
+ // Joining allocations should be pruned
+ // Prover should remain because it has Active allocations
+ mixedActiveJoiningProverAddrs := make([][]byte, 3)
+ mixedActiveJoiningProverKeys := make([][]byte, 3)
+ mixedActiveFilters := make([][][]byte, 3) // Active filters that should remain
+ mixedJoiningFilters := make([][][]byte, 3) // Joining filters that should be pruned
+ for i := 0; i < 3; i++ {
+ publicKey := bytes.Repeat([]byte{byte(0x50 + i)}, 585)
+ mixedActiveJoiningProverKeys[i] = publicKey
+
+ allocations := make([]allocationSpec, 20)
+ activeFilters := make([][]byte, 10)
+ joiningFilters := make([][]byte, 10)
+
+ for j := 0; j < 20; j++ {
+ filter := []byte(fmt.Sprintf("mixed_active_joining_%d_%d", i, j))
+ if j < 10 {
+ // First 10: Active status with old join frame - should NOT be pruned
+ activeFilters[j] = filter
+ allocations[j] = allocationSpec{
+ filter: filter,
+ joinFrame: oldJoinFrame, // Old, but Active so should remain
+ status: 1, // Active
+ }
+ } else {
+ // Last 10: Joining status with old join frame - should be pruned
+ joiningFilters[j-10] = filter
+ allocations[j] = allocationSpec{
+ filter: filter,
+ joinFrame: oldJoinFrame, // Old and Joining, so should be pruned
+ status: 0, // Joining
+ }
+ }
+ }
+ mixedActiveFilters[i] = activeFilters
+ mixedJoiningFilters[i] = joiningFilters
+
+ proverAddr, err := createProverWithAllocations(
+ publicKey,
+ 1, // Active prover status
+ allocations,
+ )
+ require.NoError(t, err)
+ mixedActiveJoiningProverAddrs[i] = proverAddr
+ t.Logf("Created mixed active/joining prover %d at address: %s",
+ i, hex.EncodeToString(proverAddr))
+ }
+
+ // 6. Create 2 provers with ALL Active allocations (old join frame)
+ // None of these should be pruned - Active status protects them
+ allActiveProverAddrs := make([][]byte, 2)
+ allActiveProverKeys := make([][]byte, 2)
+ allActiveFilters := make([][][]byte, 2)
+ for i := 0; i < 2; i++ {
+ publicKey := bytes.Repeat([]byte{byte(0x60 + i)}, 585)
+ allActiveProverKeys[i] = publicKey
+
+ allocations := make([]allocationSpec, 50)
+ filters := make([][]byte, 50)
+ for j := 0; j < 50; j++ {
+ filter := []byte(fmt.Sprintf("all_active_%d_%d", i, j))
+ filters[j] = filter
+ allocations[j] = allocationSpec{
+ filter: filter,
+ joinFrame: oldJoinFrame, // Old, but Active so should remain
+ status: 1, // Active
+ }
+ }
+ allActiveFilters[i] = filters
+
+ proverAddr, err := createProverWithAllocations(
+ publicKey,
+ 1, // Active prover status
+ allocations,
+ )
+ require.NoError(t, err)
+ allActiveProverAddrs[i] = proverAddr
+ t.Logf("Created all-active prover %d at address: %s",
+ i, hex.EncodeToString(proverAddr))
+ }
+
+ // ===== VERIFY INITIAL STATE =====
+
+ // Verify all global provers exist
+ for i := 0; i < 6; i++ {
+ proverID := getProverVertexID(globalProverAddrs[i])
+ allocID := getAllocationVertexID(globalProverKeys[i], nil)
+
+ assert.True(t, vertexExists(proverID),
+ "Global prover %d vertex should exist before prune", i)
+ assert.True(t, vertexDataExists(proverID),
+ "Global prover %d vertex data should exist before prune", i)
+ assert.True(t, vertexExists(allocID),
+ "Global prover %d allocation should exist before prune", i)
+ assert.True(t, hyperedgeExists(proverID),
+ "Global prover %d hyperedge should exist before prune", i)
+ }
+
+ // Verify all-old provers exist
+ for i := 0; i < 5; i++ {
+ proverID := getProverVertexID(allOldProverAddrs[i])
+ assert.True(t, vertexExists(proverID),
+ "All-old prover %d vertex should exist before prune", i)
+ assert.True(t, vertexDataExists(proverID),
+ "All-old prover %d vertex data should exist before prune", i)
+ assert.True(t, hyperedgeExists(proverID),
+ "All-old prover %d hyperedge should exist before prune", i)
+
+ // Verify all 100 allocations exist
+ for j := 0; j < 100; j++ {
+ allocID := getAllocationVertexID(allOldProverKeys[i], allOldFilters[i][j])
+ assert.True(t, vertexExists(allocID),
+ "All-old prover %d allocation %d should exist before prune", i, j)
+ }
+ }
+
+ // Verify mixed provers exist
+ for i := 0; i < 5; i++ {
+ proverID := getProverVertexID(mixedProverAddrs[i])
+ assert.True(t, vertexExists(proverID),
+ "Mixed prover %d vertex should exist before prune", i)
+ assert.True(t, vertexDataExists(proverID),
+ "Mixed prover %d vertex data should exist before prune", i)
+ assert.True(t, hyperedgeExists(proverID),
+ "Mixed prover %d hyperedge should exist before prune", i)
+
+ // Verify all 100 allocations exist
+ for j := 0; j < 50; j++ {
+ oldAllocID := getAllocationVertexID(mixedProverKeys[i], mixedOldFilters[i][j])
+ assert.True(t, vertexExists(oldAllocID),
+ "Mixed prover %d old allocation %d should exist before prune", i, j)
+
+ newAllocID := getAllocationVertexID(mixedProverKeys[i], mixedNewFilters[i][j])
+ assert.True(t, vertexExists(newAllocID),
+ "Mixed prover %d new allocation %d should exist before prune", i, j)
+ }
+ }
+
+ // Verify rejected provers exist
+ for i := 0; i < 3; i++ {
+ proverID := getProverVertexID(rejectedProverAddrs[i])
+ assert.True(t, vertexExists(proverID),
+ "Rejected prover %d vertex should exist before prune", i)
+ assert.True(t, vertexDataExists(proverID),
+ "Rejected prover %d vertex data should exist before prune", i)
+ assert.True(t, hyperedgeExists(proverID),
+ "Rejected prover %d hyperedge should exist before prune", i)
+
+ // Verify all 10 rejected allocations exist
+ for j := 0; j < 10; j++ {
+ allocID := getAllocationVertexID(rejectedProverKeys[i], rejectedFilters[i][j])
+ assert.True(t, vertexExists(allocID),
+ "Rejected prover %d allocation %d should exist before prune", i, j)
+ }
+ }
+
+ // Verify mixed active/joining provers exist
+ for i := 0; i < 3; i++ {
+ proverID := getProverVertexID(mixedActiveJoiningProverAddrs[i])
+ assert.True(t, vertexExists(proverID),
+ "Mixed active/joining prover %d vertex should exist before prune", i)
+ assert.True(t, vertexDataExists(proverID),
+ "Mixed active/joining prover %d vertex data should exist before prune", i)
+ assert.True(t, hyperedgeExists(proverID),
+ "Mixed active/joining prover %d hyperedge should exist before prune", i)
+
+ // Verify all 10 active allocations exist
+ for j := 0; j < 10; j++ {
+ allocID := getAllocationVertexID(mixedActiveJoiningProverKeys[i], mixedActiveFilters[i][j])
+ assert.True(t, vertexExists(allocID),
+ "Mixed active/joining prover %d active allocation %d should exist before prune", i, j)
+ }
+ // Verify all 10 joining allocations exist
+ for j := 0; j < 10; j++ {
+ allocID := getAllocationVertexID(mixedActiveJoiningProverKeys[i], mixedJoiningFilters[i][j])
+ assert.True(t, vertexExists(allocID),
+ "Mixed active/joining prover %d joining allocation %d should exist before prune", i, j)
+ }
+ }
+
+ // Verify all-active provers exist
+ for i := 0; i < 2; i++ {
+ proverID := getProverVertexID(allActiveProverAddrs[i])
+ assert.True(t, vertexExists(proverID),
+ "All-active prover %d vertex should exist before prune", i)
+ assert.True(t, vertexDataExists(proverID),
+ "All-active prover %d vertex data should exist before prune", i)
+ assert.True(t, hyperedgeExists(proverID),
+ "All-active prover %d hyperedge should exist before prune", i)
+
+ // Verify all 50 active allocations exist
+ for j := 0; j < 50; j++ {
+ allocID := getAllocationVertexID(allActiveProverKeys[i], allActiveFilters[i][j])
+ assert.True(t, vertexExists(allocID),
+ "All-active prover %d allocation %d should exist before prune", i, j)
+ }
+ }
+
+ // ===== CREATE REGISTRY AND PRUNE =====
+
+ registry, err := NewProverRegistry(logger, hg)
+ require.NoError(t, err)
+
+ // Run pruning
+ err = registry.PruneOrphanJoins(currentFrame)
+ require.NoError(t, err)
+
+ // ===== VERIFY POST-PRUNE STATE =====
+
+ // 1. Verify global provers are COMPLETELY UNTOUCHED
+ for i := 0; i < 6; i++ {
+ proverID := getProverVertexID(globalProverAddrs[i])
+ allocID := getAllocationVertexID(globalProverKeys[i], nil)
+
+ assert.True(t, vertexExists(proverID),
+ "Global prover %d vertex should STILL exist after prune", i)
+ assert.True(t, vertexDataExists(proverID),
+ "Global prover %d vertex data should STILL exist after prune", i)
+ assert.True(t, vertexExists(allocID),
+ "Global prover %d allocation should STILL exist after prune", i)
+ assert.True(t, vertexDataExists(allocID),
+ "Global prover %d allocation data should STILL exist after prune", i)
+ assert.True(t, hyperedgeExists(proverID),
+ "Global prover %d hyperedge should STILL exist after prune", i)
+ }
+
+ // 2. Verify all-old provers are COMPLETELY PRUNED (prover vertex gone too)
+ for i := 0; i < 5; i++ {
+ proverID := getProverVertexID(allOldProverAddrs[i])
+
+ assert.False(t, vertexExists(proverID),
+ "All-old prover %d vertex should be DELETED after prune", i)
+ assert.False(t, vertexDataExists(proverID),
+ "All-old prover %d vertex data should be DELETED after prune", i)
+ assert.False(t, hyperedgeExists(proverID),
+ "All-old prover %d hyperedge should be DELETED after prune", i)
+
+ // Verify all 100 allocations are deleted
+ for j := 0; j < 100; j++ {
+ allocID := getAllocationVertexID(allOldProverKeys[i], allOldFilters[i][j])
+ assert.False(t, vertexExists(allocID),
+ "All-old prover %d allocation %d should be DELETED after prune", i, j)
+ assert.False(t, vertexDataExists(allocID),
+ "All-old prover %d allocation %d data should be DELETED after prune", i, j)
+ }
+ }
+
+ // 3. Verify mixed provers: prover remains, old allocations pruned, new allocations remain
+ for i := 0; i < 5; i++ {
+ proverID := getProverVertexID(mixedProverAddrs[i])
+
+ assert.True(t, vertexExists(proverID),
+ "Mixed prover %d vertex should STILL exist after prune", i)
+ assert.True(t, vertexDataExists(proverID),
+ "Mixed prover %d vertex data should STILL exist after prune", i)
+ assert.True(t, hyperedgeExists(proverID),
+ "Mixed prover %d hyperedge should STILL exist after prune", i)
+
+ // Verify old allocations are deleted
+ for j := 0; j < 50; j++ {
+ oldAllocID := getAllocationVertexID(mixedProverKeys[i], mixedOldFilters[i][j])
+ assert.False(t, vertexExists(oldAllocID),
+ "Mixed prover %d old allocation %d should be DELETED after prune", i, j)
+ assert.False(t, vertexDataExists(oldAllocID),
+ "Mixed prover %d old allocation %d data should be DELETED after prune", i, j)
+ }
+
+ // Verify new allocations remain
+ for j := 0; j < 50; j++ {
+ newAllocID := getAllocationVertexID(mixedProverKeys[i], mixedNewFilters[i][j])
+ assert.True(t, vertexExists(newAllocID),
+ "Mixed prover %d new allocation %d should STILL exist after prune", i, j)
+ assert.True(t, vertexDataExists(newAllocID),
+ "Mixed prover %d new allocation %d data should STILL exist after prune", i, j)
+ }
+ }
+
+ // 4. Verify rejected provers are COMPLETELY PRUNED (prover vertex gone too)
+ for i := 0; i < 3; i++ {
+ proverID := getProverVertexID(rejectedProverAddrs[i])
+
+ assert.False(t, vertexExists(proverID),
+ "Rejected prover %d vertex should be DELETED after prune", i)
+ assert.False(t, vertexDataExists(proverID),
+ "Rejected prover %d vertex data should be DELETED after prune", i)
+ assert.False(t, hyperedgeExists(proverID),
+ "Rejected prover %d hyperedge should be DELETED after prune", i)
+
+ // Verify all 10 rejected allocations are deleted
+ for j := 0; j < 10; j++ {
+ allocID := getAllocationVertexID(rejectedProverKeys[i], rejectedFilters[i][j])
+ assert.False(t, vertexExists(allocID),
+ "Rejected prover %d allocation %d should be DELETED after prune", i, j)
+ assert.False(t, vertexDataExists(allocID),
+ "Rejected prover %d allocation %d data should be DELETED after prune", i, j)
+ }
+ }
+
+ // 5. Verify mixed active/joining provers: prover remains, Active allocations remain,
+ // Joining allocations are pruned
+ for i := 0; i < 3; i++ {
+ proverID := getProverVertexID(mixedActiveJoiningProverAddrs[i])
+
+ assert.True(t, vertexExists(proverID),
+ "Mixed active/joining prover %d vertex should STILL exist after prune", i)
+ assert.True(t, vertexDataExists(proverID),
+ "Mixed active/joining prover %d vertex data should STILL exist after prune", i)
+ assert.True(t, hyperedgeExists(proverID),
+ "Mixed active/joining prover %d hyperedge should STILL exist after prune", i)
+
+ // Verify Active allocations REMAIN (not pruned despite old join frame)
+ for j := 0; j < 10; j++ {
+ activeAllocID := getAllocationVertexID(mixedActiveJoiningProverKeys[i], mixedActiveFilters[i][j])
+ assert.True(t, vertexExists(activeAllocID),
+ "Mixed active/joining prover %d ACTIVE allocation %d should STILL exist after prune", i, j)
+ assert.True(t, vertexDataExists(activeAllocID),
+ "Mixed active/joining prover %d ACTIVE allocation %d data should STILL exist after prune", i, j)
+ }
+
+ // Verify Joining allocations are DELETED
+ for j := 0; j < 10; j++ {
+ joiningAllocID := getAllocationVertexID(mixedActiveJoiningProverKeys[i], mixedJoiningFilters[i][j])
+ assert.False(t, vertexExists(joiningAllocID),
+ "Mixed active/joining prover %d JOINING allocation %d should be DELETED after prune", i, j)
+ assert.False(t, vertexDataExists(joiningAllocID),
+ "Mixed active/joining prover %d JOINING allocation %d data should be DELETED after prune", i, j)
+ }
+ }
+
+ // 6. Verify all-active provers are COMPLETELY UNTOUCHED
+ // Active allocations with old join frame should NOT be pruned
+ for i := 0; i < 2; i++ {
+ proverID := getProverVertexID(allActiveProverAddrs[i])
+
+ assert.True(t, vertexExists(proverID),
+ "All-active prover %d vertex should STILL exist after prune", i)
+ assert.True(t, vertexDataExists(proverID),
+ "All-active prover %d vertex data should STILL exist after prune", i)
+ assert.True(t, hyperedgeExists(proverID),
+ "All-active prover %d hyperedge should STILL exist after prune", i)
+
+ // Verify all 50 Active allocations REMAIN
+ for j := 0; j < 50; j++ {
+ allocID := getAllocationVertexID(allActiveProverKeys[i], allActiveFilters[i][j])
+ assert.True(t, vertexExists(allocID),
+ "All-active prover %d allocation %d should STILL exist after prune", i, j)
+ assert.True(t, vertexDataExists(allocID),
+ "All-active prover %d allocation %d data should STILL exist after prune", i, j)
+ }
+ }
+
+ // ===== VERIFY REGISTRY CACHE STATE =====
+
+ // Global provers should still be in cache
+ for i := 0; i < 6; i++ {
+ info, err := registry.GetProverInfo(globalProverAddrs[i])
+ require.NoError(t, err)
+ assert.NotNil(t, info, "Global prover %d should still be in registry cache", i)
+ assert.Len(t, info.Allocations, 1, "Global prover %d should have 1 allocation", i)
+ }
+
+ // All-old provers should be removed from cache
+ for i := 0; i < 5; i++ {
+ info, err := registry.GetProverInfo(allOldProverAddrs[i])
+ require.NoError(t, err)
+ assert.Nil(t, info, "All-old prover %d should be removed from registry cache", i)
+ }
+
+ // Mixed provers should still be in cache with only 50 allocations
+ for i := 0; i < 5; i++ {
+ info, err := registry.GetProverInfo(mixedProverAddrs[i])
+ require.NoError(t, err)
+ assert.NotNil(t, info, "Mixed prover %d should still be in registry cache", i)
+ assert.Len(t, info.Allocations, 50,
+ "Mixed prover %d should have 50 allocations after prune", i)
+ }
+
+ // Rejected provers should be removed from cache
+ for i := 0; i < 3; i++ {
+ info, err := registry.GetProverInfo(rejectedProverAddrs[i])
+ require.NoError(t, err)
+ assert.Nil(t, info, "Rejected prover %d should be removed from registry cache", i)
+ }
+
+ // Mixed active/joining provers should still be in cache with only 10 allocations (the Active ones)
+ for i := 0; i < 3; i++ {
+ info, err := registry.GetProverInfo(mixedActiveJoiningProverAddrs[i])
+ require.NoError(t, err)
+ assert.NotNil(t, info, "Mixed active/joining prover %d should still be in registry cache", i)
+ assert.Len(t, info.Allocations, 10,
+ "Mixed active/joining prover %d should have 10 allocations (Active ones) after prune", i)
+
+ // Verify all remaining allocations are Active status
+ for _, alloc := range info.Allocations {
+ assert.Equal(t, consensus.ProverStatusActive, alloc.Status,
+ "Mixed active/joining prover %d should only have Active allocations remaining", i)
+ }
+ }
+
+ // All-active provers should still be in cache with all 50 allocations
+ for i := 0; i < 2; i++ {
+ info, err := registry.GetProverInfo(allActiveProverAddrs[i])
+ require.NoError(t, err)
+ assert.NotNil(t, info, "All-active prover %d should still be in registry cache", i)
+ assert.Len(t, info.Allocations, 50,
+ "All-active prover %d should still have all 50 allocations after prune", i)
+
+ // Verify all allocations are Active status
+ for _, alloc := range info.Allocations {
+ assert.Equal(t, consensus.ProverStatusActive, alloc.Status,
+ "All-active prover %d should only have Active allocations", i)
+ }
+ }
+
+ // ===== VERIFY THROUGH ADDITIONAL REGISTRY METHODS =====
+
+ // Verify all-old and rejected provers don't appear in GetProversByStatus(Joining)
+ joiningProvers, err := registry.GetProversByStatus(nil, consensus.ProverStatusJoining)
+ require.NoError(t, err)
+ for _, prover := range joiningProvers {
+ for i, addr := range allOldProverAddrs {
+ assert.NotEqual(t, hex.EncodeToString(addr), hex.EncodeToString(prover.Address),
+ "All-old prover %d should not appear in GetProversByStatus(Joining)", i)
+ }
+ for i, addr := range rejectedProverAddrs {
+ assert.NotEqual(t, hex.EncodeToString(addr), hex.EncodeToString(prover.Address),
+ "Rejected prover %d should not appear in GetProversByStatus(Joining)", i)
+ }
+ }
+
+ // Verify all-old provers don't appear in GetProvers for their filters
+ for i := 0; i < 5; i++ {
+ for j := 0; j < 100; j++ {
+ filter := allOldFilters[i][j]
+ provers, err := registry.GetProvers(filter)
+ require.NoError(t, err)
+ for _, p := range provers {
+ assert.NotEqual(t, hex.EncodeToString(allOldProverAddrs[i]), hex.EncodeToString(p.Address),
+ "All-old prover %d should not appear in GetProvers for filter %d", i, j)
+ }
+ }
+ }
+
+ // Verify all-active provers appear in GetProversByStatus(Active) for their specific filters
+ // Note: GetProversByStatus(nil, ...) only returns global provers (filter=nil)
+ // The all-active provers are on app shards, so we need to check their specific filters
+ for i := 0; i < 2; i++ {
+ for j := 0; j < 50; j++ {
+ filter := allActiveFilters[i][j]
+ activeProvers, err := registry.GetProversByStatus(filter, consensus.ProverStatusActive)
+ require.NoError(t, err)
+ found := false
+ for _, prover := range activeProvers {
+ if hex.EncodeToString(allActiveProverAddrs[i]) == hex.EncodeToString(prover.Address) {
+ found = true
+ break
+ }
+ }
+ assert.True(t, found,
+ "All-active prover %d should appear in GetProversByStatus(Active) for filter %d", i, j)
+ }
+ }
+
+ t.Logf("Prune test completed successfully:")
+ t.Logf(" - 6 global provers: untouched")
+ t.Logf(" - 5 all-old provers: completely pruned (500 allocations)")
+ t.Logf(" - 5 mixed provers: 250 old allocations pruned, 250 recent allocations remain")
+ t.Logf(" - 3 rejected provers: completely pruned (30 rejected allocations)")
+ t.Logf(" - 3 mixed active/joining provers: 30 Joining allocations pruned, 30 Active allocations remain")
+ t.Logf(" - 2 all-active provers: untouched (100 Active allocations remain)")
+}
+
+// TestPruneOrphanJoins_IncompleteState tests the scenario where a previous prune
+// deleted the vertex ID set entry but not the vertex data (simulating the original bug).
+// The registry should still be able to prune these allocations by cleaning up the
+// orphaned vertex data.
+func TestPruneOrphanJoins_IncompleteState(t *testing.T) {
+ logger := zap.NewNop()
+
+ // Create stores with in-memory pebble DB
+ pebbleDB := store.NewPebbleDB(
+ logger,
+ &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_incomplete"},
+ 0,
+ )
+ defer pebbleDB.Close()
+
+ // Create inclusion prover and verifiable encryptor
+ inclusionProver := bls48581.NewKZGInclusionProver(logger)
+ verifiableEncryptor := verenc.NewMPCitHVerifiableEncryptor(1)
+
+ // Create hypergraph store and hypergraph
+ hypergraphStore := store.NewPebbleHypergraphStore(
+ &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_incomplete"},
+ pebbleDB,
+ logger,
+ verifiableEncryptor,
+ inclusionProver,
+ )
+ hg, err := hypergraphStore.LoadHypergraph(&tests.Nopthenticator{}, 1)
+ require.NoError(t, err)
+
+ // Create RDF multiprover for setting up test data
+ rdfMultiprover := schema.NewRDFMultiprover(
+ &schema.TurtleRDFParser{},
+ inclusionProver,
+ )
+
+ const currentFrame = uint64(1000)
+ const oldJoinFrame = uint64(100) // Will be pruned
+
+ // Helper to create a prover with allocations, returning the prover address
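+ // It writes the prover vertex and its RDF field data, one allocation vertex per
+ // filter (each with JoinFrameNumber = joinFrame), and a hyperedge linking the
+ // prover to its allocations.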
+ createProverWithAllocations := func(
+ publicKey []byte,
+ filters [][]byte,
+ joinFrame uint64,
+ ) ([]byte, error) {
+ proverAddressBI, err := poseidon.HashBytes(publicKey)
+ if err != nil {
+ return nil, err
+ }
+ proverAddress := proverAddressBI.FillBytes(make([]byte, 32))
+
+ hgCRDT := hg.(*hgcrdt.HypergraphCRDT)
+ txn, err := hgCRDT.NewTransaction(false)
+ if err != nil {
+ return nil, err
+ }
+
+ // Create prover vertex
+ proverTree := &tries.VectorCommitmentTree{}
+ _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "prover:Prover", "PublicKey", publicKey, proverTree)
+ _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "prover:Prover", "Status", []byte{0}, proverTree)
+ _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "prover:Prover", "AvailableStorage", make([]byte, 8), proverTree)
+ _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "prover:Prover", "Seniority", make([]byte, 8), proverTree)
+
+ proverVertex := hgcrdt.NewVertex(
+ intrinsics.GLOBAL_INTRINSIC_ADDRESS,
+ [32]byte(proverAddress),
+ proverTree.Commit(inclusionProver, false),
+ big.NewInt(0),
+ )
+ if err := hg.AddVertex(txn, proverVertex); err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ var proverVertexID [64]byte
+ copy(proverVertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
+ copy(proverVertexID[32:], proverAddress)
+ if err := hg.SetVertexData(txn, proverVertexID, proverTree); err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ hyperedge := hgcrdt.NewHyperedge(
+ intrinsics.GLOBAL_INTRINSIC_ADDRESS,
+ [32]byte(proverAddress),
+ )
+
+ // Create allocation vertices
+ for _, filter := range filters {
+ allocationAddressBI, _ := poseidon.HashBytes(
+ slices.Concat([]byte("PROVER_ALLOCATION"), publicKey, filter),
+ )
+ allocationAddress := allocationAddressBI.FillBytes(make([]byte, 32))
+
+ allocationTree := &tries.VectorCommitmentTree{}
+ _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "allocation:ProverAllocation", "Prover", proverAddress, allocationTree)
+ _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "allocation:ProverAllocation", "Status", []byte{0}, allocationTree)
+ _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "allocation:ProverAllocation", "ConfirmationFilter", filter, allocationTree)
+
+ frameNumberBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(frameNumberBytes, joinFrame)
+ _ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
+ "allocation:ProverAllocation", "JoinFrameNumber", frameNumberBytes, allocationTree)
+
+ allocationVertex := hgcrdt.NewVertex(
+ intrinsics.GLOBAL_INTRINSIC_ADDRESS,
+ [32]byte(allocationAddress),
+ allocationTree.Commit(inclusionProver, false),
+ big.NewInt(0),
+ )
+ if err := hg.AddVertex(txn, allocationVertex); err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ var allocationVertexID [64]byte
+ copy(allocationVertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
+ copy(allocationVertexID[32:], allocationAddress)
+ if err := hg.SetVertexData(txn, allocationVertexID, allocationTree); err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ hyperedge.AddExtrinsic(allocationVertex)
+ }
+
+ if err := hg.AddHyperedge(txn, hyperedge); err != nil {
+ txn.Abort()
+ return nil, err
+ }
+
+ if err := txn.Commit(); err != nil {
+ return nil, err
+ }
+
+ return proverAddress, nil
+ }
+
+ // Helper to delete ONLY the vertex ID set entry (not the vertex data)
+ // This simulates the state after a previous incomplete prune
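+ // (the tree entry for the allocation is removed, but its vertex data record remains)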
+ deleteVertexIDSetOnly := func(publicKey []byte, filter []byte) error {
+ allocationAddressBI, _ := poseidon.HashBytes(
+ slices.Concat([]byte("PROVER_ALLOCATION"), publicKey, filter),
+ )
+ allocationAddress := allocationAddressBI.FillBytes(make([]byte, 32))
+
+ var vertexID [64]byte
+ copy(vertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
+ copy(vertexID[32:], allocationAddress)
+
+ hgCRDT := hg.(*hgcrdt.HypergraphCRDT)
+ txn, err := hgCRDT.NewTransaction(false)
+ if err != nil {
+ return err
+ }
+
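+ // Shard key whose L2 component is the global intrinsic address (32 bytes of 0xff)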
+ shardKey := tries.ShardKey{
+ L1: [3]byte{0x00, 0x00, 0x00},
+ L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
+ }
+ vtree := hgCRDT.GetVertexAddsSet(shardKey).GetTree()
+
+ // Delete from ID set only
+ if err := vtree.Delete(txn, vertexID[:]); err != nil {
+ txn.Abort()
+ return err
+ }
+
+ return txn.Commit()
+ }
+
+ // Helper to check if vertex exists in ID set
+ vertexExistsInIDSet := func(vertexID [64]byte) bool {
+ _, err := hg.GetVertex(vertexID)
+ return err == nil
+ }
+
+ // Helper to check if vertex data exists
+ vertexDataExists := func(vertexID [64]byte) bool {
+ data, err := hg.GetVertexData(vertexID)
+ return err == nil && data != nil
+ }
+
+ // Helper to compute allocation vertex ID
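+ // (domain = GLOBAL_INTRINSIC_ADDRESS, address = poseidon("PROVER_ALLOCATION" || publicKey || filter))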
+ getAllocationVertexID := func(publicKey, filter []byte) [64]byte {
+ allocationHash, _ := poseidon.HashBytes(
+ slices.Concat([]byte("PROVER_ALLOCATION"), publicKey, filter),
+ )
+ var id [64]byte
+ copy(id[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
+ copy(id[32:], allocationHash.FillBytes(make([]byte, 32)))
+ return id
+ }
+
+ // Create a prover with 5 allocations
+ publicKey := bytes.Repeat([]byte{0x50}, 585)
+ filters := [][]byte{
+ []byte("incomplete_filter_0"),
+ []byte("incomplete_filter_1"),
+ []byte("incomplete_filter_2"),
+ []byte("incomplete_filter_3"),
+ []byte("incomplete_filter_4"),
+ }
+
+ proverAddr, err := createProverWithAllocations(publicKey, filters, oldJoinFrame)
+ require.NoError(t, err)
+ t.Logf("Created prover at address: %s", hex.EncodeToString(proverAddr))
+
+ // Verify all allocations exist (both ID set and data)
+ for i, filter := range filters {
+ allocID := getAllocationVertexID(publicKey, filter)
+ assert.True(t, vertexExistsInIDSet(allocID),
+ "Allocation %d should exist in ID set before manipulation", i)
+ assert.True(t, vertexDataExists(allocID),
+ "Allocation %d data should exist before manipulation", i)
+ }
+
+ // Now delete the ID set entries for allocations 0, 1, and 2, but leave their vertex data
+ // This simulates the state after an incomplete prune (the original bug)
+ for i := 0; i < 3; i++ {
+ err := deleteVertexIDSetOnly(publicKey, filters[i])
+ require.NoError(t, err)
+ t.Logf("Deleted ID set entry for allocation %d (leaving vertex data)", i)
+ }
+
+ // Verify the incomplete state: ID set entries gone, but data remains
+ for i := 0; i < 3; i++ {
+ allocID := getAllocationVertexID(publicKey, filters[i])
+ assert.False(t, vertexExistsInIDSet(allocID),
+ "Allocation %d should NOT exist in ID set after manipulation", i)
+ assert.True(t, vertexDataExists(allocID),
+ "Allocation %d data should STILL exist after manipulation (orphaned)", i)
+ }
+
+ // Allocations 3 and 4 should still be complete
+ for i := 3; i < 5; i++ {
+ allocID := getAllocationVertexID(publicKey, filters[i])
+ assert.True(t, vertexExistsInIDSet(allocID),
+ "Allocation %d should still exist in ID set", i)
+ assert.True(t, vertexDataExists(allocID),
+ "Allocation %d data should still exist", i)
+ }
+
+ // Create registry - this will load the prover from vertex data iterator
+ // The allocations with missing ID set entries will still be in the cache
+ // because extractGlobalState reads from vertex DATA, not ID set
+ registry, err := NewProverRegistry(logger, hg)
+ require.NoError(t, err)
+
+ // Verify the prover is in cache with all 5 allocations
+ // (because extractGlobalState reads from vertex data which still exists)
+ info, err := registry.GetProverInfo(proverAddr)
+ require.NoError(t, err)
+ require.NotNil(t, info)
+ t.Logf("Prover in cache has %d allocations", len(info.Allocations))
+
+ // Run pruning - this should handle the incomplete state gracefully
+ err = registry.PruneOrphanJoins(currentFrame)
+ require.NoError(t, err)
+
+ // After pruning:
+ // - All allocation vertex DATA should be deleted (both orphaned and complete ones)
+ // - The prover should be removed since all allocations are gone
+ for i, filter := range filters {
+ allocID := getAllocationVertexID(publicKey, filter)
+ assert.False(t, vertexExistsInIDSet(allocID),
+ "Allocation %d should not exist in ID set after prune", i)
+ assert.False(t, vertexDataExists(allocID),
+ "Allocation %d data should be DELETED after prune", i)
+ }
+
+ // Prover should be removed from cache via GetProverInfo
+ info, err = registry.GetProverInfo(proverAddr)
+ require.NoError(t, err)
+ assert.Nil(t, info, "Prover should be removed from cache after all allocations pruned")
+
+ // Also verify through GetProvers that the prover is gone from all filters
+ for _, filter := range filters {
+ provers, err := registry.GetProvers(filter)
+ require.NoError(t, err)
+ for _, p := range provers {
+ assert.NotEqual(t, hex.EncodeToString(proverAddr), hex.EncodeToString(p.Address),
+ "Prover should not appear in GetProvers for filter %s", string(filter))
+ }
+ }
+
+ // Verify through GetProversByStatus that the prover is gone
+ joiningProvers, err := registry.GetProversByStatus(nil, consensus.ProverStatusJoining)
+ require.NoError(t, err)
+ for _, p := range joiningProvers {
+ assert.NotEqual(t, hex.EncodeToString(proverAddr), hex.EncodeToString(p.Address),
+ "Prover should not appear in GetProversByStatus(Joining)")
+ }
+
+ t.Logf("Incomplete state prune test completed successfully")
+ t.Logf(" - 3 allocations with missing ID set entries: vertex data cleaned up")
+ t.Logf(" - 2 allocations with complete state: fully pruned")
+ t.Logf(" - Prover removed after all allocations pruned")
+ t.Logf(" - Registry methods confirm prover is gone")
+}
diff --git a/node/crypto/proof_tree_test.go b/node/crypto/proof_tree_test.go
index 7c62a09..7763c5a 100644
--- a/node/crypto/proof_tree_test.go
+++ b/node/crypto/proof_tree_test.go
@@ -588,6 +588,349 @@ func TestTreeLongestBranchNoBLS(t *testing.T) {
}
}
+// TestTreeNoStaleNodesAfterDeleteNoBLS tests that deleting nodes does not leave
+// orphaned/stale tree nodes in the database. This specifically tests the case
+// where branch merging occurs during deletion.
+func TestTreeNoStaleNodesAfterDeleteNoBLS(t *testing.T) {
+ l, _ := zap.NewProduction()
+ db := store.NewPebbleDB(l, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0)
+ s := store.NewPebbleHypergraphStore(&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, db, l, verencr, nil)
+ shardKey := tries.ShardKey{}
+ tree := &tries.LazyVectorCommitmentTree{Store: s, SetType: "vertex", PhaseType: "adds", ShardKey: shardKey}
+
+ // Create keys that will force branch creation and then merging on deletion.
+ // We want keys that share a prefix so that:
+ // 1. Inserting them creates branch nodes
+ // 2. Deleting some of them causes branch merging (childCount == 1 case)
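+ //
+ // A "stale" node is one still stored in the database but no longer reachable from
+ // the root; comparing countTreeNodes against countReachableNodes detects exactly that.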
+
+ // Create 3 keys with same first 8 bytes, different after
+ commonPrefix := make([]byte, 8)
+ rand.Read(commonPrefix)
+
+ keys := make([][]byte, 3)
+ values := make([][]byte, 3)
+
+ for i := 0; i < 3; i++ {
+ key := make([]byte, 64)
+ copy(key[:8], commonPrefix)
+ // Vary byte 8 to create branching
+ key[8] = byte(i * 64) // 0x00, 0x40, 0x80
+ rand.Read(key[9:])
+ keys[i] = key
+
+ val := make([]byte, 32)
+ rand.Read(val)
+ values[i] = val
+
+ err := tree.Insert(nil, key, val, nil, big.NewInt(1))
+ if err != nil {
+ t.Fatalf("Failed to insert key %d: %v", i, err)
+ }
+ }
+
+ // Verify all 3 keys exist
+ for i, key := range keys {
+ _, err := tree.Get(key)
+ if err != nil {
+ t.Fatalf("Key %d not found after insert: %v", i, err)
+ }
+ }
+
+ // Count tree nodes before deletion
+ nodesBefore := countTreeNodes(t, s, shardKey)
+ t.Logf("Tree nodes before deletion: %d", nodesBefore)
+
+ // Delete one key - this should trigger branch merging
+ err := tree.Delete(nil, keys[1])
+ if err != nil {
+ t.Fatalf("Failed to delete key 1: %v", err)
+ }
+
+ // Verify key 1 is gone
+ _, err = tree.Get(keys[1])
+ if err == nil {
+ t.Fatal("Key 1 should not exist after deletion")
+ }
+
+ // Verify keys 0 and 2 still exist
+ for _, i := range []int{0, 2} {
+ _, err := tree.Get(keys[i])
+ if err != nil {
+ t.Fatalf("Key %d not found after deleting key 1: %v", i, err)
+ }
+ }
+
+ // Count tree nodes after deletion
+ nodesAfter := countTreeNodes(t, s, shardKey)
+ t.Logf("Tree nodes after deletion: %d", nodesAfter)
+
+ // Now verify that all stored nodes are actually reachable from the root.
+ // This is the critical check - any unreachable nodes are "stale".
+ reachableNodes := countReachableNodes(t, tree)
+ t.Logf("Reachable nodes from root: %d", reachableNodes)
+
+ if nodesAfter != reachableNodes {
+ t.Errorf("STALE NODES DETECTED: stored=%d, reachable=%d, stale=%d",
+ nodesAfter, reachableNodes, nodesAfter-reachableNodes)
+ }
+
+ // More aggressive test: delete all but one key
+ err = tree.Delete(nil, keys[2])
+ if err != nil {
+ t.Fatalf("Failed to delete key 2: %v", err)
+ }
+
+ nodesAfterSecondDelete := countTreeNodes(t, s, shardKey)
+ reachableAfterSecondDelete := countReachableNodes(t, tree)
+ t.Logf("After second delete: stored=%d, reachable=%d", nodesAfterSecondDelete, reachableAfterSecondDelete)
+
+ if nodesAfterSecondDelete != reachableAfterSecondDelete {
+ t.Errorf("STALE NODES DETECTED after second delete: stored=%d, reachable=%d, stale=%d",
+ nodesAfterSecondDelete, reachableAfterSecondDelete, nodesAfterSecondDelete-reachableAfterSecondDelete)
+ }
+
+ // Delete the last key - tree should be empty
+ err = tree.Delete(nil, keys[0])
+ if err != nil {
+ t.Fatalf("Failed to delete key 0: %v", err)
+ }
+
+ nodesAfterAllDeleted := countTreeNodes(t, s, shardKey)
+ t.Logf("After all deleted: stored=%d", nodesAfterAllDeleted)
+
+ // There should be no tree nodes left (except possibly the root marker)
+ if nodesAfterAllDeleted > 1 {
+ t.Errorf("STALE NODES DETECTED after all deleted: stored=%d (expected 0 or 1)",
+ nodesAfterAllDeleted)
+ }
+}
+
+// TestTreeNoStaleNodesAfterBranchMergeNoBLS specifically tests the case where
+// deleting a node causes a branch to merge with its only remaining child branch.
+// This tests the FullPrefix update bug hypothesis.
+func TestTreeNoStaleNodesAfterBranchMergeNoBLS(t *testing.T) {
+ l, _ := zap.NewProduction()
+ db := store.NewPebbleDB(l, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0)
+ s := store.NewPebbleHypergraphStore(&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, db, l, verencr, nil)
+ shardKey := tries.ShardKey{}
+ tree := &tries.LazyVectorCommitmentTree{Store: s, SetType: "vertex", PhaseType: "adds", ShardKey: shardKey}
+
+ // To trigger a branch-to-branch merge during deletion, we need:
+ // 1. A parent branch with exactly 2 children
+ // 2. One child is a leaf (to be deleted)
+ // 3. The other child is a branch node
+ //
+ // Structure we want to create:
+ // Root (branch)
+ // ├── Child A (leaf) - will be deleted
+ // └── Child B (branch)
+ // ├── Grandchild 1 (leaf)
+ // └── Grandchild 2 (leaf)
+ //
+ // After deleting Child A, Root should merge with Child B.
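+ // If the merge leaves the old Child B node (or its children) stored under their
+ // pre-merge paths, countTreeNodes will exceed countReachableNodes below.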
+
+ // Keys with controlled nibbles to create specific tree structure
+ // Nibble = 6 bits, so the first nibble is the top 6 bits of the first byte (bits 7-2)
+
+ // Key A: first nibble = 0 (byte[0] bits 7-2 = 000000)
+ keyA := make([]byte, 64)
+ keyA[0] = 0x00 // First nibble = 0
+ rand.Read(keyA[1:])
+
+ // Keys for branch B children: first nibble = 1 (byte[0] bits 7-2 = 000001)
+ // They share first nibble (1) but differ at second nibble
+ keyB1 := make([]byte, 64)
+ keyB1[0] = 0x04 // First nibble = 1 (binary: 000001 << 2 = 00000100)
+ keyB1[1] = 0x00 // Second nibble differs
+ rand.Read(keyB1[2:])
+
+ keyB2 := make([]byte, 64)
+ keyB2[0] = 0x04 // First nibble = 1
+ keyB2[1] = 0x40 // Second nibble differs (binary: 010000...)
+ rand.Read(keyB2[2:])
+
+ // Insert all keys
+ val := make([]byte, 32)
+ rand.Read(val)
+ if err := tree.Insert(nil, keyA, val, nil, big.NewInt(1)); err != nil {
+ t.Fatalf("Failed to insert keyA: %v", err)
+ }
+
+ rand.Read(val)
+ if err := tree.Insert(nil, keyB1, val, nil, big.NewInt(1)); err != nil {
+ t.Fatalf("Failed to insert keyB1: %v", err)
+ }
+
+ rand.Read(val)
+ if err := tree.Insert(nil, keyB2, val, nil, big.NewInt(1)); err != nil {
+ t.Fatalf("Failed to insert keyB2: %v", err)
+ }
+
+ // Verify structure
+ nodesBefore := countTreeNodes(t, s, shardKey)
+ reachableBefore := countReachableNodes(t, tree)
+ t.Logf("Before deletion: stored=%d, reachable=%d", nodesBefore, reachableBefore)
+
+ if nodesBefore != reachableBefore {
+ t.Errorf("STALE NODES before deletion: stored=%d, reachable=%d", nodesBefore, reachableBefore)
+ }
+
+ // Delete keyA - this should trigger the merge of root with child B
+ if err := tree.Delete(nil, keyA); err != nil {
+ t.Fatalf("Failed to delete keyA: %v", err)
+ }
+
+ // Verify keyA is gone
+ if _, err := tree.Get(keyA); err == nil {
+ t.Fatal("keyA should not exist after deletion")
+ }
+
+ // Verify B keys still exist
+ if _, err := tree.Get(keyB1); err != nil {
+ t.Fatalf("keyB1 not found after deletion: %v", err)
+ }
+ if _, err := tree.Get(keyB2); err != nil {
+ t.Fatalf("keyB2 not found after deletion: %v", err)
+ }
+
+ // Check for stale nodes
+ nodesAfter := countTreeNodes(t, s, shardKey)
+ reachableAfter := countReachableNodes(t, tree)
+ t.Logf("After deletion: stored=%d, reachable=%d", nodesAfter, reachableAfter)
+
+ if nodesAfter != reachableAfter {
+ t.Errorf("STALE NODES DETECTED after branch merge: stored=%d, reachable=%d, stale=%d",
+ nodesAfter, reachableAfter, nodesAfter-reachableAfter)
+ }
+}
+
+// TestTreeNoStaleNodesAfterMassDeleteNoBLS tests stale node detection with many keys
+func TestTreeNoStaleNodesAfterMassDeleteNoBLS(t *testing.T) {
+ l, _ := zap.NewProduction()
+ db := store.NewPebbleDB(l, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0)
+ s := store.NewPebbleHypergraphStore(&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, db, l, verencr, nil)
+ shardKey := tries.ShardKey{}
+ tree := &tries.LazyVectorCommitmentTree{Store: s, SetType: "vertex", PhaseType: "adds", ShardKey: shardKey}
+
+ // Insert 1000 random keys
+ numKeys := 1000
+ keys := make([][]byte, numKeys)
+
+ for i := 0; i < numKeys; i++ {
+ key := make([]byte, 64)
+ rand.Read(key)
+ keys[i] = key
+
+ val := make([]byte, 32)
+ rand.Read(val)
+
+ err := tree.Insert(nil, key, val, nil, big.NewInt(1))
+ if err != nil {
+ t.Fatalf("Failed to insert key %d: %v", i, err)
+ }
+ }
+
+ nodesBefore := countTreeNodes(t, s, shardKey)
+ reachableBefore := countReachableNodes(t, tree)
+ t.Logf("Before deletion: stored=%d, reachable=%d", nodesBefore, reachableBefore)
+
+ if nodesBefore != reachableBefore {
+ t.Errorf("STALE NODES before deletion: stored=%d, reachable=%d", nodesBefore, reachableBefore)
+ }
+
+ // Delete half the keys in random order
+ mrand.Shuffle(numKeys, func(i, j int) {
+ keys[i], keys[j] = keys[j], keys[i]
+ })
+
+ for i := 0; i < numKeys/2; i++ {
+ err := tree.Delete(nil, keys[i])
+ if err != nil {
+ t.Fatalf("Failed to delete key %d: %v", i, err)
+ }
+ }
+
+ nodesAfter := countTreeNodes(t, s, shardKey)
+ reachableAfter := countReachableNodes(t, tree)
+ t.Logf("After deleting half: stored=%d, reachable=%d", nodesAfter, reachableAfter)
+
+ if nodesAfter != reachableAfter {
+ t.Errorf("STALE NODES DETECTED: stored=%d, reachable=%d, stale=%d",
+ nodesAfter, reachableAfter, nodesAfter-reachableAfter)
+ }
+
+ // Verify remaining keys still accessible
+ for i := numKeys / 2; i < numKeys; i++ {
+ _, err := tree.Get(keys[i])
+ if err != nil {
+ t.Errorf("Key %d not found after mass deletion: %v", i, err)
+ }
+ }
+}
+
+// countTreeNodes counts all tree nodes stored in the database for a given shard
+func countTreeNodes(t *testing.T, s *store.PebbleHypergraphStore, shardKey tries.ShardKey) int {
+ count := 0
+
+ // Use the store's iterator to count nodes
+ iter, err := s.IterateRawLeaves("vertex", "adds", shardKey)
+ if err != nil {
+ t.Fatalf("Failed to create iterator: %v", err)
+ }
+ defer iter.Close()
+
+ // Count all entries (both branch and leaf nodes)
+ for valid := iter.First(); valid; valid = iter.Next() {
+ count++
+ }
+
+ return count
+}
+
+// countReachableNodes counts nodes reachable from the tree root by walking the tree
+func countReachableNodes(t *testing.T, tree *tries.LazyVectorCommitmentTree) int {
+ if tree.Root == nil {
+ return 0
+ }
+
+ count := 0
+ var walk func(node tries.LazyVectorCommitmentNode)
+ walk = func(node tries.LazyVectorCommitmentNode) {
+ if node == nil {
+ return
+ }
+ count++
+
+ switch n := node.(type) {
+ case *tries.LazyVectorCommitmentBranchNode:
+ for i := 0; i < 64; i++ {
+ child := n.Children[i]
+ if child == nil {
+ // Try to load from store
+ var err error
+ child, err = tree.Store.GetNodeByPath(
+ tree.SetType,
+ tree.PhaseType,
+ tree.ShardKey,
+ append(n.FullPrefix, i),
+ )
+ if err != nil {
+ continue
+ }
+ }
+ if child != nil {
+ walk(child)
+ }
+ }
+ case *tries.LazyVectorCommitmentLeafNode:
+ // Leaf node, nothing more to walk
+ }
+ }
+
+ walk(tree.Root)
+ return count
+}
+
// TestTreeBranchStructure tests that the tree structure is preserved after
// adding and removing leaves that cause branch creation due to shared prefixes.
func TestTreeBranchStructureNoBLS(t *testing.T) {
diff --git a/node/crypto/rdf_multiprover_integration_test.go b/node/crypto/rdf_multiprover_integration_test.go
index adf1773..4b76272 100644
--- a/node/crypto/rdf_multiprover_integration_test.go
+++ b/node/crypto/rdf_multiprover_integration_test.go
@@ -5,6 +5,7 @@ package crypto_test
import (
"bytes"
+ "fmt"
"math/big"
"testing"
@@ -16,6 +17,265 @@ import (
qcrypto "source.quilibrium.com/quilibrium/monorepo/types/tries"
)
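+// TestName exercises the RDF multiprover against a name service record schema:
+// proving, typed proving, field lookup, and verification over a committed tree.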
+func TestName(t *testing.T) {
+ rdfDoc := `
+@prefix qcl: .
+@prefix rdfs: .
+@prefix name: .
+
+name:NameRecord a rdfs:Class ;
+ rdfs:comment "Quilibrium name service record" .
+
+name:Name a rdfs:Property ;
+ rdfs:range name:NameRecord ;
+ rdfs:domain qcl:String ;
+ qcl:size 32 ;
+ qcl:order 1 .
+
+name:AuthorityKey a rdfs:Property ;
+ rdfs:range name:NameRecord ;
+ rdfs:domain qcl:ByteArray ;
+ qcl:size 57 ;
+ qcl:order 2 .
+
+name:Parent a rdfs:Property ;
+ rdfs:range name:NameRecord ;
+ rdfs:domain qcl:ByteArray ;
+ qcl:size 32 ;
+ qcl:order 3 .
+
+name:CreatedAt a rdfs:Property ;
+ rdfs:range name:NameRecord ;
+ rdfs:domain qcl:Uint ;
+ qcl:size 64 ;
+ qcl:order 4 .
+
+name:UpdatedAt a rdfs:Property ;
+ rdfs:range name:NameRecord ;
+ rdfs:domain qcl:Uint ;
+ qcl:size 64 ;
+ qcl:order 5 .
+
+name:RecordType a rdfs:Property ;
+ rdfs:range name:NameRecord ;
+ rdfs:domain qcl:Uint ;
+ qcl:size 8 ;
+ qcl:order 6 .
+
+name:Data a rdfs:Property ;
+ rdfs:range name:NameRecord ;
+ rdfs:domain qcl:String ;
+ qcl:size 64 ;
+ qcl:order 7 .
+ `
+ // Create components
+ parser := &schema.TurtleRDFParser{}
+ log := zap.NewNop()
+ inclusionProver := bls48581.NewKZGInclusionProver(log)
+ multiprover := schema.NewRDFMultiprover(parser, inclusionProver)
+
+ // Create a test tree
+ tree := &qcrypto.VectorCommitmentTree{}
+
+ // Insert test data at the correct indices
+ // The tree needs to have data at indices that will be used in the polynomial
+ // Insert enough data to ensure polynomial has values at indices 1, 2, 3
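+ // Keys follow the multiprover convention key = order << 2, and index i holds 57 bytes of (i+1).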
+ for i := 0; i < 63; i++ {
+ data := bytes.Repeat([]byte{byte(i + 1)}, 57)
+ err := tree.Insert([]byte{byte(i << 2)}, data, nil, big.NewInt(57))
+ require.NoError(t, err)
+ }
+ tree.Commit(inclusionProver, false)
+
+ t.Run("Prove", func(t *testing.T) {
+ fields := []string{"Name", "AuthorityKey"}
+ proof, err := multiprover.Prove(rdfDoc, fields, tree)
+ pb, _ := proof.ToBytes()
+ fmt.Printf("proof %x\n", pb)
+ require.NoError(t, err)
+ assert.NotNil(t, proof)
+ })
+
+ t.Run("ProveWithType", func(t *testing.T) {
+ fields := []string{"Name", "AuthorityKey"}
+ typeIndex := uint64(63)
+ proof, err := multiprover.ProveWithType(rdfDoc, fields, tree, &typeIndex)
+ require.NoError(t, err)
+ assert.NotNil(t, proof)
+
+ pb, _ := proof.ToBytes()
+ fmt.Printf("proof with type %x\n", pb)
+ // Test without type index
+ proof, err = multiprover.ProveWithType(rdfDoc, fields, tree, nil)
+ require.NoError(t, err)
+ assert.NotNil(t, proof)
+ pb, _ = proof.ToBytes()
+
+ fmt.Printf("proof with type but type is nil %x\n", pb)
+ })
+
+ t.Run("Get", func(t *testing.T) {
+ // Test getting name field (order 1, so key is 1<<2 = 4, data at index 1)
+ value, err := multiprover.Get(rdfDoc, "name:NameRecord", "Name", tree)
+ require.NoError(t, err)
+ assert.Equal(t, bytes.Repeat([]byte{2}, 57), value) // Index 1 has value 2
+
+ // Test getting one-time key field (order 2, so key is 2<<2 = 8, data at index 2)
+ value, err = multiprover.Get(rdfDoc, "name:NameRecord", "AuthorityKey", tree)
+ require.NoError(t, err)
+ assert.Equal(t, bytes.Repeat([]byte{3}, 57), value) // Index 2 has value 3
+ })
+
+ t.Run("GetFieldOrder", func(t *testing.T) {
+ order, maxOrder, err := multiprover.GetFieldOrder(rdfDoc, "name:NameRecord", "Name")
+ require.NoError(t, err)
+ assert.Equal(t, 1, order)
+ assert.Equal(t, 7, maxOrder)
+
+ order, maxOrder, err = multiprover.GetFieldOrder(rdfDoc, "name:NameRecord", "AuthorityKey")
+ require.NoError(t, err)
+ assert.Equal(t, 2, order)
+ assert.Equal(t, 7, maxOrder)
+
+ order, maxOrder, err = multiprover.GetFieldOrder(rdfDoc, "name:NameRecord", "Parent")
+ require.NoError(t, err)
+ assert.Equal(t, 3, order)
+ assert.Equal(t, 7, maxOrder)
+ })
+
+ t.Run("GetFieldKey", func(t *testing.T) {
+ key, err := multiprover.GetFieldKey(rdfDoc, "name:NameRecord", "Name")
+ require.NoError(t, err)
+ assert.Equal(t, []byte{1 << 2}, key)
+
+ key, err = multiprover.GetFieldKey(rdfDoc, "name:NameRecord", "AuthorityKey")
+ require.NoError(t, err)
+ assert.Equal(t, []byte{2 << 2}, key)
+ })
+
+ t.Run("Verify", func(t *testing.T) {
+ // Create proof without type index for simpler verification
+ fields := []string{"Name", "AuthorityKey", "Parent"}
+ proof, err := multiprover.ProveWithType(rdfDoc, fields, tree, nil)
+ require.NoError(t, err)
+
+ // Get actual data from tree for verification
+ data := make([][]byte, len(fields))
+ for i, field := range fields {
+ value, err := multiprover.Get(rdfDoc, "name:NameRecord", field, tree)
+ require.NoError(t, err)
+ data[i] = value
+ }
+
+ // Create commit
+ commit := tree.Commit(inclusionProver, false)
+ proofBytes, _ := proof.ToBytes()
+
+ // Verify should pass with correct data
+ // keys parameter is nil to use default index-based keys
+ valid, err := multiprover.Verify(rdfDoc, fields, nil, commit, proofBytes, data)
+ require.NoError(t, err)
+ assert.True(t, valid)
+
+ // Verify should fail with wrong data
+ wrongData := make([][]byte, len(fields))
+ for i := range wrongData {
+ wrongData[i] = []byte("wrong data")
+ }
+ valid, err = multiprover.Verify(rdfDoc, fields, nil, commit, proofBytes, wrongData)
+ require.NoError(t, err)
+ assert.False(t, valid)
+
+ // Verify should error with non-existent field
+ invalidFields := []string{"Name", "NonExistent"}
+ _, err = multiprover.Verify(rdfDoc, invalidFields, nil, commit, proofBytes, data[:2])
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "not found")
+ })
+
+ t.Run("VerifyWithType", func(t *testing.T) {
+ // Add type marker to tree
+ typeData := bytes.Repeat([]byte{0xff}, 32)
+ typeIndex := uint64(63)
+ err := tree.Insert(typeData, typeData, nil, big.NewInt(32))
+ require.NoError(t, err)
+
+ // Get commit after all data is inserted
+ commit := tree.Commit(inclusionProver, false)
+
+ // Create proof with type
+ fields := []string{"Name", "AuthorityKey"}
+ proof, err := multiprover.ProveWithType(rdfDoc, fields, tree, &typeIndex)
+ require.NoError(t, err)
+
+ // Get actual data from tree
+ data := make([][]byte, len(fields))
+ for i, field := range fields {
+ value, err := multiprover.Get(rdfDoc, "name:NameRecord", field, tree)
+ require.NoError(t, err)
+ data[i] = value
+ }
+
+ proofBytes, _ := proof.ToBytes()
+
+ // Verify with type should pass
+ valid, err := multiprover.VerifyWithType(rdfDoc, fields, nil, commit, proofBytes, data, &typeIndex, typeData)
+ require.NoError(t, err)
+ assert.True(t, valid)
+
+ // Verify without type when proof was created with type should fail
+ valid, err = multiprover.VerifyWithType(rdfDoc, fields, nil, commit, proofBytes, data, nil, nil)
+ require.NoError(t, err)
+ assert.False(t, valid) // Should fail due to missing type data
+
+ // Create proof without type
+ proofNoType, err := multiprover.ProveWithType(rdfDoc, fields, tree, nil)
+ require.NoError(t, err)
+ proofNoTypeBytes, _ := proofNoType.ToBytes()
+
+ // Verify without type should pass
+ valid, err = multiprover.VerifyWithType(rdfDoc, fields, nil, commit, proofNoTypeBytes, data, nil, nil)
+ require.NoError(t, err)
+ assert.True(t, valid)
+
+ // Verify with wrong type data should fail
+ wrongTypeData := []byte("wrong type data")
+ valid, err = multiprover.VerifyWithType(rdfDoc, fields, nil, commit, proofBytes, data, &typeIndex, wrongTypeData)
+ require.NoError(t, err)
+ assert.False(t, valid)
+
+ // Verify with different type index but same data should still fail
+ // because the hash uses the fixed key bytes.Repeat([]byte{0xff}, 32)
+ differentTypeIndex := uint64(50)
+ valid, err = multiprover.VerifyWithType(rdfDoc, fields, nil, commit, proofBytes, data, &differentTypeIndex, typeData)
+ require.NoError(t, err)
+ assert.False(t, valid) // Should fail because proof was created with index 63
+ })
+
+ t.Run("ErrorCases", func(t *testing.T) {
+ // Test non-existent field
+ _, err := multiprover.Get(rdfDoc, "name:NameRecord", "NonExistent", tree)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "not found")
+
+ // Test invalid document
+ _, err = multiprover.Get("invalid rdf", "name:NameRecord", "name", tree)
+ assert.Error(t, err)
+
+ // Test verify with mismatched data count
+ fields := []string{"Name", "AuthorityKey"}
+ proof, err := multiprover.ProveWithType(rdfDoc, fields, tree, nil)
+ require.NoError(t, err)
+
+ // Wrong number of data elements
+ wrongData := [][]byte{[]byte("data1")}
+ commit := tree.Commit(inclusionProver, false)
+ _, err = multiprover.Verify(rdfDoc, fields, nil, commit, proof.GetProof(), wrongData)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "fields and data length mismatch")
+ })
+}
+
func TestRDFMultiprover(t *testing.T) {
// Create test RDF document
rdfDoc := `
diff --git a/node/dbscan/main.go b/node/dbscan/main.go
index 7766a7c..8d9dea3 100644
--- a/node/dbscan/main.go
+++ b/node/dbscan/main.go
@@ -35,8 +35,28 @@ var (
)
configDirectory2 = flag.String(
"config2",
- filepath.Join(".", ".config"),
- "the configuration directory",
+ "",
+ "the second configuration directory (optional, enables comparison mode)",
+ )
+ keyPrefix = flag.String(
+ "prefix",
+ "",
+ "hex-encoded key prefix to filter results (e.g., '09' for hypergraph keys)",
+ )
+ searchKey = flag.String(
+ "search",
+ "",
+ "hex-encoded key substring to search for in keys",
+ )
+ searchValue = flag.String(
+ "search-value",
+ "",
+ "hex-encoded substring to search for in values",
+ )
+ maxResults = flag.Int(
+ "max",
+ 0,
+ "maximum number of results to display (0 = unlimited)",
)
// *char flags
@@ -50,6 +70,34 @@ func main() {
config.Flags(&char, &ver)
flag.Parse()
+ // Parse filter options
+ var prefixFilter []byte
+ if *keyPrefix != "" {
+ var err error
+ prefixFilter, err = hex.DecodeString(*keyPrefix)
+ if err != nil {
+ log.Fatalf("invalid prefix hex: %v", err)
+ }
+ }
+
+ var keySearchPattern []byte
+ if *searchKey != "" {
+ var err error
+ keySearchPattern, err = hex.DecodeString(*searchKey)
+ if err != nil {
+ log.Fatalf("invalid search hex: %v", err)
+ }
+ }
+
+ var valueSearchPattern []byte
+ if *searchValue != "" {
+ var err error
+ valueSearchPattern, err = hex.DecodeString(*searchValue)
+ if err != nil {
+ log.Fatalf("invalid search-value hex: %v", err)
+ }
+ }
+
nodeConfig1, err := config.LoadConfig(*configDirectory1, "", false)
if err != nil {
log.Fatal("failed to load config", err)
@@ -64,9 +112,102 @@ func main() {
db1 := store.NewPebbleDB(logger, nodeConfig1.DB, uint(0))
defer db1.Close()
- iter1, err := db1.NewIter([]byte{0x00}, []byte{0xff})
+ // Determine iteration bounds based on prefix filter
+ lowerBound := []byte{0x00}
+ upperBound := []byte{0xff}
+ if len(prefixFilter) > 0 {
+ lowerBound = prefixFilter
+ // Create upper bound by incrementing the last byte of the prefix
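+ // e.g. prefix 0x09ff yields the exclusive upper bound 0x0a00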
+ upperBound = make([]byte, len(prefixFilter))
+ copy(upperBound, prefixFilter)
+ for i := len(upperBound) - 1; i >= 0; i-- {
+ if upperBound[i] < 0xff {
+ upperBound[i]++
+ break
+ }
+ upperBound[i] = 0x00
+ if i == 0 {
+ // Prefix is all 0xff, scan to end
+ upperBound = []byte{0xff}
+ }
+ }
+ }
+
+ // Single database mode (read-only dump)
+ if *configDirectory2 == "" {
+ runSingleDBMode(db1, lowerBound, upperBound, prefixFilter, keySearchPattern, valueSearchPattern, logger)
+ return
+ }
+
+ // Comparison mode (two databases)
+ runCompareMode(db1, lowerBound, upperBound, prefixFilter, keySearchPattern, valueSearchPattern, logger)
+}
+
+func runSingleDBMode(
+ db1 *store.PebbleDB,
+ lowerBound, upperBound []byte,
+ prefixFilter, keySearchPattern, valueSearchPattern []byte,
+ logger *zap.Logger,
+) {
+ iter1, err := db1.NewIter(lowerBound, upperBound)
if err != nil {
logger.Error("failed to create iterator", zap.Error(err))
+ return
+ }
+ defer iter1.Close()
+
+ matched := 0
+
+ for iter1.First(); iter1.Valid(); iter1.Next() {
+ key := iter1.Key()
+ value := iter1.Value()
+
+ // Apply prefix filter
+ if len(prefixFilter) > 0 && !bytes.HasPrefix(key, prefixFilter) {
+ continue
+ }
+
+ // Apply key search pattern
+ if len(keySearchPattern) > 0 && !bytes.Contains(key, keySearchPattern) {
+ continue
+ }
+
+ // Apply value search pattern
+ if len(valueSearchPattern) > 0 && !bytes.Contains(value, valueSearchPattern) {
+ continue
+ }
+
+ matched++
+
+ decoded := decodeValue(key, value)
+ fmt.Printf(
+ "key: %s\nsemantic: %s\nvalue:\n%s\n\n",
+ hex.EncodeToString(key),
+ describeKey(key),
+ indent(decoded),
+ )
+
+ if *maxResults > 0 && matched >= *maxResults {
+ fmt.Printf("... (stopped after %d results, use -max to change limit)\n", *maxResults)
+ break
+ }
+ }
+
+ fmt.Printf("\nsummary: %d keys displayed from %s\n", matched, *configDirectory1)
+}
+
+func runCompareMode(
+ db1 *store.PebbleDB,
+ lowerBound, upperBound []byte,
+ prefixFilter, keySearchPattern, valueSearchPattern []byte,
+ logger *zap.Logger,
+) {
+ iter1, err := db1.NewIter(lowerBound, upperBound)
+ if err != nil {
+ logger.Error("failed to create iterator", zap.Error(err))
+ return
}
defer iter1.Close()
@@ -78,9 +219,10 @@ func main() {
db2 := store.NewPebbleDB(logger, nodeConfig2.DB, uint(0))
defer db2.Close()
- iter2, err := db2.NewIter([]byte{0x00}, []byte{0xff})
+ iter2, err := db2.NewIter(lowerBound, upperBound)
if err != nil {
logger.Error("failed to create iterator", zap.Error(err))
+ return
}
defer iter2.Close()
@@ -90,8 +232,22 @@ func main() {
onlyDB1 := 0
onlyDB2 := 0
valueDiff := 0
+ matched := 0
keyPresenceMap := make(map[string]*keyPresence)
+ shouldInclude := func(key, value []byte) bool {
+ if len(prefixFilter) > 0 && !bytes.HasPrefix(key, prefixFilter) {
+ return false
+ }
+ if len(keySearchPattern) > 0 && !bytes.Contains(key, keySearchPattern) {
+ return false
+ }
+ if len(valueSearchPattern) > 0 && !bytes.Contains(value, valueSearchPattern) {
+ return false
+ }
+ return true
+ }
+
for iter1Valid || iter2Valid {
var key1, key2 []byte
var value1, value2 []byte
@@ -114,34 +270,86 @@ func main() {
case iter1Valid && iter2Valid:
comparison := bytes.Compare(key1, key2)
if comparison == 0 {
- if bytes.Equal(value1, value2) {
- fmt.Printf(
- "key: %s\nsemantic: %s\nvalues identical in %s and %s\nvalue:\n%s\n\n",
- shortHex(key1),
- describeKey(key1),
- *configDirectory1,
- *configDirectory2,
- indent(decoded1),
- )
- } else {
- valueDiff++
- fmt.Printf(
- "key: %s\nsemantic: %s\nvalue (%s):\n%s\nvalue (%s):\n%s\n",
- shortHex(key1),
- describeKey(key1),
- *configDirectory1,
- indent(decoded1),
- *configDirectory2,
- indent(decoded2),
- )
- if diff := diffStrings(decoded1, decoded2); diff != "" {
- fmt.Printf("diff:\n%s\n", indent(diff))
+ if shouldInclude(key1, value1) || shouldInclude(key2, value2) {
+ matched++
+ if *maxResults > 0 && matched > *maxResults {
+ fmt.Printf("... (stopped after %d results)\n", *maxResults)
+ goto done
+ }
+
+ if bytes.Equal(value1, value2) {
+ fmt.Printf(
+ "key: %s\nsemantic: %s\nvalues identical in %s and %s\nvalue:\n%s\n\n",
+ shortHex(key1),
+ describeKey(key1),
+ *configDirectory1,
+ *configDirectory2,
+ indent(decoded1),
+ )
+ } else {
+ valueDiff++
+ fmt.Printf(
+ "key: %s\nsemantic: %s\nvalue (%s):\n%s\nvalue (%s):\n%s\n",
+ shortHex(key1),
+ describeKey(key1),
+ *configDirectory1,
+ indent(decoded1),
+ *configDirectory2,
+ indent(decoded2),
+ )
+ if diff := diffStrings(decoded1, decoded2); diff != "" {
+ fmt.Printf("diff:\n%s\n", indent(diff))
+ }
+ fmt.Printf("\n")
}
- fmt.Printf("\n")
}
iter1Valid = iter1.Next()
iter2Valid = iter2.Next()
} else if comparison < 0 {
+ if shouldInclude(key1, value1) {
+ matched++
+ if *maxResults > 0 && matched > *maxResults {
+ fmt.Printf("... (stopped after %d results)\n", *maxResults)
+ goto done
+ }
+
+ onlyDB1++
+ fmt.Printf(
+ "key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n",
+ *configDirectory1,
+ shortHex(key1),
+ describeKey(key1),
+ indent(decoded1),
+ )
+ }
+ iter1Valid = iter1.Next()
+ } else {
+ if shouldInclude(key2, value2) {
+ matched++
+ if *maxResults > 0 && matched > *maxResults {
+ fmt.Printf("... (stopped after %d results)\n", *maxResults)
+ goto done
+ }
+
+ onlyDB2++
+ fmt.Printf(
+ "key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n",
+ *configDirectory2,
+ shortHex(key2),
+ describeKey(key2),
+ indent(decoded2),
+ )
+ }
+ iter2Valid = iter2.Next()
+ }
+ case iter1Valid:
+ if shouldInclude(key1, value1) {
+ matched++
+ if *maxResults > 0 && matched > *maxResults {
+ fmt.Printf("... (stopped after %d results)\n", *maxResults)
+ goto done
+ }
+
onlyDB1++
fmt.Printf(
"key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n",
@@ -150,8 +358,16 @@ func main() {
describeKey(key1),
indent(decoded1),
)
- iter1Valid = iter1.Next()
- } else {
+ }
+ iter1Valid = iter1.Next()
+ case iter2Valid:
+ if shouldInclude(key2, value2) {
+ matched++
+ if *maxResults > 0 && matched > *maxResults {
+ fmt.Printf("... (stopped after %d results)\n", *maxResults)
+ goto done
+ }
+
onlyDB2++
fmt.Printf(
"key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n",
@@ -160,31 +376,12 @@ func main() {
describeKey(key2),
indent(decoded2),
)
- iter2Valid = iter2.Next()
}
- case iter1Valid:
- onlyDB1++
- fmt.Printf(
- "key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n",
- *configDirectory1,
- shortHex(key1),
- describeKey(key1),
- indent(decoded1),
- )
- iter1Valid = iter1.Next()
- case iter2Valid:
- onlyDB2++
- fmt.Printf(
- "key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n",
- *configDirectory2,
- shortHex(key2),
- describeKey(key2),
- indent(decoded2),
- )
iter2Valid = iter2.Next()
}
}
+done:
fmt.Printf(
"summary: %d keys only in %s, %d keys only in %s, %d keys with differing values\n",
onlyDB1,
@@ -687,7 +884,7 @@ func decodeHypergraphValue(key []byte, value []byte) string {
switch sub {
case store.VERTEX_DATA:
- return summarizeVectorCommitmentTree(value)
+ return summarizeVectorCommitmentTree(key, value)
case store.VERTEX_TOMBSTONE:
return shortHex(value)
case store.VERTEX_ADDS_TREE_NODE,
@@ -765,8 +962,11 @@ func decodeHypergraphProto(value []byte) (string, bool) {
return output, matched
}
-func summarizeVectorCommitmentTree(value []byte) string {
- _, err := tries.DeserializeNonLazyTree(value)
+// Global intrinsic address (32 bytes of 0xff)
+var globalIntrinsicAddress = bytes.Repeat([]byte{0xff}, 32)
+
+func summarizeVectorCommitmentTree(key []byte, value []byte) string {
+ tree, err := tries.DeserializeNonLazyTree(value)
if err != nil {
return fmt.Sprintf(
"vector_commitment_tree decode_error=%v raw=%s",
@@ -780,6 +980,24 @@ func summarizeVectorCommitmentTree(value []byte) string {
"size_bytes": len(value),
"sha256": shortHex(sum[:]),
}
+
+ // Check if this is a global intrinsic vertex (domain = 0xff*32)
+ // Key structure for vertex data: {0x09, 0xF0, domain[32], address[32]}
+ if len(key) >= 66 {
+ domain := key[2:34]
+ address := key[34:66]
+
+ if bytes.Equal(domain, globalIntrinsicAddress) {
+ // This is a global intrinsic vertex - decode the fields
+ globalData := decodeGlobalIntrinsicVertex(tree, address)
+ if globalData != nil {
+ for k, v := range globalData {
+ summary[k] = v
+ }
+ }
+ }
+ }
+
jsonBytes, err := json.MarshalIndent(summary, "", " ")
if err != nil {
return fmt.Sprintf("vector_commitment_tree size_bytes=%d", len(value))
@@ -788,6 +1006,188 @@ func summarizeVectorCommitmentTree(value []byte) string {
return string(jsonBytes)
}
+// decodeGlobalIntrinsicVertex attempts to decode the vertex as a global intrinsic type
+// (prover, allocation, or reward)
+func decodeGlobalIntrinsicVertex(tree *tries.VectorCommitmentTree, address []byte) map[string]any {
+ result := make(map[string]any)
+ result["vertex_address"] = hex.EncodeToString(address)
+
+ // Try to detect the type by examining which fields exist
+ // Prover has PublicKey at order 0 (key 0x00) with size 585
+ // Allocation has Prover reference at order 0 (key 0x00)
+ // Reward has DelegateAddress at order 0 (key 0x00) with size 32
+
+ // Check order 0 field
+ order0Value, err := tree.Get([]byte{0x00})
+ if err != nil || len(order0Value) == 0 {
+ result["type"] = "unknown (no order 0 field)"
+ return result
+ }
+
+ switch len(order0Value) {
+ case 585:
+ // Prover: PublicKey is 585 bytes
+ result["type"] = "prover:Prover"
+ result["public_key"] = shortHex(order0Value)
+ decodeProverFields(tree, result)
+ case 32:
+ // Could be Allocation (Prover reference) or Reward (DelegateAddress)
+ // Disambiguate by checking JoinFrameNumber (order 4, key 0x10), which
+ // allocations carry and reward vertices do not.
+ joinFrame, _ := tree.Get([]byte{0x10})
+ if len(joinFrame) == 8 {
+ result["type"] = "allocation:ProverAllocation"
+ result["prover_reference"] = hex.EncodeToString(order0Value)
+ decodeAllocationFields(tree, result)
+ } else {
+ // Likely a reward vertex
+ result["type"] = "reward:ProverReward"
+ result["delegate_address"] = hex.EncodeToString(order0Value)
+ decodeRewardFields(tree, result)
+ }
+ default:
+ result["type"] = "unknown"
+ result["order_0_size"] = len(order0Value)
+ }
+
+ return result
+}
+
+func decodeProverFields(tree *tries.VectorCommitmentTree, result map[string]any) {
+ // Prover schema:
+ // order 0: PublicKey (585 bytes) - already decoded
+ // order 1: Status (1 byte) - key 0x04
+ // order 2: AvailableStorage (8 bytes) - key 0x08
+ // order 3: Seniority (8 bytes) - key 0x0c
+ // order 4: KickFrameNumber (8 bytes) - key 0x10
+
+ if status, err := tree.Get([]byte{0x04}); err == nil && len(status) == 1 {
+ result["status"] = decodeProverStatus(status[0])
+ result["status_raw"] = status[0]
+ }
+
+ if storage, err := tree.Get([]byte{0x08}); err == nil && len(storage) == 8 {
+ result["available_storage"] = binary.BigEndian.Uint64(storage)
+ }
+
+ if seniority, err := tree.Get([]byte{0x0c}); err == nil && len(seniority) == 8 {
+ result["seniority"] = binary.BigEndian.Uint64(seniority)
+ }
+
+ if kickFrame, err := tree.Get([]byte{0x10}); err == nil && len(kickFrame) == 8 {
+ result["kick_frame_number"] = binary.BigEndian.Uint64(kickFrame)
+ }
+}
+
+func decodeAllocationFields(tree *tries.VectorCommitmentTree, result map[string]any) {
+ // Allocation schema:
+ // order 0: Prover (32 bytes) - already decoded
+ // order 1: Status (1 byte) - key 0x04
+ // order 2: ConfirmationFilter (up to 64 bytes) - key 0x08
+ // order 3: RejectionFilter (up to 64 bytes) - key 0x0c
+ // order 4: JoinFrameNumber (8 bytes) - key 0x10
+ // order 5: LeaveFrameNumber (8 bytes) - key 0x14
+ // order 6: PauseFrameNumber (8 bytes) - key 0x18
+ // order 7: ResumeFrameNumber (8 bytes) - key 0x1c
+ // order 8: KickFrameNumber (8 bytes) - key 0x20
+ // order 9: JoinConfirmFrameNumber (8 bytes) - key 0x24
+ // order 10: JoinRejectFrameNumber (8 bytes) - key 0x28
+ // order 11: LeaveConfirmFrameNumber (8 bytes) - key 0x2c
+ // order 12: LeaveRejectFrameNumber (8 bytes) - key 0x30
+ // order 13: LastActiveFrameNumber (8 bytes) - key 0x34
+
+ if status, err := tree.Get([]byte{0x04}); err == nil && len(status) == 1 {
+ result["status"] = decodeAllocationStatus(status[0])
+ result["status_raw"] = status[0]
+ }
+
+ if confirmFilter, err := tree.Get([]byte{0x08}); err == nil && len(confirmFilter) > 0 {
+ result["confirmation_filter"] = hex.EncodeToString(confirmFilter)
+ if bytes.Equal(confirmFilter, make([]byte, len(confirmFilter))) {
+ result["is_global_prover"] = true
+ }
+ } else {
+ result["is_global_prover"] = true
+ }
+
+ if rejFilter, err := tree.Get([]byte{0x0c}); err == nil && len(rejFilter) > 0 {
+ result["rejection_filter"] = hex.EncodeToString(rejFilter)
+ }
+
+ if joinFrame, err := tree.Get([]byte{0x10}); err == nil && len(joinFrame) == 8 {
+ result["join_frame_number"] = binary.BigEndian.Uint64(joinFrame)
+ }
+
+ if leaveFrame, err := tree.Get([]byte{0x14}); err == nil && len(leaveFrame) == 8 {
+ result["leave_frame_number"] = binary.BigEndian.Uint64(leaveFrame)
+ }
+
+ if pauseFrame, err := tree.Get([]byte{0x18}); err == nil && len(pauseFrame) == 8 {
+ result["pause_frame_number"] = binary.BigEndian.Uint64(pauseFrame)
+ }
+
+ if resumeFrame, err := tree.Get([]byte{0x1c}); err == nil && len(resumeFrame) == 8 {
+ result["resume_frame_number"] = binary.BigEndian.Uint64(resumeFrame)
+ }
+
+ if kickFrame, err := tree.Get([]byte{0x20}); err == nil && len(kickFrame) == 8 {
+ result["kick_frame_number"] = binary.BigEndian.Uint64(kickFrame)
+ }
+
+ if joinConfirm, err := tree.Get([]byte{0x24}); err == nil && len(joinConfirm) == 8 {
+ result["join_confirm_frame_number"] = binary.BigEndian.Uint64(joinConfirm)
+ }
+
+ if joinReject, err := tree.Get([]byte{0x28}); err == nil && len(joinReject) == 8 {
+ result["join_reject_frame_number"] = binary.BigEndian.Uint64(joinReject)
+ }
+
+ if leaveConfirm, err := tree.Get([]byte{0x2c}); err == nil && len(leaveConfirm) == 8 {
+ result["leave_confirm_frame_number"] = binary.BigEndian.Uint64(leaveConfirm)
+ }
+
+ if leaveReject, err := tree.Get([]byte{0x30}); err == nil && len(leaveReject) == 8 {
+ result["leave_reject_frame_number"] = binary.BigEndian.Uint64(leaveReject)
+ }
+
+ if lastActive, err := tree.Get([]byte{0x34}); err == nil && len(lastActive) == 8 {
+ result["last_active_frame_number"] = binary.BigEndian.Uint64(lastActive)
+ }
+}
+
+func decodeRewardFields(tree *tries.VectorCommitmentTree, result map[string]any) {
+ // Reward schema - just has DelegateAddress at order 0
+ // Nothing else to decode for now
+}
+
+func decodeProverStatus(status byte) string {
+ // Prover status mapping (internal byte -> name)
+ switch status {
+ case 0:
+ return "Joining"
+ case 1:
+ return "Active"
+ case 2:
+ return "Paused"
+ case 3:
+ return "Leaving"
+ case 4:
+ return "Rejected"
+ case 5:
+ return "Kicked"
+ default:
+ return fmt.Sprintf("Unknown(%d)", status)
+ }
+}
+
+func decodeAllocationStatus(status byte) string {
+ // Allocation status mapping (same as prover status)
+ return decodeProverStatus(status)
+}
+
func summarizeHypergraphTreeNode(value []byte) string {
if len(value) == 0 {
return "hypergraph_tree_node "
diff --git a/node/store/pebble.go b/node/store/pebble.go
index 6cf4e83..7f72f7b 100644
--- a/node/store/pebble.go
+++ b/node/store/pebble.go
@@ -1,6 +1,7 @@
package store
import (
+ "bytes"
"context"
"encoding/binary"
"encoding/hex"
@@ -16,6 +17,7 @@ import (
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/config"
"source.quilibrium.com/quilibrium/monorepo/types/store"
+ "source.quilibrium.com/quilibrium/monorepo/types/tries"
)
type PebbleDB struct {
@@ -55,6 +57,11 @@ var pebbleMigrations = []func(*pebble.Batch) error{
migration_2_1_0_153,
migration_2_1_0_154,
migration_2_1_0_155,
+ migration_2_1_0_156,
+ migration_2_1_0_157,
+ migration_2_1_0_158,
+ migration_2_1_0_159,
+ migration_2_1_0_17,
}
func NewPebbleDB(
@@ -342,8 +349,8 @@ func (p *PebbleDB) NewIter(lowerBound []byte, upperBound []byte) (
error,
) {
return p.db.NewIter(&pebble.IterOptions{
- LowerBound: lowerBound, // buildutils:allow-slice-alias slice is static
- UpperBound: upperBound, // buildutils:allow-slice-alias slice is static
+ LowerBound: lowerBound,
+ UpperBound: upperBound,
})
}
@@ -415,8 +422,8 @@ func (t *PebbleTransaction) NewIter(lowerBound []byte, upperBound []byte) (
error,
) {
return t.b.NewIter(&pebble.IterOptions{
- LowerBound: lowerBound, // buildutils:allow-slice-alias slice is static
- UpperBound: upperBound, // buildutils:allow-slice-alias slice is static
+ LowerBound: lowerBound,
+ UpperBound: upperBound,
})
}
@@ -437,7 +444,7 @@ func rightAlign(data []byte, size int) []byte {
l := len(data)
if l == size {
- return data // buildutils:allow-slice-alias slice is static
+ return data
}
if l > size {
@@ -651,6 +658,103 @@ func migration_2_1_0_155(b *pebble.Batch) error {
return migration_2_1_0_15(b)
}
+func migration_2_1_0_156(b *pebble.Batch) error {
+ return migration_2_1_0_15(b)
+}
+
+func migration_2_1_0_157(b *pebble.Batch) error {
+ return migration_2_1_0_15(b)
+}
+
+func migration_2_1_0_158(b *pebble.Batch) error {
+ return migration_2_1_0_15(b)
+}
+
+func migration_2_1_0_159(b *pebble.Batch) error {
+ return migration_2_1_0_15(b)
+}
+
+func migration_2_1_0_17(b *pebble.Batch) error {
+ // Global shard key: L1={0,0,0}, L2=0xff*32
+ globalShardKey := tries.ShardKey{
+ L1: [3]byte{},
+ L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
+ }
+ // Next shard key (for exclusive upper bound): L1={0,0,1}, L2=0x00*32
+ nextShardKey := tries.ShardKey{
+ L1: [3]byte{0, 0, 1},
+ L2: [32]byte{},
+ }
+
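+	// Note: pebble's Batch.DeleteRange treats the end key as exclusive, so
+	// each range below covers exactly the global-domain entries for that key
+	// type and nothing beyond them.
+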
+ // Delete vertex data for global domain
+ // Vertex data keys: {0x09, 0xF0, domain[32], address[32]}
+ // Start: {0x09, 0xF0, 0xff*32} (prefix for global domain)
+ // End: {0x09, 0xF1} (next prefix type, ensures we capture all addresses)
+ if err := b.DeleteRange(
+ hypergraphVertexDataKey(globalShardKey.L2[:]),
+ []byte{HYPERGRAPH_SHARD, VERTEX_DATA + 1},
+ &pebble.WriteOptions{},
+ ); err != nil {
+ return err
+ }
+
+ // Delete vertex adds tree nodes
+ if err := b.DeleteRange(
+ hypergraphVertexAddsTreeNodeKey(globalShardKey, []byte{}),
+ hypergraphVertexAddsTreeNodeKey(nextShardKey, []byte{}),
+ &pebble.WriteOptions{},
+ ); err != nil {
+ return err
+ }
+
+ // Delete vertex adds tree nodes by path
+ if err := b.DeleteRange(
+ hypergraphVertexAddsTreeNodeByPathKey(globalShardKey, []int{}),
+ hypergraphVertexAddsTreeNodeByPathKey(nextShardKey, []int{}),
+ &pebble.WriteOptions{},
+ ); err != nil {
+ return err
+ }
+
+ // Delete hyperedge adds tree nodes
+ if err := b.DeleteRange(
+ hypergraphHyperedgeAddsTreeNodeKey(globalShardKey, []byte{}),
+ hypergraphHyperedgeAddsTreeNodeKey(nextShardKey, []byte{}),
+ &pebble.WriteOptions{},
+ ); err != nil {
+ return err
+ }
+
+ // Delete hyperedge adds tree nodes by path
+ if err := b.DeleteRange(
+ hypergraphHyperedgeAddsTreeNodeByPathKey(globalShardKey, []int{}),
+ hypergraphHyperedgeAddsTreeNodeByPathKey(nextShardKey, []int{}),
+ &pebble.WriteOptions{},
+ ); err != nil {
+ return err
+ }
+
+ // Delete vertex adds tree root
+ if err := b.DeleteRange(
+ hypergraphVertexAddsTreeRootKey(globalShardKey),
+ hypergraphVertexAddsTreeRootKey(nextShardKey),
+ &pebble.WriteOptions{},
+ ); err != nil {
+ return err
+ }
+
+ // Delete hyperedge adds tree root
+ if err := b.DeleteRange(
+ hypergraphHyperedgeAddsTreeRootKey(globalShardKey),
+ hypergraphHyperedgeAddsTreeRootKey(nextShardKey),
+ &pebble.WriteOptions{},
+ ); err != nil {
+ return err
+ }
+
+ return nil
+}
+
type pebbleSnapshotDB struct {
snap *pebble.Snapshot
}
@@ -676,8 +780,8 @@ func (p *pebbleSnapshotDB) NewIter(lowerBound []byte, upperBound []byte) (
error,
) {
return p.snap.NewIter(&pebble.IterOptions{
- LowerBound: lowerBound, // buildutils:allow-slice-alias slice is static
- UpperBound: upperBound, // buildutils:allow-slice-alias slice is static
+ LowerBound: lowerBound,
+ UpperBound: upperBound,
})
}
diff --git a/node/store/pebble_test.go b/node/store/pebble_test.go
index 76f4d49..36ea602 100644
--- a/node/store/pebble_test.go
+++ b/node/store/pebble_test.go
@@ -1,10 +1,13 @@
package store
import (
+ "encoding/hex"
+ "fmt"
"os"
"path/filepath"
"testing"
+ "github.com/iden3/go-iden3-crypto/poseidon"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
@@ -12,6 +15,13 @@ import (
"source.quilibrium.com/quilibrium/monorepo/config"
)
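+// TestPoseidon is a scratch test: it prints the Poseidon hash of a fixed
+// input and then calls FailNow, which makes the test always fail (presumably
+// so the printed output is surfaced by `go test`).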
+func TestPoseidon(t *testing.T) {
+ bi, err := poseidon.HashBytes([]byte("testvector"))
+ require.NoError(t, err)
+ fmt.Println(hex.EncodeToString(bi.FillBytes(make([]byte, 32))))
+ assert.FailNow(t, "")
+}
+
func TestNewPebbleDB_ExistingDirectory(t *testing.T) {
testDir, err := os.MkdirTemp("", "pebble-test-existing-*")
require.NoError(t, err)
diff --git a/node/worker/manager.go b/node/worker/manager.go
index 862a938..10ada64 100644
--- a/node/worker/manager.go
+++ b/node/worker/manager.go
@@ -819,9 +819,6 @@ func (w *WorkerManager) getMultiaddrOfWorker(coreId uint) (
rpcMultiaddr = strings.Replace(rpcMultiaddr, "/0.0.0.0/", "/127.0.0.1/", 1)
rpcMultiaddr = strings.Replace(rpcMultiaddr, "/0:0:0:0:0:0:0:0/", "/::1/", 1)
rpcMultiaddr = strings.Replace(rpcMultiaddr, "/::/", "/::1/", 1)
- // force TCP as stream is not supported over UDP/QUIC
- rpcMultiaddr = strings.Replace(rpcMultiaddr, "/quic-v1", "", 1)
- rpcMultiaddr = strings.Replace(rpcMultiaddr, "udp", "tcp", 1)
ma, err := multiaddr.StringCast(rpcMultiaddr)
return ma, errors.Wrap(err, "get multiaddr of worker")
diff --git a/types/hypergraph/hypergraph.go b/types/hypergraph/hypergraph.go
index f2ad133..c922124 100644
--- a/types/hypergraph/hypergraph.go
+++ b/types/hypergraph/hypergraph.go
@@ -194,6 +194,42 @@ type Hypergraph interface {
frameNumber uint64,
) error
+ // Hard delete operations - these bypass CRDT semantics for pruning
+
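+	// Illustrative call pattern for a pruning pass (transaction acquisition
+	// and shard selection elided; identifiers are placeholders):
+	//
+	//	if err := hg.DeleteVertexAdd(txn, shardKey, vertexID); err != nil {
+	//		return err
+	//	}
+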
+ // DeleteVertexAdd performs a hard delete of a vertex from the VertexAdds
+ // set. Unlike RemoveVertex (which adds to VertexRemoves for CRDT semantics),
+ // this actually removes the entry from VertexAdds and deletes the associated
+ // vertex data. This is used for pruning stale/orphaned data.
+ DeleteVertexAdd(
+ txn tries.TreeBackingStoreTransaction,
+ shardKey tries.ShardKey,
+ vertexID [64]byte,
+ ) error
+
+ // DeleteVertexRemove performs a hard delete of a vertex from the
+ // VertexRemoves set. This is used for pruning stale data.
+ DeleteVertexRemove(
+ txn tries.TreeBackingStoreTransaction,
+ shardKey tries.ShardKey,
+ vertexID [64]byte,
+ ) error
+
+ // DeleteHyperedgeAdd performs a hard delete of a hyperedge from the
+ // HyperedgeAdds set. This is used for pruning stale/orphaned data.
+ DeleteHyperedgeAdd(
+ txn tries.TreeBackingStoreTransaction,
+ shardKey tries.ShardKey,
+ hyperedgeID [64]byte,
+ ) error
+
+ // DeleteHyperedgeRemove performs a hard delete of a hyperedge from the
+ // HyperedgeRemoves set. This is used for pruning stale data.
+ DeleteHyperedgeRemove(
+ txn tries.TreeBackingStoreTransaction,
+ shardKey tries.ShardKey,
+ hyperedgeID [64]byte,
+ ) error
+
// Hyperedge data operations
// GetHyperedgeExtrinsics retrieves the extrinsic tree of a hyperedge, which
diff --git a/types/mocks/hypergraph.go b/types/mocks/hypergraph.go
index 87c2e2e..541d406 100644
--- a/types/mocks/hypergraph.go
+++ b/types/mocks/hypergraph.go
@@ -555,6 +555,46 @@ func (h *MockHypergraph) ImportTree(
return args.Error(0)
}
+// DeleteVertexAdd implements hypergraph.Hypergraph.
+func (h *MockHypergraph) DeleteVertexAdd(
+ txn tries.TreeBackingStoreTransaction,
+ shardKey tries.ShardKey,
+ vertexID [64]byte,
+) error {
+ args := h.Called(txn, shardKey, vertexID)
+ return args.Error(0)
+}
+
+// DeleteVertexRemove implements hypergraph.Hypergraph.
+func (h *MockHypergraph) DeleteVertexRemove(
+ txn tries.TreeBackingStoreTransaction,
+ shardKey tries.ShardKey,
+ vertexID [64]byte,
+) error {
+ args := h.Called(txn, shardKey, vertexID)
+ return args.Error(0)
+}
+
+// DeleteHyperedgeAdd implements hypergraph.Hypergraph.
+func (h *MockHypergraph) DeleteHyperedgeAdd(
+ txn tries.TreeBackingStoreTransaction,
+ shardKey tries.ShardKey,
+ hyperedgeID [64]byte,
+) error {
+ args := h.Called(txn, shardKey, hyperedgeID)
+ return args.Error(0)
+}
+
+// DeleteHyperedgeRemove implements hypergraph.Hypergraph.
+func (h *MockHypergraph) DeleteHyperedgeRemove(
+ txn tries.TreeBackingStoreTransaction,
+ shardKey tries.ShardKey,
+ hyperedgeID [64]byte,
+) error {
+ args := h.Called(txn, shardKey, hyperedgeID)
+ return args.Error(0)
+}
+
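+// Example test-side expectation for the new methods (illustrative):
+//
+//	m := &MockHypergraph{}
+//	m.On(
+//		"DeleteVertexAdd", mock.Anything, mock.Anything, mock.Anything,
+//	).Return(nil)
+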
// Ensure MockHypergraph implements Hypergraph
var _ hg.Hypergraph = (*MockHypergraph)(nil)
diff --git a/types/tries/lazy_proof_tree.go b/types/tries/lazy_proof_tree.go
index 6885c39..b47b729 100644
--- a/types/tries/lazy_proof_tree.go
+++ b/types/tries/lazy_proof_tree.go
@@ -569,6 +569,10 @@ type TreeBackingStore interface {
shardKey ShardKey,
leaf *RawLeafData,
) error
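+	// DeleteVertexTree removes the stored vertex data for the given id; it
+	// is used by the hard-delete (pruning) paths introduced in this change.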
+ DeleteVertexTree(
+ txn TreeBackingStoreTransaction,
+ id []byte,
+ ) error
}
// LazyVectorCommitmentTree is a lazy-loaded (from a TreeBackingStore based