Merge branch 'develop' into fix/underscore

This commit is contained in:
Hamza Hamud 2025-12-19 19:04:50 +00:00
commit d3778a7056
25 changed files with 3199 additions and 185 deletions

2
Cargo.lock generated
View File

@ -307,7 +307,7 @@ checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae"
[[package]]
name = "bls48581"
version = "0.1.0"
version = "2.1.0"
dependencies = [
"criterion 0.4.0",
"hex 0.4.3",

View File

@ -1,3 +1,10 @@
# 2.1.0.17
- resolve sync race condition with prover registry pruning
- update hypergraph to directly manage raw deletions
- migration to resolve records issue from above
- resolve early snapshot termination issue
- global halts are now just halts on processing non-global ops
# 2.1.0.16
- build_utils static code analysis checker for underlying slice assignment
- hypergraph snapshot manager now uses in memory snapshot instead of pebble snapshot

View File

@ -43,7 +43,7 @@ func FormatVersion(version []byte) string {
}
func GetPatchNumber() byte {
return 0x10
return 0x11
}
func GetRCNumber() byte {

View File

@ -24,7 +24,6 @@ replace source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub => ../go-
require (
github.com/prometheus/client_golang v1.22.0
google.golang.org/grpc v1.72.0
source.quilibrium.com/quilibrium/monorepo/protobufs v0.0.0-00010101000000-000000000000
source.quilibrium.com/quilibrium/monorepo/types v0.0.0-00010101000000-000000000000
source.quilibrium.com/quilibrium/monorepo/utils v0.0.0-00010101000000-000000000000
@ -70,7 +69,7 @@ require (
golang.org/x/text v0.26.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect
google.golang.org/grpc v1.72.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
lukechampine.com/blake3 v1.4.1 // indirect
)

View File

@ -762,3 +762,105 @@ func boolToString(b bool) string {
}
return "false"
}
// DeleteVertexAdd performs a hard delete of a vertex from the VertexAdds set.
// Unlike RemoveVertex (which adds to VertexRemoves for CRDT semantics), this
// actually removes the entry from VertexAdds and deletes the associated vertex
// data. This is used for pruning stale/orphaned data that should not be synced.
//
// The caller must provide a transaction for atomic deletion of both the tree
// entry and the vertex data.
func (hg *HypergraphCRDT) DeleteVertexAdd(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
vertexID [64]byte,
) error {
hg.mu.Lock()
defer hg.mu.Unlock()
set := hg.getVertexAddsSet(shardKey)
tree := set.GetTree()
// Delete from the tree (removes tree node and path index)
if err := tree.Delete(txn, vertexID[:]); err != nil {
return errors.Wrap(err, "delete vertex add: tree delete")
}
// Delete the vertex data
if err := tree.Store.DeleteVertexTree(txn, vertexID[:]); err != nil {
// Log but don't fail - vertex data may already be gone
hg.logger.Debug(
"delete vertex add: vertex data not found",
zap.String("vertex_id", string(vertexID[:])),
zap.Error(err),
)
}
return nil
}
// DeleteVertexRemove performs a hard delete of a vertex from the VertexRemoves
// set. This removes the entry from VertexRemoves, effectively "un-removing" the
// vertex if it still exists in VertexAdds. This is used for pruning stale data.
func (hg *HypergraphCRDT) DeleteVertexRemove(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
vertexID [64]byte,
) error {
hg.mu.Lock()
defer hg.mu.Unlock()
set := hg.getVertexRemovesSet(shardKey)
tree := set.GetTree()
// Delete from the tree
if err := tree.Delete(txn, vertexID[:]); err != nil {
return errors.Wrap(err, "delete vertex remove: tree delete")
}
return nil
}
// DeleteHyperedgeAdd performs a hard delete of a hyperedge from the
// HyperedgeAdds set. Unlike RemoveHyperedge (which adds to HyperedgeRemoves for
// CRDT semantics), this actually removes the entry from HyperedgeAdds. This is
// used for pruning stale/orphaned data.
func (hg *HypergraphCRDT) DeleteHyperedgeAdd(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
hyperedgeID [64]byte,
) error {
hg.mu.Lock()
defer hg.mu.Unlock()
set := hg.getHyperedgeAddsSet(shardKey)
tree := set.GetTree()
// Delete from the tree
if err := tree.Delete(txn, hyperedgeID[:]); err != nil {
return errors.Wrap(err, "delete hyperedge add: tree delete")
}
return nil
}
// DeleteHyperedgeRemove performs a hard delete of a hyperedge from the
// HyperedgeRemoves set. This is used for pruning stale data.
func (hg *HypergraphCRDT) DeleteHyperedgeRemove(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
hyperedgeID [64]byte,
) error {
hg.mu.Lock()
defer hg.mu.Unlock()
set := hg.getHyperedgeRemovesSet(shardKey)
tree := set.GetTree()
// Delete from the tree
if err := tree.Delete(txn, hyperedgeID[:]); err != nil {
return errors.Wrap(err, "delete hyperedge remove: tree delete")
}
return nil
}

View File

@ -175,11 +175,17 @@ func (m *snapshotManager) publish(root []byte) {
m.mu.Lock()
defer m.mu.Unlock()
// Remove all handles from the map so new syncs get new handles.
// Handles with active refs will be released when their last user calls release().
// Handles with no active refs (only the initial ref from creation) are released now.
for key, handle := range m.handles {
delete(m.handles, key)
if handle != nil {
// releaseRef decrements the ref count. If this was the last ref
// (i.e., no active sync sessions), the underlying DB is released.
// If there are active sync sessions, they will release it when done.
handle.releaseRef(m.logger)
}
delete(m.handles, key)
}
m.root = nil
@ -221,6 +227,11 @@ func (m *snapshotManager) acquire(
}
handle := newSnapshotHandle(key, storeSnapshot, release, m.root)
// Acquire a ref for the caller. The handle is created with refs=1 (the owner ref
// held by the snapshot manager), and this adds another ref for the sync session.
// This ensures publish() can release the owner ref without closing the DB while
// a sync is still using it.
handle.acquire()
m.handles[key] = handle
return handle
}

View File

@ -451,7 +451,7 @@ func getChildSegments(
prefix,
)
if err != nil && !strings.Contains(err.Error(), "item not found") {
panic(err)
return nodes, 0
}
if isPrefix(prefix, path) {
@ -1345,7 +1345,7 @@ func getNodeAtPath(
slices.Concat(n.FullPrefix, []int{int(childIndex)}),
)
if err != nil && !strings.Contains(err.Error(), "item not found") {
logger.Panic("failed to get node by path", zap.Error(err))
return nil
}
if child == nil {
@ -1425,7 +1425,7 @@ func getBranchInfoFromTree(
slices.Concat(branch.FullPrefix, []int{i}),
)
if err != nil && !strings.Contains(err.Error(), "item not found") {
logger.Panic("failed to get node by path", zap.Error(err))
return nil, err
}
}
@ -1495,7 +1495,7 @@ func ensureCommittedNode(
path,
)
if err != nil && !strings.Contains(err.Error(), "item not found") {
logger.Panic("failed to reload node by path", zap.Error(err))
return nil
}
if reloaded != nil {
return reloaded

View File

@ -155,6 +155,10 @@ type AppConsensusEngine struct {
globalProverRootSynced atomic.Bool
globalProverSyncInProgress atomic.Bool
// Genesis initialization
genesisInitialized atomic.Bool
genesisInitChan chan *protobufs.GlobalFrame
// Message queues
consensusMessageQueue chan *pb.Message
proverMessageQueue chan *pb.Message
@ -294,6 +298,7 @@ func NewAppConsensusEngine(
peerAuthCache: make(map[string]time.Time),
peerInfoDigestCache: make(map[string]struct{}),
keyRegistryDigestCache: make(map[string]struct{}),
genesisInitChan: make(chan *protobufs.GlobalFrame, 1),
}
engine.frameChainChecker = NewAppFrameChainChecker(clockStore, logger, appAddress)
@ -504,7 +509,10 @@ func NewAppConsensusEngine(
*protobufs.AppShardFrame,
*protobufs.ProposalVote,
]
initializeCertifiedGenesis := func() {
// Check if we need to await network data for genesis initialization
needsNetworkGenesis := false
initializeCertifiedGenesis := func(markInitialized bool) {
frame, qc := engine.initializeGenesis()
state = &models.CertifiedState[*protobufs.AppShardFrame]{
State: &models.State[*protobufs.AppShardFrame]{
@ -515,17 +523,72 @@ func NewAppConsensusEngine(
CertifyingQuorumCertificate: qc,
}
pending = nil
if markInitialized {
engine.genesisInitialized.Store(true)
}
}
initializeCertifiedGenesisFromNetwork := func(
difficulty uint32,
shardInfo []*protobufs.AppShardInfo,
) {
// Delete the temporary genesis frame first
if err := engine.clockStore.DeleteShardClockFrameRange(
engine.appAddress, 0, 1,
); err != nil {
logger.Debug(
"could not delete temporary genesis frame",
zap.Error(err),
)
}
frame, qc := engine.initializeGenesisWithParams(difficulty, shardInfo)
state = &models.CertifiedState[*protobufs.AppShardFrame]{
State: &models.State[*protobufs.AppShardFrame]{
Rank: 0,
Identifier: frame.Identity(),
State: &frame,
},
CertifyingQuorumCertificate: qc,
}
pending = nil
engine.genesisInitialized.Store(true)
logger.Info(
"initialized genesis with network data",
zap.Uint32("difficulty", difficulty),
zap.Int("shard_info_count", len(shardInfo)),
)
}
if err != nil {
initializeCertifiedGenesis()
// No consensus state exists - check if we have a genesis frame already
_, _, genesisErr := engine.clockStore.GetShardClockFrame(
engine.appAddress,
0,
false,
)
if genesisErr != nil && errors.Is(genesisErr, store.ErrNotFound) {
// No genesis exists - we need to await network data
needsNetworkGenesis = true
logger.Warn(
"app genesis missing - will await network data",
zap.String("shard_address", hex.EncodeToString(appAddress)),
)
// Initialize with default values for now
// This will be re-done after receiving network data
// Pass false to NOT mark as initialized - we're waiting for network data
initializeCertifiedGenesis(false)
} else {
initializeCertifiedGenesis(true)
}
} else {
qc, err := engine.clockStore.GetQuorumCertificate(
engine.appAddress,
latest.FinalizedRank,
)
if err != nil || qc.GetFrameNumber() == 0 {
initializeCertifiedGenesis()
initializeCertifiedGenesis(true)
} else {
frame, _, err := engine.clockStore.GetShardClockFrame(
engine.appAddress,
@ -535,8 +598,10 @@ func NewAppConsensusEngine(
if err != nil {
panic(err)
}
parentFrame, err := engine.clockStore.GetGlobalClockFrame(
qc.GetFrameNumber() - 1,
parentFrame, _, err := engine.clockStore.GetShardClockFrame(
engine.appAddress,
qc.GetFrameNumber()-1,
false,
)
if err != nil {
panic(err)
@ -656,6 +721,48 @@ func NewAppConsensusEngine(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
// If we need network genesis, await it before starting consensus
if needsNetworkGenesis {
engine.logger.Info(
"awaiting network data for genesis initialization",
zap.String("shard_address", engine.appAddressHex),
)
// Wait for a global frame from pubsub
globalFrame, err := engine.awaitFirstGlobalFrame(ctx)
if err != nil {
engine.logger.Error(
"failed to await global frame for genesis",
zap.Error(err),
)
ctx.Throw(err)
return
}
// Fetch shard info from bootstrap peers
shardInfo, err := engine.fetchShardInfoFromBootstrap(ctx)
if err != nil {
engine.logger.Warn(
"failed to fetch shard info from bootstrap peers",
zap.Error(err),
)
// Continue anyway - we at least have the global frame
}
engine.logger.Info(
"received network genesis data",
zap.Uint64("global_frame_number", globalFrame.Header.FrameNumber),
zap.Uint32("difficulty", globalFrame.Header.Difficulty),
zap.Int("shard_info_count", len(shardInfo)),
)
// Re-initialize genesis with the correct network data
initializeCertifiedGenesisFromNetwork(
globalFrame.Header.Difficulty,
shardInfo,
)
}
if err := engine.waitForProverRegistration(ctx); err != nil {
engine.logger.Error("prover unavailable", zap.Error(err))
ctx.Throw(err)
@ -1401,6 +1508,16 @@ func (e *AppConsensusEngine) updateMetricsLoop(
func (e *AppConsensusEngine) initializeGenesis() (
*protobufs.AppShardFrame,
*protobufs.QuorumCertificate,
) {
return e.initializeGenesisWithParams(e.config.Engine.Difficulty, nil)
}
func (e *AppConsensusEngine) initializeGenesisWithParams(
difficulty uint32,
shardInfo []*protobufs.AppShardInfo,
) (
*protobufs.AppShardFrame,
*protobufs.QuorumCertificate,
) {
// Initialize state roots for hypergraph
stateRoots := make([][]byte, 4)
@ -1408,9 +1525,14 @@ func (e *AppConsensusEngine) initializeGenesis() (
stateRoots[i] = make([]byte, 64)
}
// Use provided difficulty or fall back to config
if difficulty == 0 {
difficulty = e.config.Engine.Difficulty
}
genesisHeader, err := e.frameProver.ProveFrameHeaderGenesis(
e.appAddress,
80000,
difficulty,
make([]byte, 516),
100,
)
@ -1560,6 +1682,127 @@ func (e *AppConsensusEngine) ensureAppGenesis() error {
return nil
}
// fetchShardInfoFromBootstrap connects to bootstrap peers and fetches shard info
// for this app's address using the GetAppShards RPC.
func (e *AppConsensusEngine) fetchShardInfoFromBootstrap(
ctx context.Context,
) ([]*protobufs.AppShardInfo, error) {
bootstrapPeers := e.config.P2P.BootstrapPeers
if len(bootstrapPeers) == 0 {
return nil, errors.New("no bootstrap peers configured")
}
for _, peerAddr := range bootstrapPeers {
shardInfo, err := e.tryFetchShardInfoFromPeer(ctx, peerAddr)
if err != nil {
e.logger.Debug(
"failed to fetch shard info from peer",
zap.String("peer", peerAddr),
zap.Error(err),
)
continue
}
if len(shardInfo) > 0 {
e.logger.Info(
"fetched shard info from bootstrap peer",
zap.String("peer", peerAddr),
zap.Int("shard_count", len(shardInfo)),
)
return shardInfo, nil
}
}
return nil, errors.New("failed to fetch shard info from any bootstrap peer")
}
func (e *AppConsensusEngine) tryFetchShardInfoFromPeer(
ctx context.Context,
peerAddr string,
) ([]*protobufs.AppShardInfo, error) {
// Parse multiaddr to extract peer ID and address
ma, err := multiaddr.StringCast(peerAddr)
if err != nil {
return nil, errors.Wrap(err, "parse multiaddr")
}
// Extract peer ID from the multiaddr
peerIDStr, err := ma.ValueForProtocol(multiaddr.P_P2P)
if err != nil {
return nil, errors.Wrap(err, "extract peer id")
}
peerID, err := peer.Decode(peerIDStr)
if err != nil {
return nil, errors.Wrap(err, "decode peer id")
}
// Create gRPC connection to the peer
mga, err := mn.ToNetAddr(ma)
if err != nil {
return nil, errors.Wrap(err, "convert multiaddr")
}
creds, err := p2p.NewPeerAuthenticator(
e.logger,
e.config.P2P,
nil,
nil,
nil,
nil,
[][]byte{[]byte(peerID)},
map[string]channel.AllowedPeerPolicyType{},
map[string]channel.AllowedPeerPolicyType{},
).CreateClientTLSCredentials([]byte(peerID))
if err != nil {
return nil, errors.Wrap(err, "create credentials")
}
conn, err := grpc.NewClient(
mga.String(),
grpc.WithTransportCredentials(creds),
)
if err != nil {
return nil, errors.Wrap(err, "dial peer")
}
defer conn.Close()
client := protobufs.NewGlobalServiceClient(conn)
reqCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
resp, err := client.GetAppShards(reqCtx, &protobufs.GetAppShardsRequest{
ShardKey: e.appAddress,
Prefix: []uint32{},
})
if err != nil {
return nil, errors.Wrap(err, "get app shards")
}
return resp.GetInfo(), nil
}
// awaitFirstGlobalFrame waits for a global frame to arrive via pubsub and
// returns it. This is used during genesis initialization to get the correct
// difficulty from the network.
func (e *AppConsensusEngine) awaitFirstGlobalFrame(
ctx context.Context,
) (*protobufs.GlobalFrame, error) {
e.logger.Info("awaiting first global frame from network for genesis initialization")
select {
case <-ctx.Done():
return nil, ctx.Err()
case frame := <-e.genesisInitChan:
e.logger.Info(
"received global frame for genesis initialization",
zap.Uint64("frame_number", frame.Header.FrameNumber),
zap.Uint32("difficulty", frame.Header.Difficulty),
)
return frame, nil
}
}
func (e *AppConsensusEngine) waitForProverRegistration(
ctx lifecycle.SignalerContext,
) error {

View File

@ -123,6 +123,10 @@ func (e *AppConsensusEngine) LeaderForRank(rank uint64) (
return "", errors.Wrap(err, "leader for rank")
}
if e.config.P2P.Network == 0 && len(proverSet) < 3 {
return models.Identity(make([]byte, 32)), nil
}
// Handle condition where prover cannot be yet known due to lack of sync:
if len(proverSet) == 0 {
return models.Identity(make([]byte, 32)), nil

View File

@ -1368,6 +1368,20 @@ func (e *AppConsensusEngine) handleGlobalFrameMessage(message *pb.Message) {
return
}
// If genesis hasn't been initialized yet, send this frame to the
// genesis init channel (non-blocking)
if !e.genesisInitialized.Load() {
select {
case e.genesisInitChan <- frame:
e.logger.Debug(
"sent global frame to genesis init channel",
zap.Uint64("frame_number", frame.Header.FrameNumber),
)
default:
// Channel already has a frame, skip
}
}
if err := e.globalTimeReel.Insert(frame); err != nil {
// Success metric recorded at the end of processing
globalFramesProcessedTotal.WithLabelValues("error").Inc()

View File

@ -116,16 +116,21 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error {
remaining = int(haltGraceFrames - streak.Count)
}
if remaining <= 0 && e.config.P2P.Network == 0 {
e.logger.Error(
"CRITICAL: Shard has insufficient coverage - triggering network halt",
zap.String("shard_address", hex.EncodeToString([]byte(shardAddress))),
zap.Uint64("prover_count", proverCount),
zap.Uint64("halt_threshold", haltThreshold),
)
// Instead of halting, enter prover-only mode at the global level
// This allows prover messages to continue while blocking other messages
if !e.proverOnlyMode.Load() {
e.logger.Warn(
"CRITICAL: Shard has insufficient coverage - entering prover-only mode (non-prover messages will be dropped)",
zap.String("shard_address", hex.EncodeToString([]byte(shardAddress))),
zap.Uint64("prover_count", proverCount),
zap.Uint64("halt_threshold", haltThreshold),
)
e.proverOnlyMode.Store(true)
}
// Emit halt event
// Emit warning event (not halt) so monitoring knows we're in degraded state
e.emitCoverageEvent(
typesconsensus.ControlEventCoverageHalt,
typesconsensus.ControlEventCoverageWarn,
&typesconsensus.CoverageEventData{
ShardAddress: []byte(shardAddress),
ProverCount: int(proverCount),
@ -133,7 +138,7 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error {
AttestedStorage: attestedStorage,
TreeMetadata: coverage.TreeMetadata,
Message: fmt.Sprintf(
"Shard has only %d provers, network halt required",
"Shard has only %d provers, prover-only mode active (non-prover messages dropped)",
proverCount,
),
},
@ -170,6 +175,16 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error {
// Not in critical state — clear any ongoing streak
e.clearStreak(shardAddress)
// If we were in prover-only mode and coverage is restored, exit prover-only mode
if e.proverOnlyMode.Load() {
e.logger.Info(
"Coverage restored - exiting prover-only mode",
zap.String("shard_address", hex.EncodeToString([]byte(shardAddress))),
zap.Uint64("prover_count", proverCount),
)
e.proverOnlyMode.Store(false)
}
// Check for low coverage
if proverCount < minProvers {
e.handleLowCoverage([]byte(shardAddress), coverage, minProvers)

View File

@ -531,6 +531,85 @@ func (e *GlobalConsensusEngine) createStubGenesis() *protobufs.GlobalFrame {
commitments[i] = &tries.VectorCommitmentTree{}
}
e.establishTestnetGenesisProvers()
roots, err := e.hypergraph.Commit(0)
if err != nil {
e.logger.Error("could not commit", zap.Error(err))
return nil
}
// Parse and set initial commitments from JSON
for shardKey, commits := range roots {
for i := 0; i < 3; i++ {
commitments[shardKey.L1[i]].Insert(
shardKey.L2[:],
commits[0],
nil,
big.NewInt(int64(len(commits[0]))),
)
commitments[shardKey.L1[i]].Commit(e.inclusionProver, false)
}
}
proverRoots := roots[tries.ShardKey{
L1: [3]byte{},
L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS,
}]
proverRoot := proverRoots[0]
genesisHeader.ProverTreeCommitment = proverRoot
for i := 0; i < 256; i++ {
genesisHeader.GlobalCommitments[i] = commitments[i].Commit(
e.inclusionProver,
false,
)
}
// Establish an empty signature payload this avoids panics on broken
// header readers
genesisHeader.PublicKeySignatureBls48581 =
&protobufs.BLS48581AggregateSignature{
Signature: make([]byte, 0),
PublicKey: &protobufs.BLS48581G2PublicKey{
KeyValue: make([]byte, 0),
},
Bitmask: make([]byte, 0),
}
genesisFrame := &protobufs.GlobalFrame{
Header: genesisHeader,
}
// Compute frame ID and store the full frame
frameIDBI, _ := poseidon.HashBytes(genesisHeader.Output)
frameID := frameIDBI.FillBytes(make([]byte, 32))
e.frameStoreMu.Lock()
e.frameStore[string(frameID)] = genesisFrame
e.frameStoreMu.Unlock()
// Add to time reel
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
if err := e.clockStore.PutGlobalClockFrame(genesisFrame, txn); err != nil {
txn.Abort()
e.logger.Error("could not add frame", zap.Error(err))
return nil
}
if err := txn.Commit(); err != nil {
txn.Abort()
e.logger.Error("could not add frame", zap.Error(err))
return nil
}
return genesisFrame
}
func (e *GlobalConsensusEngine) establishTestnetGenesisProvers() error {
var proverPubKeys [][]byte
var err error
if e.config.P2P.Network != 99 && e.config.Engine != nil &&
@ -647,83 +726,9 @@ func (e *GlobalConsensusEngine) createStubGenesis() *protobufs.GlobalFrame {
err = state.Commit()
if err != nil {
e.logger.Error("failed to commit", zap.Error(err))
return nil
}
roots, err := e.hypergraph.Commit(0)
if err != nil {
e.logger.Error("could not commit", zap.Error(err))
return nil
}
// Parse and set initial commitments from JSON
for shardKey, commits := range roots {
for i := 0; i < 3; i++ {
commitments[shardKey.L1[i]].Insert(
shardKey.L2[:],
commits[0],
nil,
big.NewInt(int64(len(commits[0]))),
)
commitments[shardKey.L1[i]].Commit(e.inclusionProver, false)
}
}
proverRoots := roots[tries.ShardKey{
L1: [3]byte{},
L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS,
}]
proverRoot := proverRoots[0]
genesisHeader.ProverTreeCommitment = proverRoot
for i := 0; i < 256; i++ {
genesisHeader.GlobalCommitments[i] = commitments[i].Commit(
e.inclusionProver,
false,
)
}
// Establish an empty signature payload this avoids panics on broken
// header readers
genesisHeader.PublicKeySignatureBls48581 =
&protobufs.BLS48581AggregateSignature{
Signature: make([]byte, 0),
PublicKey: &protobufs.BLS48581G2PublicKey{
KeyValue: make([]byte, 0),
},
Bitmask: make([]byte, 0),
}
genesisFrame := &protobufs.GlobalFrame{
Header: genesisHeader,
}
// Compute frame ID and store the full frame
frameIDBI, _ := poseidon.HashBytes(genesisHeader.Output)
frameID := frameIDBI.FillBytes(make([]byte, 32))
e.frameStoreMu.Lock()
e.frameStore[string(frameID)] = genesisFrame
e.frameStoreMu.Unlock()
// Add to time reel
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
if err := e.clockStore.PutGlobalClockFrame(genesisFrame, txn); err != nil {
txn.Abort()
e.logger.Error("could not add frame", zap.Error(err))
return nil
}
if err := txn.Commit(); err != nil {
txn.Abort()
e.logger.Error("could not add frame", zap.Error(err))
return nil
}
return genesisFrame
return nil
}
// InitializeGenesisState ensures the global genesis frame and QC exist using the

View File

@ -37,6 +37,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/consensus/validator"
"source.quilibrium.com/quilibrium/monorepo/consensus/verification"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/aggregator"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/provers"
@ -199,6 +200,7 @@ type GlobalConsensusEngine struct {
appFrameStore map[string]*protobufs.AppShardFrame
appFrameStoreMu sync.RWMutex
lowCoverageStreak map[string]*coverageStreak
proverOnlyMode atomic.Bool
peerInfoDigestCache map[string]struct{}
peerInfoDigestCacheMu sync.Mutex
keyRegistryDigestCache map[string]struct{}
@ -595,6 +597,42 @@ func NewGlobalConsensusEngine(
if err != nil {
establishGenesis()
} else {
adds := engine.hypergraph.(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(
tries.ShardKey{
L1: [3]byte{},
L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
},
)
if lc, _ := adds.GetTree().GetMetadata(); lc == 0 {
if config.P2P.Network == 0 {
genesisData := engine.getMainnetGenesisJSON()
if genesisData == nil {
panic("no genesis data")
}
state := hgstate.NewHypergraphState(engine.hypergraph)
err = engine.establishMainnetGenesisProvers(state, genesisData)
if err != nil {
engine.logger.Error("failed to establish provers", zap.Error(err))
panic(err)
}
err = state.Commit()
if err != nil {
engine.logger.Error("failed to commit", zap.Error(err))
panic(err)
}
} else {
engine.establishTestnetGenesisProvers()
}
err := engine.proverRegistry.Refresh()
if err != nil {
panic(err)
}
}
if latest.LatestTimeout != nil {
logger.Info(
"obtained latest consensus state",

View File

@ -234,6 +234,15 @@ func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) {
return
}
// In prover-only mode, filter out non-prover messages
if e.proverOnlyMode.Load() {
bundle.Requests = e.filterProverOnlyRequests(bundle.Requests)
if len(bundle.Requests) == 0 {
// All requests were filtered out
return
}
}
if len(bundle.Requests) > maxGlobalMessagesPerFrame {
if e.logger != nil {
e.logger.Debug(
@ -265,6 +274,49 @@ func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) {
e.messageAggregator.Add(record)
}
// filterProverOnlyRequests filters a list of message requests to only include
// prover-related messages. This is used when in prover-only mode due to
// insufficient coverage.
func (e *GlobalConsensusEngine) filterProverOnlyRequests(
requests []*protobufs.MessageRequest,
) []*protobufs.MessageRequest {
filtered := make([]*protobufs.MessageRequest, 0, len(requests))
droppedCount := 0
for _, req := range requests {
if req == nil || req.GetRequest() == nil {
continue
}
// Only allow prover-related message types
switch req.GetRequest().(type) {
case *protobufs.MessageRequest_Join,
*protobufs.MessageRequest_Leave,
*protobufs.MessageRequest_Pause,
*protobufs.MessageRequest_Resume,
*protobufs.MessageRequest_Confirm,
*protobufs.MessageRequest_Reject,
*protobufs.MessageRequest_Kick,
*protobufs.MessageRequest_Update:
// Prover messages are allowed
filtered = append(filtered, req)
default:
// All other messages are dropped in prover-only mode
droppedCount++
}
}
if droppedCount > 0 && e.logger != nil {
e.logger.Debug(
"dropped non-prover messages in prover-only mode",
zap.Int("dropped_count", droppedCount),
zap.Int("allowed_count", len(filtered)),
)
}
return filtered
}
func (e *GlobalConsensusEngine) logBundleRequestTypes(
bundle *protobufs.MessageBundle,
) {

View File

@ -12,7 +12,6 @@ import (
"github.com/iden3/go-iden3-crypto/poseidon"
"github.com/pkg/errors"
"go.uber.org/zap"
hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global"
hgstate "source.quilibrium.com/quilibrium/monorepo/node/execution/state/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/types/consensus"
@ -414,19 +413,30 @@ func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error {
}
cutoff := frameNumber - 760
var pruned int
var prunedAllocations int
var prunedProvers int
set := r.hypergraph.(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(tries.ShardKey{
shardKey := tries.ShardKey{
L1: [3]byte{0x00, 0x00, 0x00},
L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
})
}
txn, err := set.GetTree().Store.NewTransaction(false)
txn, err := r.hypergraph.NewTransaction(false)
if err != nil {
return errors.Wrap(err, "prune orphan joins")
}
for _, info := range r.proverCache {
// Track provers to remove from cache after pruning
proversToRemove := []string{}
r.logger.Debug(
"starting prune orphan joins scan",
zap.Uint64("frame_number", frameNumber),
zap.Uint64("cutoff", cutoff),
zap.Int("prover_cache_size", len(r.proverCache)),
)
for addr, info := range r.proverCache {
if info == nil || len(info.Allocations) == 0 {
continue
}
@ -435,7 +445,20 @@ func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error {
var removedFilters map[string]struct{}
for _, allocation := range info.Allocations {
if allocation.Status == consensus.ProverStatusJoining &&
// Log each allocation being evaluated
r.logger.Debug(
"evaluating allocation for prune",
zap.String("prover_address", hex.EncodeToString(info.Address)),
zap.Int("status", int(allocation.Status)),
zap.Uint64("join_frame", allocation.JoinFrameNumber),
zap.Uint64("cutoff", cutoff),
zap.Bool("is_joining", allocation.Status == consensus.ProverStatusJoining),
zap.Bool("is_rejected", allocation.Status == consensus.ProverStatusRejected),
zap.Bool("is_old_enough", allocation.JoinFrameNumber < cutoff),
)
if (allocation.Status == consensus.ProverStatusJoining ||
allocation.Status == consensus.ProverStatusRejected) &&
allocation.JoinFrameNumber < cutoff {
if err := r.pruneAllocationVertex(txn, info, allocation); err != nil {
txn.Abort()
@ -446,7 +469,7 @@ func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error {
removedFilters = make(map[string]struct{})
}
removedFilters[string(allocation.ConfirmationFilter)] = struct{}{}
pruned++
prunedAllocations++
continue
}
@ -456,17 +479,33 @@ func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error {
if len(updated) != len(info.Allocations) {
info.Allocations = updated
r.cleanupFilterCache(info, removedFilters)
// If no allocations remain, prune the prover record as well
if len(updated) == 0 {
if err := r.pruneProverRecord(txn, shardKey, info); err != nil {
txn.Abort()
return errors.Wrap(err, "prune orphan joins")
}
proversToRemove = append(proversToRemove, addr)
prunedProvers++
}
}
}
if pruned > 0 {
// Remove pruned provers from cache
for _, addr := range proversToRemove {
delete(r.proverCache, addr)
}
if prunedAllocations > 0 || prunedProvers > 0 {
if err := txn.Commit(); err != nil {
return errors.Wrap(err, "prune orphan joins")
}
r.logger.Info(
"pruned orphan prover allocations",
zap.Int("allocations_pruned", pruned),
zap.Int("allocations_pruned", prunedAllocations),
zap.Int("provers_pruned", prunedProvers),
zap.Uint64("frame_cutoff", cutoff),
)
} else {
@ -510,33 +549,96 @@ func (r *ProverRegistry) pruneAllocationVertex(
allocationHash.FillBytes(make([]byte, 32)),
)
_, err = r.hypergraph.GetVertex(vertexID)
if err != nil {
if errors.Cause(err) == hypergraph.ErrRemoved {
return nil
}
shardKey := tries.ShardKey{
L1: [3]byte{0x00, 0x00, 0x00},
L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
}
// Use DeleteVertexAdd which properly handles locking and deletes both
// the tree entry and the vertex data atomically
if err := r.hypergraph.DeleteVertexAdd(txn, shardKey, vertexID); err != nil {
r.logger.Debug(
"allocation vertex missing during prune",
"could not delete allocation vertex during prune",
zap.String("address", hex.EncodeToString(info.Address)),
zap.String(
"filter",
hex.EncodeToString(allocation.ConfirmationFilter),
),
zap.String("vertex_id", hex.EncodeToString(vertexID[:])),
zap.Error(err),
)
return nil
// Don't return error - the vertex may already be deleted
} else {
r.logger.Debug(
"deleted allocation vertex during prune",
zap.String("address", hex.EncodeToString(info.Address)),
zap.String(
"filter",
hex.EncodeToString(allocation.ConfirmationFilter),
),
zap.String("vertex_id", hex.EncodeToString(vertexID[:])),
)
}
set := r.hypergraph.(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(tries.ShardKey{
L1: [3]byte{0x00, 0x00, 0x00},
L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
})
return nil
}
vtree := set.GetTree()
if err := vtree.Delete(txn, vertexID[:]); err != nil {
return errors.Wrap(err, "prune allocation remove vertex")
// pruneProverRecord removes a prover's vertex, hyperedge, and associated data
// when all of its allocations have been pruned.
func (r *ProverRegistry) pruneProverRecord(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
info *consensus.ProverInfo,
) error {
if info == nil || len(info.Address) == 0 {
return errors.New("missing prover info")
}
// Construct the prover vertex ID
var proverVertexID [64]byte
copy(proverVertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
copy(proverVertexID[32:], info.Address)
// Delete prover vertex using DeleteVertexAdd which properly handles locking
// and deletes both the tree entry and vertex data atomically
if err := r.hypergraph.DeleteVertexAdd(txn, shardKey, proverVertexID); err != nil {
r.logger.Debug(
"could not delete prover vertex during prune",
zap.String("address", hex.EncodeToString(info.Address)),
zap.String("vertex_id", hex.EncodeToString(proverVertexID[:])),
zap.Error(err),
)
// Don't return error - the vertex may already be deleted
} else {
r.logger.Debug(
"deleted prover vertex during prune",
zap.String("address", hex.EncodeToString(info.Address)),
zap.String("vertex_id", hex.EncodeToString(proverVertexID[:])),
)
}
// Delete prover hyperedge using DeleteHyperedgeAdd
if err := r.hypergraph.DeleteHyperedgeAdd(txn, shardKey, proverVertexID); err != nil {
r.logger.Debug(
"could not delete prover hyperedge during prune",
zap.String("address", hex.EncodeToString(info.Address)),
zap.String("hyperedge_id", hex.EncodeToString(proverVertexID[:])),
zap.Error(err),
)
// Don't return error - the hyperedge may already be deleted
} else {
r.logger.Debug(
"deleted prover hyperedge during prune",
zap.String("address", hex.EncodeToString(info.Address)),
zap.String("hyperedge_id", hex.EncodeToString(proverVertexID[:])),
)
}
r.logger.Debug(
"pruned prover record",
zap.String("address", hex.EncodeToString(info.Address)),
)
return nil
}

File diff suppressed because it is too large Load Diff

View File

@ -588,6 +588,349 @@ func TestTreeLongestBranchNoBLS(t *testing.T) {
}
}
// TestTreeNoStaleNodesAfterDelete tests that deleting nodes does not leave
// orphaned/stale tree nodes in the database. This specifically tests the case
// where branch merging occurs during deletion.
func TestTreeNoStaleNodesAfterDeleteNoBLS(t *testing.T) {
l, _ := zap.NewProduction()
db := store.NewPebbleDB(l, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0)
s := store.NewPebbleHypergraphStore(&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, db, l, verencr, nil)
shardKey := tries.ShardKey{}
tree := &tries.LazyVectorCommitmentTree{Store: s, SetType: "vertex", PhaseType: "adds", ShardKey: shardKey}
// Create keys that will force branch creation and then merging on deletion.
// We want keys that share a prefix so that:
// 1. Inserting them creates branch nodes
// 2. Deleting some of them causes branch merging (childCount == 1 case)
// Create 3 keys with same first 8 bytes, different after
commonPrefix := make([]byte, 8)
rand.Read(commonPrefix)
keys := make([][]byte, 3)
values := make([][]byte, 3)
for i := 0; i < 3; i++ {
key := make([]byte, 64)
copy(key[:8], commonPrefix)
// Vary byte 8 to create branching
key[8] = byte(i * 64) // 0x00, 0x40, 0x80
rand.Read(key[9:])
keys[i] = key
val := make([]byte, 32)
rand.Read(val)
values[i] = val
err := tree.Insert(nil, key, val, nil, big.NewInt(1))
if err != nil {
t.Fatalf("Failed to insert key %d: %v", i, err)
}
}
// Verify all 3 keys exist
for i, key := range keys {
_, err := tree.Get(key)
if err != nil {
t.Fatalf("Key %d not found after insert: %v", i, err)
}
}
// Count tree nodes before deletion
nodesBefore := countTreeNodes(t, s, shardKey)
t.Logf("Tree nodes before deletion: %d", nodesBefore)
// Delete one key - this should trigger branch merging
err := tree.Delete(nil, keys[1])
if err != nil {
t.Fatalf("Failed to delete key 1: %v", err)
}
// Verify key 1 is gone
_, err = tree.Get(keys[1])
if err == nil {
t.Fatal("Key 1 should not exist after deletion")
}
// Verify keys 0 and 2 still exist
for _, i := range []int{0, 2} {
_, err := tree.Get(keys[i])
if err != nil {
t.Fatalf("Key %d not found after deleting key 1: %v", i, err)
}
}
// Count tree nodes after deletion
nodesAfter := countTreeNodes(t, s, shardKey)
t.Logf("Tree nodes after deletion: %d", nodesAfter)
// Now verify that all stored nodes are actually reachable from the root.
// This is the critical check - any unreachable nodes are "stale".
reachableNodes := countReachableNodes(t, tree)
t.Logf("Reachable nodes from root: %d", reachableNodes)
if nodesAfter != reachableNodes {
t.Errorf("STALE NODES DETECTED: stored=%d, reachable=%d, stale=%d",
nodesAfter, reachableNodes, nodesAfter-reachableNodes)
}
// More aggressive test: delete all but one key
err = tree.Delete(nil, keys[2])
if err != nil {
t.Fatalf("Failed to delete key 2: %v", err)
}
nodesAfterSecondDelete := countTreeNodes(t, s, shardKey)
reachableAfterSecondDelete := countReachableNodes(t, tree)
t.Logf("After second delete: stored=%d, reachable=%d", nodesAfterSecondDelete, reachableAfterSecondDelete)
if nodesAfterSecondDelete != reachableAfterSecondDelete {
t.Errorf("STALE NODES DETECTED after second delete: stored=%d, reachable=%d, stale=%d",
nodesAfterSecondDelete, reachableAfterSecondDelete, nodesAfterSecondDelete-reachableAfterSecondDelete)
}
// Delete the last key - tree should be empty
err = tree.Delete(nil, keys[0])
if err != nil {
t.Fatalf("Failed to delete key 0: %v", err)
}
nodesAfterAllDeleted := countTreeNodes(t, s, shardKey)
t.Logf("After all deleted: stored=%d", nodesAfterAllDeleted)
// There should be no tree nodes left (except possibly the root marker)
if nodesAfterAllDeleted > 1 {
t.Errorf("STALE NODES DETECTED after all deleted: stored=%d (expected 0 or 1)",
nodesAfterAllDeleted)
}
}
// TestTreeNoStaleNodesAfterBranchMerge specifically tests the case where
// deleting a node causes a branch to merge with its only remaining child branch.
// This tests the FullPrefix update bug hypothesis.
func TestTreeNoStaleNodesAfterBranchMergeNoBLS(t *testing.T) {
l, _ := zap.NewProduction()
db := store.NewPebbleDB(l, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0)
s := store.NewPebbleHypergraphStore(&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, db, l, verencr, nil)
shardKey := tries.ShardKey{}
tree := &tries.LazyVectorCommitmentTree{Store: s, SetType: "vertex", PhaseType: "adds", ShardKey: shardKey}
// To trigger a branch-to-branch merge during deletion, we need:
// 1. A parent branch with exactly 2 children
// 2. One child is a leaf (to be deleted)
// 3. The other child is a branch node
//
// Structure we want to create:
// Root (branch)
// ├── Child A (leaf) - will be deleted
// └── Child B (branch)
// ├── Grandchild 1 (leaf)
// └── Grandchild 2 (leaf)
//
// After deleting Child A, Root should merge with Child B.
// Keys with controlled nibbles to create specific tree structure
// Nibble = 6 bits, so first nibble is bits 0-5 of first byte
// Key A: first nibble = 0 (byte[0] bits 7-2 = 000000)
keyA := make([]byte, 64)
keyA[0] = 0x00 // First nibble = 0
rand.Read(keyA[1:])
// Keys for branch B children: first nibble = 1 (byte[0] bits 7-2 = 000001)
// They share first nibble (1) but differ at second nibble
keyB1 := make([]byte, 64)
keyB1[0] = 0x04 // First nibble = 1 (binary: 000001 << 2 = 00000100)
keyB1[1] = 0x00 // Second nibble differs
rand.Read(keyB1[2:])
keyB2 := make([]byte, 64)
keyB2[0] = 0x04 // First nibble = 1
keyB2[1] = 0x40 // Second nibble differs (binary: 010000...)
rand.Read(keyB2[2:])
// Insert all keys
val := make([]byte, 32)
rand.Read(val)
if err := tree.Insert(nil, keyA, val, nil, big.NewInt(1)); err != nil {
t.Fatalf("Failed to insert keyA: %v", err)
}
rand.Read(val)
if err := tree.Insert(nil, keyB1, val, nil, big.NewInt(1)); err != nil {
t.Fatalf("Failed to insert keyB1: %v", err)
}
rand.Read(val)
if err := tree.Insert(nil, keyB2, val, nil, big.NewInt(1)); err != nil {
t.Fatalf("Failed to insert keyB2: %v", err)
}
// Verify structure
nodesBefore := countTreeNodes(t, s, shardKey)
reachableBefore := countReachableNodes(t, tree)
t.Logf("Before deletion: stored=%d, reachable=%d", nodesBefore, reachableBefore)
if nodesBefore != reachableBefore {
t.Errorf("STALE NODES before deletion: stored=%d, reachable=%d", nodesBefore, reachableBefore)
}
// Delete keyA - this should trigger the merge of root with child B
if err := tree.Delete(nil, keyA); err != nil {
t.Fatalf("Failed to delete keyA: %v", err)
}
// Verify keyA is gone
if _, err := tree.Get(keyA); err == nil {
t.Fatal("keyA should not exist after deletion")
}
// Verify B keys still exist
if _, err := tree.Get(keyB1); err != nil {
t.Fatalf("keyB1 not found after deletion: %v", err)
}
if _, err := tree.Get(keyB2); err != nil {
t.Fatalf("keyB2 not found after deletion: %v", err)
}
// Check for stale nodes
nodesAfter := countTreeNodes(t, s, shardKey)
reachableAfter := countReachableNodes(t, tree)
t.Logf("After deletion: stored=%d, reachable=%d", nodesAfter, reachableAfter)
if nodesAfter != reachableAfter {
t.Errorf("STALE NODES DETECTED after branch merge: stored=%d, reachable=%d, stale=%d",
nodesAfter, reachableAfter, nodesAfter-reachableAfter)
}
}
// TestTreeNoStaleNodesAfterMassDelete tests stale node detection with many keys
func TestTreeNoStaleNodesAfterMassDeleteNoBLS(t *testing.T) {
l, _ := zap.NewProduction()
db := store.NewPebbleDB(l, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0)
s := store.NewPebbleHypergraphStore(&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, db, l, verencr, nil)
shardKey := tries.ShardKey{}
tree := &tries.LazyVectorCommitmentTree{Store: s, SetType: "vertex", PhaseType: "adds", ShardKey: shardKey}
// Insert 1000 random keys
numKeys := 1000
keys := make([][]byte, numKeys)
for i := 0; i < numKeys; i++ {
key := make([]byte, 64)
rand.Read(key)
keys[i] = key
val := make([]byte, 32)
rand.Read(val)
err := tree.Insert(nil, key, val, nil, big.NewInt(1))
if err != nil {
t.Fatalf("Failed to insert key %d: %v", i, err)
}
}
nodesBefore := countTreeNodes(t, s, shardKey)
reachableBefore := countReachableNodes(t, tree)
t.Logf("Before deletion: stored=%d, reachable=%d", nodesBefore, reachableBefore)
if nodesBefore != reachableBefore {
t.Errorf("STALE NODES before deletion: stored=%d, reachable=%d", nodesBefore, reachableBefore)
}
// Delete half the keys in random order
mrand.Shuffle(numKeys, func(i, j int) {
keys[i], keys[j] = keys[j], keys[i]
})
for i := 0; i < numKeys/2; i++ {
err := tree.Delete(nil, keys[i])
if err != nil {
t.Fatalf("Failed to delete key %d: %v", i, err)
}
}
nodesAfter := countTreeNodes(t, s, shardKey)
reachableAfter := countReachableNodes(t, tree)
t.Logf("After deleting half: stored=%d, reachable=%d", nodesAfter, reachableAfter)
if nodesAfter != reachableAfter {
t.Errorf("STALE NODES DETECTED: stored=%d, reachable=%d, stale=%d",
nodesAfter, reachableAfter, nodesAfter-reachableAfter)
}
// Verify remaining keys still accessible
for i := numKeys / 2; i < numKeys; i++ {
_, err := tree.Get(keys[i])
if err != nil {
t.Errorf("Key %d not found after mass deletion: %v", i, err)
}
}
}
// countTreeNodes counts all tree nodes stored in the database for a given shard
func countTreeNodes(t *testing.T, s *store.PebbleHypergraphStore, shardKey tries.ShardKey) int {
count := 0
// Use the store's iterator to count nodes
iter, err := s.IterateRawLeaves("vertex", "adds", shardKey)
if err != nil {
t.Fatalf("Failed to create iterator: %v", err)
}
defer iter.Close()
// Count all entries (both branch and leaf nodes)
for valid := iter.First(); valid; valid = iter.Next() {
count++
}
return count
}
// countReachableNodes counts nodes reachable from the tree root by walking the tree
func countReachableNodes(t *testing.T, tree *tries.LazyVectorCommitmentTree) int {
if tree.Root == nil {
return 0
}
count := 0
var walk func(node tries.LazyVectorCommitmentNode)
walk = func(node tries.LazyVectorCommitmentNode) {
if node == nil {
return
}
count++
switch n := node.(type) {
case *tries.LazyVectorCommitmentBranchNode:
for i := 0; i < 64; i++ {
child := n.Children[i]
if child == nil {
// Try to load from store
var err error
child, err = tree.Store.GetNodeByPath(
tree.SetType,
tree.PhaseType,
tree.ShardKey,
append(n.FullPrefix, i),
)
if err != nil {
continue
}
}
if child != nil {
walk(child)
}
}
case *tries.LazyVectorCommitmentLeafNode:
// Leaf node, nothing more to walk
}
}
walk(tree.Root)
return count
}
// TestTreeBranchStructure tests that the tree structure is preserved after
// adding and removing leaves that cause branch creation due to shared prefixes.
func TestTreeBranchStructureNoBLS(t *testing.T) {

View File

@ -5,6 +5,7 @@ package crypto_test
import (
"bytes"
"fmt"
"math/big"
"testing"
@ -16,6 +17,265 @@ import (
qcrypto "source.quilibrium.com/quilibrium/monorepo/types/tries"
)
func TestName(t *testing.T) {
rdfDoc := `
@prefix qcl: <https://types.quilibrium.com/qcl/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix name: <https://types.quilibrium.com/schema-repository/name/> .
name:NameRecord a rdfs:Class ;
rdfs:comment "Quilibrium name service record" .
name:Name a rdfs:Property ;
rdfs:range name:NameRecord ;
rdfs:domain qcl:String ;
qcl:size 32 ;
qcl:order 1 .
name:AuthorityKey a rdfs:Property ;
rdfs:range name:NameRecord ;
rdfs:domain qcl:ByteArray ;
qcl:size 57 ;
qcl:order 2 .
name:Parent a rdfs:Property ;
rdfs:range name:NameRecord ;
rdfs:domain qcl:ByteArray ;
qcl:size 32 ;
qcl:order 3 .
name:CreatedAt a rdfs:Property ;
rdfs:range name:NameRecord ;
rdfs:domain qcl:Uint ;
qcl:size 64 ;
qcl:order 4 .
name:UpdatedAt a rdfs:Property ;
rdfs:range name:NameRecord ;
rdfs:domain qcl:Uint ;
qcl:size 64 ;
qcl:order 5 .
name:RecordType a rdfs:Property ;
rdfs:range name:NameRecord ;
rdfs:domain qcl:Uint ;
qcl:size 8 ;
qcl:order 6 .
name:Data a rdfs:Property ;
rdfs:range name:NameRecord ;
rdfs:domain qcl:String ;
qcl:size 64 ;
qcl:order 7 .
`
// Create components
parser := &schema.TurtleRDFParser{}
log := zap.NewNop()
inclusionProver := bls48581.NewKZGInclusionProver(log)
multiprover := schema.NewRDFMultiprover(parser, inclusionProver)
// Create a test tree
tree := &qcrypto.VectorCommitmentTree{}
// Insert test data at the correct indices
// The tree needs to have data at indices that will be used in the polynomial
// Insert enough data to ensure polynomial has values at indices 1, 2, 3
for i := 0; i < 63; i++ {
data := bytes.Repeat([]byte{byte(i + 1)}, 57)
err := tree.Insert([]byte{byte(i << 2)}, data, nil, big.NewInt(57))
require.NoError(t, err)
}
tree.Commit(inclusionProver, false)
t.Run("Prove", func(t *testing.T) {
fields := []string{"Name", "AuthorityKey"}
proof, err := multiprover.Prove(rdfDoc, fields, tree)
pb, _ := proof.ToBytes()
fmt.Printf("proof %x\n", pb)
require.NoError(t, err)
assert.NotNil(t, proof)
})
t.Run("ProveWithType", func(t *testing.T) {
fields := []string{"Name", "AuthorityKey"}
typeIndex := uint64(63)
proof, err := multiprover.ProveWithType(rdfDoc, fields, tree, &typeIndex)
require.NoError(t, err)
assert.NotNil(t, proof)
pb, _ := proof.ToBytes()
fmt.Printf("proof with type %x\n", pb)
// Test without type index
proof, err = multiprover.ProveWithType(rdfDoc, fields, tree, nil)
require.NoError(t, err)
assert.NotNil(t, proof)
pb, _ = proof.ToBytes()
fmt.Printf("proof with type but type is nil %x\n", pb)
})
t.Run("Get", func(t *testing.T) {
// Test getting name field (order 1, so key is 1<<2 = 4, data at index 1)
value, err := multiprover.Get(rdfDoc, "name:NameRecord", "Name", tree)
require.NoError(t, err)
assert.Equal(t, bytes.Repeat([]byte{2}, 57), value) // Index 1 has value 2
// Test getting one-time key field (order 2, so key is 2<<2 = 8, data at index 2)
value, err = multiprover.Get(rdfDoc, "name:NameRecord", "AuthorityKey", tree)
require.NoError(t, err)
assert.Equal(t, bytes.Repeat([]byte{3}, 57), value) // Index 2 has value 3
})
t.Run("GetFieldOrder", func(t *testing.T) {
order, maxOrder, err := multiprover.GetFieldOrder(rdfDoc, "name:NameRecord", "Name")
require.NoError(t, err)
assert.Equal(t, 1, order)
assert.Equal(t, 7, maxOrder)
order, maxOrder, err = multiprover.GetFieldOrder(rdfDoc, "name:NameRecord", "AuthorityKey")
require.NoError(t, err)
assert.Equal(t, 2, order)
assert.Equal(t, 7, maxOrder)
order, maxOrder, err = multiprover.GetFieldOrder(rdfDoc, "name:NameRecord", "Parent")
require.NoError(t, err)
assert.Equal(t, 3, order)
assert.Equal(t, 7, maxOrder)
})
t.Run("GetFieldKey", func(t *testing.T) {
key, err := multiprover.GetFieldKey(rdfDoc, "name:NameRecord", "Name")
require.NoError(t, err)
assert.Equal(t, []byte{1 << 2}, key)
key, err = multiprover.GetFieldKey(rdfDoc, "name:NameRecord", "AuthorityKey")
require.NoError(t, err)
assert.Equal(t, []byte{2 << 2}, key)
})
t.Run("Verify", func(t *testing.T) {
// Create proof without type index for simpler verification
fields := []string{"Name", "AuthorityKey", "Parent"}
proof, err := multiprover.ProveWithType(rdfDoc, fields, tree, nil)
require.NoError(t, err)
// Get actual data from tree for verification
data := make([][]byte, len(fields))
for i, field := range fields {
value, err := multiprover.Get(rdfDoc, "name:NameRecord", field, tree)
require.NoError(t, err)
data[i] = value
}
// Create commit
commit := tree.Commit(inclusionProver, false)
proofBytes, _ := proof.ToBytes()
// Verify should pass with correct data
// keys parameter is nil to use default index-based keys
valid, err := multiprover.Verify(rdfDoc, fields, nil, commit, proofBytes, data)
require.NoError(t, err)
assert.True(t, valid)
// Verify should fail with wrong data
wrongData := make([][]byte, len(fields))
for i := range wrongData {
wrongData[i] = []byte("wrong data")
}
valid, err = multiprover.Verify(rdfDoc, fields, nil, commit, proofBytes, wrongData)
require.NoError(t, err)
assert.False(t, valid)
// Verify should error with non-existent field
invalidFields := []string{"Name", "NonExistent"}
_, err = multiprover.Verify(rdfDoc, invalidFields, nil, commit, proofBytes, data[:2])
assert.Error(t, err)
assert.Contains(t, err.Error(), "not found")
})
t.Run("VerifyWithType", func(t *testing.T) {
// Add type marker to tree
typeData := bytes.Repeat([]byte{0xff}, 32)
typeIndex := uint64(63)
err := tree.Insert(typeData, typeData, nil, big.NewInt(32))
require.NoError(t, err)
// Get commit after all data is inserted
commit := tree.Commit(inclusionProver, false)
// Create proof with type
fields := []string{"Name", "AuthorityKey"}
proof, err := multiprover.ProveWithType(rdfDoc, fields, tree, &typeIndex)
require.NoError(t, err)
// Get actual data from tree
data := make([][]byte, len(fields))
for i, field := range fields {
value, err := multiprover.Get(rdfDoc, "name:NameRecord", field, tree)
require.NoError(t, err)
data[i] = value
}
proofBytes, _ := proof.ToBytes()
// Verify with type should pass
valid, err := multiprover.VerifyWithType(rdfDoc, fields, nil, commit, proofBytes, data, &typeIndex, typeData)
require.NoError(t, err)
assert.True(t, valid)
// Verify without type when proof was created with type should fail
valid, err = multiprover.VerifyWithType(rdfDoc, fields, nil, commit, proofBytes, data, nil, nil)
require.NoError(t, err)
assert.False(t, valid) // Should fail due to missing type data
// Create proof without type
proofNoType, err := multiprover.ProveWithType(rdfDoc, fields, tree, nil)
require.NoError(t, err)
proofNoTypeBytes, _ := proofNoType.ToBytes()
// Verify without type should pass
valid, err = multiprover.VerifyWithType(rdfDoc, fields, nil, commit, proofNoTypeBytes, data, nil, nil)
require.NoError(t, err)
assert.True(t, valid)
// Verify with wrong type data should fail
wrongTypeData := []byte("wrong type data")
valid, err = multiprover.VerifyWithType(rdfDoc, fields, nil, commit, proofBytes, data, &typeIndex, wrongTypeData)
require.NoError(t, err)
assert.False(t, valid)
// Verify with different type index but same data should still fail
// because the hash uses the fixed key bytes.Repeat([]byte{0xff}, 32)
differentTypeIndex := uint64(50)
valid, err = multiprover.VerifyWithType(rdfDoc, fields, nil, commit, proofBytes, data, &differentTypeIndex, typeData)
require.NoError(t, err)
assert.False(t, valid) // Should fail because proof was created with index 63
})
t.Run("ErrorCases", func(t *testing.T) {
// Test non-existent field
_, err := multiprover.Get(rdfDoc, "name:NameRecord", "NonExistent", tree)
assert.Error(t, err)
assert.Contains(t, err.Error(), "not found")
// Test invalid document
_, err = multiprover.Get("invalid rdf", "name:NameRecord", "name", tree)
assert.Error(t, err)
// Test verify with mismatched data count
fields := []string{"Name", "AuthorityKey"}
proof, err := multiprover.ProveWithType(rdfDoc, fields, tree, nil)
require.NoError(t, err)
// Wrong number of data elements
wrongData := [][]byte{[]byte("data1")}
commit := tree.Commit(inclusionProver, false)
_, err = multiprover.Verify(rdfDoc, fields, nil, commit, proof.GetProof(), wrongData)
assert.Error(t, err)
assert.Contains(t, err.Error(), "fields and data length mismatch")
})
}
func TestRDFMultiprover(t *testing.T) {
// Create test RDF document
rdfDoc := `

View File

@ -35,8 +35,28 @@ var (
)
configDirectory2 = flag.String(
"config2",
filepath.Join(".", ".config"),
"the configuration directory",
"",
"the second configuration directory (optional, enables comparison mode)",
)
keyPrefix = flag.String(
"prefix",
"",
"hex-encoded key prefix to filter results (e.g., '09' for hypergraph keys)",
)
searchKey = flag.String(
"search",
"",
"hex-encoded key substring to search for in keys",
)
searchValue = flag.String(
"search-value",
"",
"hex-encoded substring to search for in values",
)
maxResults = flag.Int(
"max",
0,
"maximum number of results to display (0 = unlimited)",
)
// *char flags
@ -50,6 +70,34 @@ func main() {
config.Flags(&char, &ver)
flag.Parse()
// Parse filter options
var prefixFilter []byte
if *keyPrefix != "" {
var err error
prefixFilter, err = hex.DecodeString(*keyPrefix)
if err != nil {
log.Fatalf("invalid prefix hex: %v", err)
}
}
var keySearchPattern []byte
if *searchKey != "" {
var err error
keySearchPattern, err = hex.DecodeString(*searchKey)
if err != nil {
log.Fatalf("invalid search hex: %v", err)
}
}
var valueSearchPattern []byte
if *searchValue != "" {
var err error
valueSearchPattern, err = hex.DecodeString(*searchValue)
if err != nil {
log.Fatalf("invalid search-value hex: %v", err)
}
}
nodeConfig1, err := config.LoadConfig(*configDirectory1, "", false)
if err != nil {
log.Fatal("failed to load config", err)
@ -64,9 +112,102 @@ func main() {
db1 := store.NewPebbleDB(logger, nodeConfig1.DB, uint(0))
defer db1.Close()
iter1, err := db1.NewIter([]byte{0x00}, []byte{0xff})
// Determine iteration bounds based on prefix filter
lowerBound := []byte{0x00}
upperBound := []byte{0xff}
if len(prefixFilter) > 0 {
lowerBound = prefixFilter
// Create upper bound by incrementing the last byte of the prefix
upperBound = make([]byte, len(prefixFilter))
copy(upperBound, prefixFilter)
for i := len(upperBound) - 1; i >= 0; i-- {
if upperBound[i] < 0xff {
upperBound[i]++
break
}
upperBound[i] = 0x00
if i == 0 {
// Prefix is all 0xff, scan to end
upperBound = []byte{0xff}
}
}
}
// Single database mode (read-only dump)
if *configDirectory2 == "" {
runSingleDBMode(db1, lowerBound, upperBound, prefixFilter, keySearchPattern, valueSearchPattern, logger)
return
}
// Comparison mode (two databases)
runCompareMode(db1, lowerBound, upperBound, prefixFilter, keySearchPattern, valueSearchPattern, logger)
}
func runSingleDBMode(
db1 *store.PebbleDB,
lowerBound, upperBound []byte,
prefixFilter, keySearchPattern, valueSearchPattern []byte,
logger *zap.Logger,
) {
iter1, err := db1.NewIter(lowerBound, upperBound)
if err != nil {
logger.Error("failed to create iterator", zap.Error(err))
return
}
defer iter1.Close()
count := 0
matched := 0
for iter1.First(); iter1.Valid(); iter1.Next() {
key := iter1.Key()
value := iter1.Value()
// Apply prefix filter
if len(prefixFilter) > 0 && !bytes.HasPrefix(key, prefixFilter) {
continue
}
// Apply key search pattern
if len(keySearchPattern) > 0 && !bytes.Contains(key, keySearchPattern) {
continue
}
// Apply value search pattern
if len(valueSearchPattern) > 0 && !bytes.Contains(value, valueSearchPattern) {
continue
}
count++
matched++
decoded := decodeValue(key, value)
fmt.Printf(
"key: %s\nsemantic: %s\nvalue:\n%s\n\n",
hex.EncodeToString(key),
describeKey(key),
indent(decoded),
)
if *maxResults > 0 && matched >= *maxResults {
fmt.Printf("... (stopped after %d results, use -max to change limit)\n", *maxResults)
break
}
}
fmt.Printf("\nsummary: %d keys displayed from %s\n", matched, *configDirectory1)
}
func runCompareMode(
db1 *store.PebbleDB,
lowerBound, upperBound []byte,
prefixFilter, keySearchPattern, valueSearchPattern []byte,
logger *zap.Logger,
) {
iter1, err := db1.NewIter(lowerBound, upperBound)
if err != nil {
logger.Error("failed to create iterator", zap.Error(err))
return
}
defer iter1.Close()
@ -78,9 +219,10 @@ func main() {
db2 := store.NewPebbleDB(logger, nodeConfig2.DB, uint(0))
defer db2.Close()
iter2, err := db2.NewIter([]byte{0x00}, []byte{0xff})
iter2, err := db2.NewIter(lowerBound, upperBound)
if err != nil {
logger.Error("failed to create iterator", zap.Error(err))
return
}
defer iter2.Close()
@ -90,8 +232,22 @@ func main() {
onlyDB1 := 0
onlyDB2 := 0
valueDiff := 0
matched := 0
keyPresenceMap := make(map[string]*keyPresence)
shouldInclude := func(key, value []byte) bool {
if len(prefixFilter) > 0 && !bytes.HasPrefix(key, prefixFilter) {
return false
}
if len(keySearchPattern) > 0 && !bytes.Contains(key, keySearchPattern) {
return false
}
if len(valueSearchPattern) > 0 && !bytes.Contains(value, valueSearchPattern) {
return false
}
return true
}
for iter1Valid || iter2Valid {
var key1, key2 []byte
var value1, value2 []byte
@ -114,34 +270,86 @@ func main() {
case iter1Valid && iter2Valid:
comparison := bytes.Compare(key1, key2)
if comparison == 0 {
if bytes.Equal(value1, value2) {
fmt.Printf(
"key: %s\nsemantic: %s\nvalues identical in %s and %s\nvalue:\n%s\n\n",
shortHex(key1),
describeKey(key1),
*configDirectory1,
*configDirectory2,
indent(decoded1),
)
} else {
valueDiff++
fmt.Printf(
"key: %s\nsemantic: %s\nvalue (%s):\n%s\nvalue (%s):\n%s\n",
shortHex(key1),
describeKey(key1),
*configDirectory1,
indent(decoded1),
*configDirectory2,
indent(decoded2),
)
if diff := diffStrings(decoded1, decoded2); diff != "" {
fmt.Printf("diff:\n%s\n", indent(diff))
if shouldInclude(key1, value1) || shouldInclude(key2, value2) {
matched++
if *maxResults > 0 && matched > *maxResults {
fmt.Printf("... (stopped after %d results)\n", *maxResults)
goto done
}
if bytes.Equal(value1, value2) {
fmt.Printf(
"key: %s\nsemantic: %s\nvalues identical in %s and %s\nvalue:\n%s\n\n",
shortHex(key1),
describeKey(key1),
*configDirectory1,
*configDirectory2,
indent(decoded1),
)
} else {
valueDiff++
fmt.Printf(
"key: %s\nsemantic: %s\nvalue (%s):\n%s\nvalue (%s):\n%s\n",
shortHex(key1),
describeKey(key1),
*configDirectory1,
indent(decoded1),
*configDirectory2,
indent(decoded2),
)
if diff := diffStrings(decoded1, decoded2); diff != "" {
fmt.Printf("diff:\n%s\n", indent(diff))
}
fmt.Printf("\n")
}
fmt.Printf("\n")
}
iter1Valid = iter1.Next()
iter2Valid = iter2.Next()
} else if comparison < 0 {
if shouldInclude(key1, value1) {
matched++
if *maxResults > 0 && matched > *maxResults {
fmt.Printf("... (stopped after %d results)\n", *maxResults)
goto done
}
onlyDB1++
fmt.Printf(
"key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n",
*configDirectory1,
shortHex(key1),
describeKey(key1),
indent(decoded1),
)
}
iter1Valid = iter1.Next()
} else {
if shouldInclude(key2, value2) {
matched++
if *maxResults > 0 && matched > *maxResults {
fmt.Printf("... (stopped after %d results)\n", *maxResults)
goto done
}
onlyDB2++
fmt.Printf(
"key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n",
*configDirectory2,
shortHex(key2),
describeKey(key2),
indent(decoded2),
)
}
iter2Valid = iter2.Next()
}
case iter1Valid:
if shouldInclude(key1, value1) {
matched++
if *maxResults > 0 && matched > *maxResults {
fmt.Printf("... (stopped after %d results)\n", *maxResults)
goto done
}
onlyDB1++
fmt.Printf(
"key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n",
@ -150,8 +358,16 @@ func main() {
describeKey(key1),
indent(decoded1),
)
iter1Valid = iter1.Next()
} else {
}
iter1Valid = iter1.Next()
case iter2Valid:
if shouldInclude(key2, value2) {
matched++
if *maxResults > 0 && matched > *maxResults {
fmt.Printf("... (stopped after %d results)\n", *maxResults)
goto done
}
onlyDB2++
fmt.Printf(
"key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n",
@ -160,31 +376,12 @@ func main() {
describeKey(key2),
indent(decoded2),
)
iter2Valid = iter2.Next()
}
case iter1Valid:
onlyDB1++
fmt.Printf(
"key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n",
*configDirectory1,
shortHex(key1),
describeKey(key1),
indent(decoded1),
)
iter1Valid = iter1.Next()
case iter2Valid:
onlyDB2++
fmt.Printf(
"key only in %s: %s\nsemantic: %s\nvalue:\n%s\n\n",
*configDirectory2,
shortHex(key2),
describeKey(key2),
indent(decoded2),
)
iter2Valid = iter2.Next()
}
}
done:
fmt.Printf(
"summary: %d keys only in %s, %d keys only in %s, %d keys with differing values\n",
onlyDB1,
@ -687,7 +884,7 @@ func decodeHypergraphValue(key []byte, value []byte) string {
switch sub {
case store.VERTEX_DATA:
return summarizeVectorCommitmentTree(value)
return summarizeVectorCommitmentTree(key, value)
case store.VERTEX_TOMBSTONE:
return shortHex(value)
case store.VERTEX_ADDS_TREE_NODE,
@ -765,8 +962,11 @@ func decodeHypergraphProto(value []byte) (string, bool) {
return output, matched
}
func summarizeVectorCommitmentTree(value []byte) string {
_, err := tries.DeserializeNonLazyTree(value)
// Global intrinsic address (32 bytes of 0xff)
var globalIntrinsicAddress = bytes.Repeat([]byte{0xff}, 32)
func summarizeVectorCommitmentTree(key []byte, value []byte) string {
tree, err := tries.DeserializeNonLazyTree(value)
if err != nil {
return fmt.Sprintf(
"vector_commitment_tree decode_error=%v raw=%s",
@ -780,6 +980,24 @@ func summarizeVectorCommitmentTree(value []byte) string {
"size_bytes": len(value),
"sha256": shortHex(sum[:]),
}
// Check if this is a global intrinsic vertex (domain = 0xff*32)
// Key structure for vertex data: {0x09, 0xF0, domain[32], address[32]}
if len(key) >= 66 {
domain := key[2:34]
address := key[34:66]
if bytes.Equal(domain, globalIntrinsicAddress) {
// This is a global intrinsic vertex - decode the fields
globalData := decodeGlobalIntrinsicVertex(tree, address)
if globalData != nil {
for k, v := range globalData {
summary[k] = v
}
}
}
}
jsonBytes, err := json.MarshalIndent(summary, "", " ")
if err != nil {
return fmt.Sprintf("vector_commitment_tree size_bytes=%d", len(value))
@ -788,6 +1006,188 @@ func summarizeVectorCommitmentTree(value []byte) string {
return string(jsonBytes)
}
// decodeGlobalIntrinsicVertex attempts to decode the vertex as a global intrinsic type
// (prover, allocation, or reward)
func decodeGlobalIntrinsicVertex(tree *tries.VectorCommitmentTree, address []byte) map[string]any {
result := make(map[string]any)
result["vertex_address"] = hex.EncodeToString(address)
// Try to detect the type by examining which fields exist
// Prover has PublicKey at order 0 (key 0x00) with size 585
// Allocation has Prover reference at order 0 (key 0x00)
// Reward has DelegateAddress at order 0 (key 0x00) with size 32
// Check order 0 field
order0Value, err := tree.Get([]byte{0x00})
if err != nil || len(order0Value) == 0 {
result["type"] = "unknown (no order 0 field)"
return result
}
switch len(order0Value) {
case 585:
// Prover: PublicKey is 585 bytes
result["type"] = "prover:Prover"
result["public_key"] = shortHex(order0Value)
decodeProverFields(tree, result)
case 32:
// Could be Allocation (Prover reference) or Reward (DelegateAddress)
// Check for allocation-specific fields
confirmFilter, _ := tree.Get([]byte{0x08}) // order 2
if len(confirmFilter) > 0 || len(confirmFilter) == 0 {
// Check if JoinFrameNumber exists (order 4, key 0x10)
joinFrame, _ := tree.Get([]byte{0x10})
if len(joinFrame) == 8 {
result["type"] = "allocation:ProverAllocation"
result["prover_reference"] = hex.EncodeToString(order0Value)
decodeAllocationFields(tree, result)
} else {
// Likely a reward vertex
result["type"] = "reward:ProverReward"
result["delegate_address"] = hex.EncodeToString(order0Value)
decodeRewardFields(tree, result)
}
}
default:
result["type"] = "unknown"
result["order_0_size"] = len(order0Value)
}
return result
}
func decodeProverFields(tree *tries.VectorCommitmentTree, result map[string]any) {
// Prover schema:
// order 0: PublicKey (585 bytes) - already decoded
// order 1: Status (1 byte) - key 0x04
// order 2: AvailableStorage (8 bytes) - key 0x08
// order 3: Seniority (8 bytes) - key 0x0c
// order 4: KickFrameNumber (8 bytes) - key 0x10
if status, err := tree.Get([]byte{0x04}); err == nil && len(status) == 1 {
result["status"] = decodeProverStatus(status[0])
result["status_raw"] = status[0]
}
if storage, err := tree.Get([]byte{0x08}); err == nil && len(storage) == 8 {
result["available_storage"] = binary.BigEndian.Uint64(storage)
}
if seniority, err := tree.Get([]byte{0x0c}); err == nil && len(seniority) == 8 {
result["seniority"] = binary.BigEndian.Uint64(seniority)
}
if kickFrame, err := tree.Get([]byte{0x10}); err == nil && len(kickFrame) == 8 {
result["kick_frame_number"] = binary.BigEndian.Uint64(kickFrame)
}
}
func decodeAllocationFields(tree *tries.VectorCommitmentTree, result map[string]any) {
// Allocation schema:
// order 0: Prover (32 bytes) - already decoded
// order 1: Status (1 byte) - key 0x04
// order 2: ConfirmationFilter (up to 64 bytes) - key 0x08
// order 3: RejectionFilter (up to 64 bytes) - key 0x0c
// order 4: JoinFrameNumber (8 bytes) - key 0x10
// order 5: LeaveFrameNumber (8 bytes) - key 0x14
// order 6: PauseFrameNumber (8 bytes) - key 0x18
// order 7: ResumeFrameNumber (8 bytes) - key 0x1c
// order 8: KickFrameNumber (8 bytes) - key 0x20
// order 9: JoinConfirmFrameNumber (8 bytes) - key 0x24
// order 10: JoinRejectFrameNumber (8 bytes) - key 0x28
// order 11: LeaveConfirmFrameNumber (8 bytes) - key 0x2c
// order 12: LeaveRejectFrameNumber (8 bytes) - key 0x30
// order 13: LastActiveFrameNumber (8 bytes) - key 0x34
if status, err := tree.Get([]byte{0x04}); err == nil && len(status) == 1 {
result["status"] = decodeAllocationStatus(status[0])
result["status_raw"] = status[0]
}
if confirmFilter, err := tree.Get([]byte{0x08}); err == nil && len(confirmFilter) > 0 {
result["confirmation_filter"] = hex.EncodeToString(confirmFilter)
if bytes.Equal(confirmFilter, make([]byte, len(confirmFilter))) {
result["is_global_prover"] = true
}
} else {
result["is_global_prover"] = true
}
if rejFilter, err := tree.Get([]byte{0x0c}); err == nil && len(rejFilter) > 0 {
result["rejection_filter"] = hex.EncodeToString(rejFilter)
}
if joinFrame, err := tree.Get([]byte{0x10}); err == nil && len(joinFrame) == 8 {
result["join_frame_number"] = binary.BigEndian.Uint64(joinFrame)
}
if leaveFrame, err := tree.Get([]byte{0x14}); err == nil && len(leaveFrame) == 8 {
result["leave_frame_number"] = binary.BigEndian.Uint64(leaveFrame)
}
if pauseFrame, err := tree.Get([]byte{0x18}); err == nil && len(pauseFrame) == 8 {
result["pause_frame_number"] = binary.BigEndian.Uint64(pauseFrame)
}
if resumeFrame, err := tree.Get([]byte{0x1c}); err == nil && len(resumeFrame) == 8 {
result["resume_frame_number"] = binary.BigEndian.Uint64(resumeFrame)
}
if kickFrame, err := tree.Get([]byte{0x20}); err == nil && len(kickFrame) == 8 {
result["kick_frame_number"] = binary.BigEndian.Uint64(kickFrame)
}
if joinConfirm, err := tree.Get([]byte{0x24}); err == nil && len(joinConfirm) == 8 {
result["join_confirm_frame_number"] = binary.BigEndian.Uint64(joinConfirm)
}
if joinReject, err := tree.Get([]byte{0x28}); err == nil && len(joinReject) == 8 {
result["join_reject_frame_number"] = binary.BigEndian.Uint64(joinReject)
}
if leaveConfirm, err := tree.Get([]byte{0x2c}); err == nil && len(leaveConfirm) == 8 {
result["leave_confirm_frame_number"] = binary.BigEndian.Uint64(leaveConfirm)
}
if leaveReject, err := tree.Get([]byte{0x30}); err == nil && len(leaveReject) == 8 {
result["leave_reject_frame_number"] = binary.BigEndian.Uint64(leaveReject)
}
if lastActive, err := tree.Get([]byte{0x34}); err == nil && len(lastActive) == 8 {
result["last_active_frame_number"] = binary.BigEndian.Uint64(lastActive)
}
}
func decodeRewardFields(tree *tries.VectorCommitmentTree, result map[string]any) {
// Reward schema - just has DelegateAddress at order 0
// Nothing else to decode for now
}
func decodeProverStatus(status byte) string {
// Prover status mapping (internal byte -> name)
switch status {
case 0:
return "Joining"
case 1:
return "Active"
case 2:
return "Paused"
case 3:
return "Leaving"
case 4:
return "Rejected"
case 5:
return "Kicked"
default:
return fmt.Sprintf("Unknown(%d)", status)
}
}
func decodeAllocationStatus(status byte) string {
// Allocation status mapping (same as prover status)
return decodeProverStatus(status)
}
func summarizeHypergraphTreeNode(value []byte) string {
if len(value) == 0 {
return "hypergraph_tree_node <empty>"

View File

@ -1,6 +1,7 @@
package store
import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
@ -16,6 +17,7 @@ import (
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/config"
"source.quilibrium.com/quilibrium/monorepo/types/store"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
)
type PebbleDB struct {
@ -55,6 +57,11 @@ var pebbleMigrations = []func(*pebble.Batch) error{
migration_2_1_0_153,
migration_2_1_0_154,
migration_2_1_0_155,
migration_2_1_0_156,
migration_2_1_0_157,
migration_2_1_0_158,
migration_2_1_0_159,
migration_2_1_0_17,
}
func NewPebbleDB(
@ -342,8 +349,8 @@ func (p *PebbleDB) NewIter(lowerBound []byte, upperBound []byte) (
error,
) {
return p.db.NewIter(&pebble.IterOptions{
LowerBound: lowerBound, // buildutils:allow-slice-alias slice is static
UpperBound: upperBound, // buildutils:allow-slice-alias slice is static
LowerBound: lowerBound,
UpperBound: upperBound,
})
}
@ -415,8 +422,8 @@ func (t *PebbleTransaction) NewIter(lowerBound []byte, upperBound []byte) (
error,
) {
return t.b.NewIter(&pebble.IterOptions{
LowerBound: lowerBound, // buildutils:allow-slice-alias slice is static
UpperBound: upperBound, // buildutils:allow-slice-alias slice is static
LowerBound: lowerBound,
UpperBound: upperBound,
})
}
@ -437,7 +444,7 @@ func rightAlign(data []byte, size int) []byte {
l := len(data)
if l == size {
return data // buildutils:allow-slice-alias slice is static
return data
}
if l > size {
@ -651,6 +658,103 @@ func migration_2_1_0_155(b *pebble.Batch) error {
return migration_2_1_0_15(b)
}
func migration_2_1_0_156(b *pebble.Batch) error {
return migration_2_1_0_15(b)
}
func migration_2_1_0_157(b *pebble.Batch) error {
return migration_2_1_0_15(b)
}
func migration_2_1_0_158(b *pebble.Batch) error {
return migration_2_1_0_15(b)
}
func migration_2_1_0_159(b *pebble.Batch) error {
return migration_2_1_0_15(b)
}
func migration_2_1_0_17(b *pebble.Batch) error {
// Global shard key: L1={0,0,0}, L2=0xff*32
globalShardKey := tries.ShardKey{
L1: [3]byte{},
L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
}
// Next shard key (for exclusive upper bound): L1={0,0,1}, L2=0x00*32
nextShardKey := tries.ShardKey{
L1: [3]byte{0, 0, 1},
L2: [32]byte{},
}
// Delete vertex data for global domain
// Vertex data keys: {0x09, 0xF0, domain[32], address[32]}
// Start: {0x09, 0xF0, 0xff*32} (prefix for global domain)
// End: {0x09, 0xF1} (next prefix type, ensures we capture all addresses)
if err := b.DeleteRange(
hypergraphVertexDataKey(globalShardKey.L2[:]),
[]byte{HYPERGRAPH_SHARD, VERTEX_DATA + 1},
&pebble.WriteOptions{},
); err != nil {
return err
}
// Delete vertex adds tree nodes
if err := b.DeleteRange(
hypergraphVertexAddsTreeNodeKey(globalShardKey, []byte{}),
hypergraphVertexAddsTreeNodeKey(nextShardKey, []byte{}),
&pebble.WriteOptions{},
); err != nil {
return err
}
// Delete vertex adds tree nodes by path
if err := b.DeleteRange(
hypergraphVertexAddsTreeNodeByPathKey(globalShardKey, []int{}),
hypergraphVertexAddsTreeNodeByPathKey(nextShardKey, []int{}),
&pebble.WriteOptions{},
); err != nil {
return err
}
// Delete hyperedge adds tree nodes
if err := b.DeleteRange(
hypergraphHyperedgeAddsTreeNodeKey(globalShardKey, []byte{}),
hypergraphHyperedgeAddsTreeNodeKey(nextShardKey, []byte{}),
&pebble.WriteOptions{},
); err != nil {
return err
}
// Delete hyperedge adds tree nodes by path
if err := b.DeleteRange(
hypergraphHyperedgeAddsTreeNodeByPathKey(globalShardKey, []int{}),
hypergraphHyperedgeAddsTreeNodeByPathKey(nextShardKey, []int{}),
&pebble.WriteOptions{},
); err != nil {
return err
}
// Delete vertex adds tree root
if err := b.DeleteRange(
hypergraphVertexAddsTreeRootKey(globalShardKey),
hypergraphVertexAddsTreeRootKey(nextShardKey),
&pebble.WriteOptions{},
); err != nil {
return err
}
// Delete hyperedge adds tree root
if err := b.DeleteRange(
hypergraphHyperedgeAddsTreeRootKey(globalShardKey),
hypergraphHyperedgeAddsTreeRootKey(nextShardKey),
&pebble.WriteOptions{},
); err != nil {
return err
}
return nil
}
type pebbleSnapshotDB struct {
snap *pebble.Snapshot
}
@ -676,8 +780,8 @@ func (p *pebbleSnapshotDB) NewIter(lowerBound []byte, upperBound []byte) (
error,
) {
return p.snap.NewIter(&pebble.IterOptions{
LowerBound: lowerBound, // buildutils:allow-slice-alias slice is static
UpperBound: upperBound, // buildutils:allow-slice-alias slice is static
LowerBound: lowerBound,
UpperBound: upperBound,
})
}

View File

@ -1,10 +1,13 @@
package store
import (
"encoding/hex"
"fmt"
"os"
"path/filepath"
"testing"
"github.com/iden3/go-iden3-crypto/poseidon"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
@ -12,6 +15,13 @@ import (
"source.quilibrium.com/quilibrium/monorepo/config"
)
func TestPoseidon(t *testing.T) {
bi, err := poseidon.HashBytes([]byte("testvector"))
require.NoError(t, err)
fmt.Println(hex.EncodeToString(bi.FillBytes(make([]byte, 32))))
assert.FailNow(t, "")
}
func TestNewPebbleDB_ExistingDirectory(t *testing.T) {
testDir, err := os.MkdirTemp("", "pebble-test-existing-*")
require.NoError(t, err)

View File

@ -819,9 +819,6 @@ func (w *WorkerManager) getMultiaddrOfWorker(coreId uint) (
rpcMultiaddr = strings.Replace(rpcMultiaddr, "/0.0.0.0/", "/127.0.0.1/", 1)
rpcMultiaddr = strings.Replace(rpcMultiaddr, "/0:0:0:0:0:0:0:0/", "/::1/", 1)
rpcMultiaddr = strings.Replace(rpcMultiaddr, "/::/", "/::1/", 1)
// force TCP as stream is not supported over UDP/QUIC
rpcMultiaddr = strings.Replace(rpcMultiaddr, "/quic-v1", "", 1)
rpcMultiaddr = strings.Replace(rpcMultiaddr, "udp", "tcp", 1)
ma, err := multiaddr.StringCast(rpcMultiaddr)
return ma, errors.Wrap(err, "get multiaddr of worker")

View File

@ -194,6 +194,42 @@ type Hypergraph interface {
frameNumber uint64,
) error
// Hard delete operations - these bypass CRDT semantics for pruning
// DeleteVertexAdd performs a hard delete of a vertex from the VertexAdds
// set. Unlike RemoveVertex (which adds to VertexRemoves for CRDT semantics),
// this actually removes the entry from VertexAdds and deletes the associated
// vertex data. This is used for pruning stale/orphaned data.
DeleteVertexAdd(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
vertexID [64]byte,
) error
// DeleteVertexRemove performs a hard delete of a vertex from the
// VertexRemoves set. This is used for pruning stale data.
DeleteVertexRemove(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
vertexID [64]byte,
) error
// DeleteHyperedgeAdd performs a hard delete of a hyperedge from the
// HyperedgeAdds set. This is used for pruning stale/orphaned data.
DeleteHyperedgeAdd(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
hyperedgeID [64]byte,
) error
// DeleteHyperedgeRemove performs a hard delete of a hyperedge from the
// HyperedgeRemoves set. This is used for pruning stale data.
DeleteHyperedgeRemove(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
hyperedgeID [64]byte,
) error
// Hyperedge data operations
// GetHyperedgeExtrinsics retrieves the extrinsic tree of a hyperedge, which

View File

@ -555,6 +555,46 @@ func (h *MockHypergraph) ImportTree(
return args.Error(0)
}
// DeleteVertexAdd implements hypergraph.Hypergraph.
func (h *MockHypergraph) DeleteVertexAdd(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
vertexID [64]byte,
) error {
args := h.Called(txn, shardKey, vertexID)
return args.Error(0)
}
// DeleteVertexRemove implements hypergraph.Hypergraph.
func (h *MockHypergraph) DeleteVertexRemove(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
vertexID [64]byte,
) error {
args := h.Called(txn, shardKey, vertexID)
return args.Error(0)
}
// DeleteHyperedgeAdd implements hypergraph.Hypergraph.
func (h *MockHypergraph) DeleteHyperedgeAdd(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
hyperedgeID [64]byte,
) error {
args := h.Called(txn, shardKey, hyperedgeID)
return args.Error(0)
}
// DeleteHyperedgeRemove implements hypergraph.Hypergraph.
func (h *MockHypergraph) DeleteHyperedgeRemove(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
hyperedgeID [64]byte,
) error {
args := h.Called(txn, shardKey, hyperedgeID)
return args.Error(0)
}
// Ensure MockHypergraph implements Hypergraph
var _ hg.Hypergraph = (*MockHypergraph)(nil)

View File

@ -569,6 +569,10 @@ type TreeBackingStore interface {
shardKey ShardKey,
leaf *RawLeafData,
) error
DeleteVertexTree(
txn TreeBackingStoreTransaction,
id []byte,
) error
}
// LazyVectorCommitmentTree is a lazy-loaded (from a TreeBackingStore based