ceremonyclient/hypergraph/proofs.go
Cassandra Heart 12996487c3
v2.1.0.18 (#508)
* experiment: reject bad peer info messages

* v2.1.0.18 preview

* add tagged sync

* Add missing hypergraph changes

* small tweaks to sync

* allow local sync, use it for provers with workers

* missing file

* resolve build error

* resolve sync issue, remove raw sync

* resolve deletion promotion bug

* resolve sync abstraction leak from tree deletion changes

* rearrange prover sync

* remove pruning from sync

* restore removed sync flag

* fix: sync, event stream deadlock, heuristic scoring of better shards

* resolve hanging shutdown + pubsub proxy issue

* further bugfixes: sync (restore old leaf sync), pubsub shutdown, merge events

* fix: clean up rust ffi, background coverage events, and sync tweaks

* fix: linking issue for channel, connectivity test aggression, sync regression, join tests

* fix: disjoint sync, improper application of filter

* resolve sync/reel/validation deadlock

* adjust sync to handle no leaf edge cases, multi-path segment traversal

* use simpler sync

* faster, simpler sync with some debug extras

* migration to recalculate

* don't use batch

* square up the roots

* fix nil pointer

* fix: seniority calculation, sync race condition, migration

* make sync dumber

* fix: tree deletion issue

* fix: missing seniority merge request canonical serialization

* address issues from previous commit test

* stale workers should be cleared

* remove missing gap check

* rearrange collect, reduce sync logging noise

* fix: the disjoint leaf/branch sync case

* nuclear option on sync failures

* v2.1.0.18, finalized
2026-02-08 23:51:51 -06:00

659 lines
14 KiB
Go

package hypergraph
import (
"bytes"
"crypto/sha256"
"fmt"
"sort"
"strings"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
"source.quilibrium.com/quilibrium/monorepo/utils/p2p"
// up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p"
)
// Commit calculates the hierarchical vector commitments of each set and returns
// the roots of all sets.
func (hg *HypergraphCRDT) Commit(
frameNumber uint64,
) (map[tries.ShardKey][][]byte, error) {
hg.mu.Lock()
defer hg.mu.Unlock()
timer := prometheus.NewTimer(CommitDuration)
defer timer.ObserveDuration()
commits, err := hg.store.GetRootCommits(frameNumber)
if err != nil {
return nil, errors.Wrap(err, "commit")
}
ensureSet := func(shardKey tries.ShardKey) {
if _, ok := commits[shardKey]; !ok {
commits[shardKey] = make([][]byte, 4)
commits[shardKey][0] = make([]byte, 64)
commits[shardKey][1] = make([]byte, 64)
commits[shardKey][2] = make([]byte, 64)
commits[shardKey][3] = make([]byte, 64)
}
}
txn, err := hg.store.NewTransaction(false)
if err != nil {
return nil, errors.Wrap(err, "commit shard")
}
touched := map[tries.ShardKey][]bool{}
for shardKey, vertexAdds := range hg.vertexAdds {
if r, ok := commits[shardKey]; ok && len(r[0]) != 64 {
continue
}
root := vertexAdds.GetTree().Commit(txn, false)
ensureSet(shardKey)
commits[shardKey][0] = root
err = hg.store.SetShardCommit(
txn,
frameNumber,
"adds",
"vertex",
shardKey.L2[:],
root,
)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "commit shard")
}
touched[shardKey] = make([]bool, 4)
touched[shardKey][0] = true
}
for shardKey, vertexRemoves := range hg.vertexRemoves {
if r, ok := commits[shardKey]; ok && len(r[1]) != 64 {
continue
}
root := vertexRemoves.GetTree().Commit(txn, false)
ensureSet(shardKey)
commits[shardKey][1] = root
err = hg.store.SetShardCommit(
txn,
frameNumber,
"removes",
"vertex",
shardKey.L2[:],
root,
)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "commit shard")
}
if _, ok := touched[shardKey]; !ok {
touched[shardKey] = make([]bool, 4)
}
touched[shardKey][1] = true
}
for shardKey, hyperedgeAdds := range hg.hyperedgeAdds {
if r, ok := commits[shardKey]; ok && len(r[2]) != 64 {
continue
}
root := hyperedgeAdds.GetTree().Commit(txn, false)
ensureSet(shardKey)
commits[shardKey][2] = root
err = hg.store.SetShardCommit(
txn,
frameNumber,
"adds",
"hyperedge",
shardKey.L2[:],
root,
)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "commit shard")
}
if _, ok := touched[shardKey]; !ok {
touched[shardKey] = make([]bool, 4)
}
touched[shardKey][2] = true
}
for shardKey, hyperedgeRemoves := range hg.hyperedgeRemoves {
if r, ok := commits[shardKey]; ok && len(r[3]) != 64 {
continue
}
root := hyperedgeRemoves.GetTree().Commit(txn, false)
ensureSet(shardKey)
commits[shardKey][3] = root
err = hg.store.SetShardCommit(
txn,
frameNumber,
"removes",
"hyperedge",
shardKey.L2[:],
root,
)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "commit shard")
}
if _, ok := touched[shardKey]; !ok {
touched[shardKey] = make([]bool, 4)
}
touched[shardKey][3] = true
}
for shardKey, touchSet := range touched {
if !touchSet[0] {
err = hg.store.SetShardCommit(
txn,
frameNumber,
"adds",
"vertex",
shardKey.L2[:],
make([]byte, 64),
)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "commit shard")
}
}
if !touchSet[1] {
err = hg.store.SetShardCommit(
txn,
frameNumber,
"removes",
"vertex",
shardKey.L2[:],
make([]byte, 64),
)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "commit shard")
}
}
if !touchSet[2] {
err = hg.store.SetShardCommit(
txn,
frameNumber,
"adds",
"hyperedge",
shardKey.L2[:],
make([]byte, 64),
)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "commit shard")
}
}
if !touchSet[3] {
err = hg.store.SetShardCommit(
txn,
frameNumber,
"removes",
"hyperedge",
shardKey.L2[:],
make([]byte, 64),
)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "commit shard")
}
}
}
if err := txn.Commit(); err != nil {
txn.Abort()
return nil, errors.Wrap(err, "commit shard")
}
snapshotRoot := snapshotRootDigest(commits)
// Update metrics
CommitTotal.WithLabelValues("success").Inc()
hg.publishSnapshot(snapshotRoot)
// Update shard count gauges
VertexAddsShards.Set(float64(len(hg.vertexAdds)))
VertexRemovesShards.Set(float64(len(hg.vertexRemoves)))
HyperedgeAddsShards.Set(float64(len(hg.hyperedgeAdds)))
HyperedgeRemovesShards.Set(float64(len(hg.hyperedgeRemoves)))
// Update size gauge
if hg.size != nil {
size, _ := hg.size.Float64()
SizeTotal.Set(size)
}
return commits, nil
}
func snapshotRootDigest(commits map[tries.ShardKey][][]byte) []byte {
hasher := sha256.New()
var zero [64]byte
if len(commits) == 0 {
return hasher.Sum(nil)
}
keys := make([]tries.ShardKey, 0, len(commits))
for k := range commits {
keys = append(keys, k)
}
sort.Slice(keys, func(i, j int) bool {
if cmp := bytes.Compare(keys[i].L1[:], keys[j].L1[:]); cmp != 0 {
return cmp < 0
}
return bytes.Compare(keys[i].L2[:], keys[j].L2[:]) < 0
})
for _, key := range keys {
hasher.Write(key.L1[:])
hasher.Write(key.L2[:])
roots := commits[key]
for phase := 0; phase < 4; phase++ {
var root []byte
if phase < len(roots) {
root = roots[phase]
}
if len(root) != len(zero) {
hasher.Write(zero[:])
} else {
hasher.Write(root)
}
}
}
return hasher.Sum(nil)
}
// Commit calculates the sub-scoped vector commitments of each phase set and
// returns the roots of each.
func (hg *HypergraphCRDT) CommitShard(
frameNumber uint64,
shardAddress []byte,
) ([][]byte, error) {
hg.mu.Lock()
defer hg.mu.Unlock()
if len(shardAddress) < 32 {
return nil, errors.Wrap(errors.New("invalid shard address"), "commit shard")
}
l1 := p2p.GetBloomFilterIndices(shardAddress[:32], 256, 3)
shardKey := tries.ShardKey{
L1: [3]byte(l1),
L2: [32]byte(shardAddress[:32]),
}
vertexAddSet, vertexRemoveSet := hg.getOrCreateIdSet(
shardKey,
hg.vertexAdds,
hg.vertexRemoves,
"vertex",
hg.getCoveredPrefix(),
)
vertexAddTree := vertexAddSet.GetTree()
vertexAddTree.Commit(nil, false)
vertexRemoveTree := vertexRemoveSet.GetTree()
vertexRemoveTree.Commit(nil, false)
path := tries.GetFullPath(shardAddress[:32])
for _, p := range shardAddress[32:] {
path = append(path, int(p))
}
vertexAddNode, err := vertexAddTree.GetByPath(path)
if err != nil && !strings.Contains(err.Error(), "not found") {
return nil, errors.Wrap(err, "commit shard")
}
vertexRemoveNode, err := vertexRemoveTree.GetByPath(path)
if err != nil && !strings.Contains(err.Error(), "not found") {
return nil, errors.Wrap(err, "commit shard")
}
hyperedgeAddSet, hyperedgeRemoveSet := hg.getOrCreateIdSet(
shardKey,
hg.hyperedgeAdds,
hg.hyperedgeRemoves,
"hyperedge",
hg.getCoveredPrefix(),
)
hyperedgeAddTree := hyperedgeAddSet.GetTree()
hyperedgeAddTree.Commit(nil, false)
hyperedgeRemoveTree := hyperedgeRemoveSet.GetTree()
hyperedgeRemoveTree.Commit(nil, false)
hyperedgeAddNode, err := vertexAddTree.GetByPath(path)
if err != nil && !strings.Contains(err.Error(), "not found") {
return nil, errors.Wrap(err, "commit shard")
}
hyperedgeRemoveNode, err := vertexRemoveTree.GetByPath(path)
if err != nil && !strings.Contains(err.Error(), "not found") {
return nil, errors.Wrap(err, "commit shard")
}
txn, err := hg.store.NewTransaction(false)
if err != nil {
return nil, errors.Wrap(err, "commit shard")
}
vertexAddCommit := make([]byte, 64)
if vertexAddNode != nil {
switch n := vertexAddNode.(type) {
case *tries.LazyVectorCommitmentBranchNode:
vertexAddCommit = n.Commitment
case *tries.LazyVectorCommitmentLeafNode:
vertexAddCommit = n.Commitment
}
}
vertexRemoveCommit := make([]byte, 64)
if vertexRemoveNode != nil {
switch n := vertexRemoveNode.(type) {
case *tries.LazyVectorCommitmentBranchNode:
vertexRemoveCommit = n.Commitment
case *tries.LazyVectorCommitmentLeafNode:
vertexRemoveCommit = n.Commitment
}
}
hyperedgeAddCommit := make([]byte, 64)
if hyperedgeAddNode != nil {
switch n := hyperedgeAddNode.(type) {
case *tries.LazyVectorCommitmentBranchNode:
hyperedgeAddCommit = n.Commitment
case *tries.LazyVectorCommitmentLeafNode:
hyperedgeAddCommit = n.Commitment
}
}
hyperedgeRemoveCommit := make([]byte, 64)
if hyperedgeRemoveNode != nil {
switch n := hyperedgeRemoveNode.(type) {
case *tries.LazyVectorCommitmentBranchNode:
hyperedgeRemoveCommit = n.Commitment
case *tries.LazyVectorCommitmentLeafNode:
hyperedgeRemoveCommit = n.Commitment
}
}
err = hg.store.SetShardCommit(
txn,
frameNumber,
"adds",
"vertex",
shardAddress,
vertexAddCommit,
)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "commit shard")
}
err = hg.store.SetShardCommit(
txn,
frameNumber,
"removes",
"vertex",
shardAddress,
vertexRemoveCommit,
)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "commit shard")
}
err = hg.store.SetShardCommit(
txn,
frameNumber,
"adds",
"hyperedge",
shardAddress,
hyperedgeAddCommit,
)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "commit shard")
}
err = hg.store.SetShardCommit(
txn,
frameNumber,
"removes",
"hyperedge",
shardAddress,
hyperedgeRemoveCommit,
)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "commit shard")
}
if err := txn.Commit(); err != nil {
txn.Abort()
return nil, errors.Wrap(err, "commit shard")
}
return [][]byte{
vertexAddCommit,
vertexRemoveCommit,
hyperedgeAddCommit,
hyperedgeRemoveCommit,
}, nil
}
// GetShardCommits retries the sub-scoped vector commitments of each phase set
// and returns the commitments of each at the tree level of the shard address.
// If this does not already exist, returns an error.
func (hg *HypergraphCRDT) GetShardCommits(
frameNumber uint64,
shardAddress []byte,
) ([][]byte, error) {
hg.mu.RLock()
defer hg.mu.RUnlock()
vertexAddsCommit, err := hg.store.GetShardCommit(
frameNumber,
"adds",
"vertex",
shardAddress,
)
if err != nil {
return nil, errors.Wrap(
errors.Wrap(
err,
fmt.Sprintf("shard address: (va) %x", shardAddress),
),
"get shard commits",
)
}
vertexRemovesCommit, err := hg.store.GetShardCommit(
frameNumber,
"removes",
"vertex",
shardAddress,
)
if err != nil {
return nil, errors.Wrap(
errors.Wrap(
err,
fmt.Sprintf("shard address: (vr) %x", shardAddress),
),
"get shard commits",
)
}
hyperedgeAddsCommit, err := hg.store.GetShardCommit(
frameNumber,
"adds",
"hyperedge",
shardAddress,
)
if err != nil {
return nil, errors.Wrap(
errors.Wrap(
err,
fmt.Sprintf("shard address: (ha) %x", shardAddress),
),
"get shard commits",
)
}
hyperedgeRemovesCommit, err := hg.store.GetShardCommit(
frameNumber,
"removes",
"hyperedge",
shardAddress,
)
if err != nil {
return nil, errors.Wrap(
errors.Wrap(
err,
fmt.Sprintf("shard address: (he) %x", shardAddress),
),
"get shard commits",
)
}
return [][]byte{
vertexAddsCommit,
vertexRemovesCommit,
hyperedgeAddsCommit,
hyperedgeRemovesCommit,
}, nil
}
// CreateTraversalProofs generates proofs for multiple keys in a shard. The
// domain determines the shard, and proofs are created for the specified atom
// type and phase type (adds or removes).
func (hg *HypergraphCRDT) CreateTraversalProof(
domain [32]byte,
atomType hypergraph.AtomType,
phaseType hypergraph.PhaseType,
keys [][]byte,
) (*tries.TraversalProof, error) {
hg.mu.Lock()
defer hg.mu.Unlock()
timer := prometheus.NewTimer(TraversalProofDuration.WithLabelValues("create"))
defer timer.ObserveDuration()
TraversalProofKeysPerRequest.Observe(float64(len(keys)))
shardKey := tries.ShardKey{
L1: [3]byte(p2p.GetBloomFilterIndices(domain[:], 256, 3)),
L2: domain,
}
var addSet hypergraph.IdSet
var removeSet hypergraph.IdSet
if atomType == hypergraph.VertexAtomType {
addSet, removeSet = hg.getOrCreateIdSet(
shardKey,
hg.vertexAdds,
hg.vertexRemoves,
atomType,
hg.getCoveredPrefix(),
)
} else {
addSet, removeSet = hg.getOrCreateIdSet(
shardKey,
hg.hyperedgeAdds,
hg.hyperedgeRemoves,
atomType,
hg.getCoveredPrefix(),
)
}
var proof *tries.TraversalProof
if phaseType == hypergraph.AddsPhaseType {
proof = addSet.GetTree().ProveMultiple(
hg.prover,
keys,
)
} else {
proof = removeSet.GetTree().ProveMultiple(
hg.prover,
keys,
)
}
TraversalProofCreateTotal.WithLabelValues(
string(atomType),
string(phaseType),
).Inc()
return proof, nil
}
// VerifyTraversalProofs verifies a set of traversal proofs for a shard. Returns
// true if all proofs are valid, false otherwise.
func (hg *HypergraphCRDT) VerifyTraversalProof(
domain [32]byte,
atomType hypergraph.AtomType,
phaseType hypergraph.PhaseType,
root []byte,
traversalProof *tries.TraversalProof,
) (bool, error) {
hg.mu.Lock()
defer hg.mu.Unlock()
timer := prometheus.NewTimer(TraversalProofDuration.WithLabelValues("verify"))
defer timer.ObserveDuration()
shardKey := tries.ShardKey{
L1: [3]byte(p2p.GetBloomFilterIndices(domain[:], 256, 3)),
L2: domain,
}
var addSet hypergraph.IdSet
var removeSet hypergraph.IdSet
if atomType == hypergraph.VertexAtomType {
addSet, removeSet = hg.getOrCreateIdSet(
shardKey,
hg.vertexAdds,
hg.vertexRemoves,
atomType,
hg.getCoveredPrefix(),
)
} else {
addSet, removeSet = hg.getOrCreateIdSet(
shardKey,
hg.hyperedgeAdds,
hg.hyperedgeRemoves,
atomType,
hg.getCoveredPrefix(),
)
}
var valid bool
var err error
if phaseType == hypergraph.AddsPhaseType {
valid, err = addSet.GetTree().Verify(root, traversalProof)
} else {
valid, err = removeSet.GetTree().Verify(root, traversalProof)
}
TraversalProofVerifyTotal.WithLabelValues(
string(atomType),
string(phaseType),
boolToString(valid),
).Inc()
return valid, err
}