ceremonyclient/hypergraph/vertex.go
Cassandra Heart 53f7c2b5c9
v2.1.0.2 (#442)
* v2.1.0.2

* restore tweaks to simlibp2p

* fix: nil ref on size calc

* fix: panic should induce shutdown from event_distributor

* fix: friendlier initialization that requires less manual kickstarting for test/devnets

* fix: fewer available shards than provers should choose shard length

* fix: update stored worker registry, improve logging for debug mode

* fix: shut the fuck up, peer log

* qol: log value should be snake cased

* fix: non-archive snap sync issues

* fix: separate X448/Decaf448 signed keys, add onion key to registry

* fix: overflow arithmetic on frame number comparison

* fix: worker registration should be idempotent if inputs are same, otherwise permit updated records

* fix: remove global prover state from size calculation

* fix: divide by zero case

* fix: eager prover

* fix: broadcast listener default

* qol: diagnostic data for peer authenticator

* fix: master/worker connectivity issue in sparse networks

tight coupling of peer and workers can sometimes interfere if mesh is sparse, so give workers a pseudoidentity but publish messages with the proper peer key

* fix: reorder steps of join creation

* fix: join verify frame source + ensure domain is properly padded (unnecessary but good for consistency)

* fix: add delegate to protobuf <-> reified join conversion

* fix: preempt prover from planning with no workers

* fix: use the unallocated workers to generate a proof

* qol: underflow causes join fail in first ten frames on test/devnets

* qol: small logging tweaks for easier log correlation in debug mode

* qol: use fisher-yates shuffle to ensure prover allocations are evenly distributed when scores are equal

* qol: separate decisional logic on post-enrollment confirmation into consensus engine, proposer, and worker manager where relevant, refactor out scoring

* reuse shard descriptors for both join planning and confirm/reject decisions

* fix: add missing interface method and amend test blossomsub to use new peer id basis

* fix: only check allocations if they exist

* fix: pomw mint proof data needs to be hierarchically under global intrinsic domain

* staging temporary state under diagnostics

* fix: first phase of distributed lock refactoring

* fix: compute intrinsic locking

* fix: hypergraph intrinsic locking

* fix: token intrinsic locking

* fix: update execution engines to support new locking model

* fix: adjust tests with new execution shape

* fix: weave in lock/unlock semantics to liveness provider

* fix lock fallthrough, add missing allocation update

* qol: additional logging for diagnostics, also testnet/devnet handling for confirmations

* fix: establish grace period on halt scenario to permit recovery

* fix: support test/devnet defaults for coverage scenarios

* fix: nil ref on consensus halts for non-archive nodes

* fix: remove unnecessary prefix from prover ref

* add test coverage for fork choice behaviors and replay – once passing, blocker (2) is resolved

* fix: no fork replay on repeat for non-archive nodes, snap now behaves correctly

* rollup of pre-liveness check lock interactions

* ahead of tests, get the protobuf/metrics-related changes out so teams can prepare

* add test coverage for distributed lock behaviors – once passing, blocker (3) is resolved

* fix: blocker (3)

* Dev docs improvements (#445)

* Make install deps script more robust

* Improve testing instructions

* Worker node should stop upon OS SIGINT/SIGTERM signal (#447)

* move pebble close to Stop()

* move deferred Stop() to Start()

* add core id to worker stop log message

* create done os signal channel and stop worker upon message to it

---------

Co-authored-by: Cassandra Heart <7929478+CassOnMars@users.noreply.github.com>

---------

Co-authored-by: Daz <daz_the_corgi@proton.me>
Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
2025-10-23 01:03:06 -05:00

312 lines
7.9 KiB
Go

package hypergraph
import (
"math/big"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
"source.quilibrium.com/quilibrium/monorepo/utils/p2p"
)
// vertex is the concrete hypergraph.Vertex implementation. It is identified
// by an application address and a data address, and carries a stored
// commitment and a size.
type vertex struct {
	appAddress, dataAddress [32]byte
	commitment              []byte
	size                    *big.Int
}

// Compile-time check that *vertex satisfies hypergraph.Vertex.
var _ hypergraph.Vertex = (*vertex)(nil)
// NewVertex builds a hypergraph.Vertex from its application address, data
// address, commitment and size.
//
// The commitment slice and the size pointer are stored as given, not copied.
// Callers should therefore not mutate them after handing them over.
func NewVertex(
	appAddress [32]byte,
	dataAddress [32]byte,
	commitment []byte,
	size *big.Int,
) hypergraph.Vertex {
	v := &vertex{
		appAddress:  appAddress,
		dataAddress: dataAddress,
		commitment:  commitment,
		size:        size,
	}
	return v
}
// GetID returns the vertex's 64-byte identifier. The application address
// fills the first 32 bytes and the data address fills the last 32 bytes.
func (v *vertex) GetID() [64]byte {
	var id [64]byte
	n := copy(id[:], v.appAddress[:])
	copy(id[n:], v.dataAddress[:])
	return id
}
// GetSize returns the vertex's size. The pointer is the vertex's own *big.Int,
// not a copy, so callers must not modify it in place.
func (v *vertex) GetSize() *big.Int { return v.size }
// GetAtomType always reports hypergraph.VertexAtomType for a vertex.
func (*vertex) GetAtomType() hypergraph.AtomType { return hypergraph.VertexAtomType }
// GetAppAddress returns a copy of the vertex's 32-byte application address.
func (v *vertex) GetAppAddress() [32]byte { return v.appAddress }
// GetDataAddress returns a copy of the vertex's 32-byte data address.
func (v *vertex) GetDataAddress() [32]byte { return v.dataAddress }
// ToBytes serializes the vertex as:
//
//	0x00 || appAddress (32 bytes) || dataAddress (32 bytes) || commitment ||
//	size (32 bytes, big-endian, zero-extended)
//
// The buffer is allocated once at its final length, and the size is written
// directly into its tail. This replaces a chain of nested appends that
// regrew the slice several times.
//
// v.size must be non-nil and its absolute value must fit in 32 bytes;
// big.Int.FillBytes panics otherwise.
func (v *vertex) ToBytes() []byte {
	const sizeLen = 32

	out := make(
		[]byte,
		0,
		1+len(v.appAddress)+len(v.dataAddress)+len(v.commitment)+sizeLen,
	)
	// Leading 0x00 byte. NOTE(review): presumably the atom-kind tag that
	// AtomFromBytes dispatches on — confirm against its implementation.
	out = append(out, 0x00)
	out = append(out, v.appAddress[:]...)
	out = append(out, v.dataAddress[:]...)
	out = append(out, v.commitment...)

	// Extend into the spare capacity and let FillBytes write the size in place.
	n := len(out)
	out = out[:n+sizeLen]
	v.size.FillBytes(out[n:])
	return out
}
// Commit returns the commitment stored on the vertex. The prover argument is
// not consulted; nothing is recomputed here.
func (v *vertex) Commit(_ crypto.InclusionProver) []byte {
	return v.commitment
}
// GetVertex retrieves a vertex by its 64-byte ID (application address
// followed by data address). The shard is derived from the application
// address, i.e. the first 32 bytes of id.
//
// Errors:
//   - wraps hypergraph.ErrRemoved if the ID is in the shard's remove set;
//   - wraps the underlying tree error if the add-set lookup fails;
//   - wraps hypergraph.ErrInvalidAtomType if the stored bytes do not decode
//     to a vertex.
func (hg *HypergraphCRDT) GetVertex(id [64]byte) (hypergraph.Vertex, error) {
	hg.mu.RLock()
	defer hg.mu.RUnlock()

	timer := prometheus.NewTimer(GetDuration.WithLabelValues("vertex"))
	defer timer.ObserveDuration()

	appAddress := id[:32]
	shardKey := tries.ShardKey{
		L1: [3]byte(p2p.GetBloomFilterIndices(appAddress, 256, 3)),
		// A slice-to-array conversion already copies the bytes, so no
		// intermediate append-copy is needed.
		L2: [32]byte(appAddress),
	}

	// NOTE(review): getOrCreateIdSet is called while holding only the read
	// lock. If it inserts into shared maps when a set is missing, concurrent
	// readers could race — confirm against its implementation.
	addSet, removeSet := hg.getOrCreateIdSet(
		shardKey,
		hg.vertexAdds,
		hg.vertexRemoves,
		hypergraph.VertexAtomType,
		hg.getCoveredPrefix(),
	)

	// Removal wins: a removed ID is reported as such even if it is still in
	// the add set.
	if removeSet.Has(id) {
		GetVertexTotal.WithLabelValues("removed").Inc()
		ErrorsTotal.WithLabelValues("get_vertex", "removed").Inc()
		return nil, errors.Wrap(hypergraph.ErrRemoved, "get vertex")
	}

	value, err := addSet.GetTree().Get(id[:])
	if err != nil {
		GetVertexTotal.WithLabelValues("error").Inc()
		ErrorsTotal.WithLabelValues("get_vertex", "not_found").Inc()
		return nil, errors.Wrap(err, "get vertex")
	}

	atom := AtomFromBytes(value)
	if atom == nil {
		GetVertexTotal.WithLabelValues("error").Inc()
		ErrorsTotal.WithLabelValues("get_vertex", "invalid_atom").Inc()
		return nil, errors.Wrap(hypergraph.ErrInvalidAtomType, "get vertex")
	}

	// Named vtx (not vertex) so the local does not shadow the vertex type.
	vtx, ok := atom.(*vertex)
	if !ok {
		GetVertexTotal.WithLabelValues("error").Inc()
		ErrorsTotal.WithLabelValues("get_vertex", "invalid_atom").Inc()
		return nil, errors.Wrap(hypergraph.ErrInvalidAtomType, "get vertex")
	}

	GetVertexTotal.WithLabelValues("success").Inc()
	return vtx, nil
}
// AddVertex inserts v into the add set of the shard chosen by
// hypergraph.GetShardKey(v) and grows the hypergraph's tracked size by
// v.GetSize(). It holds the write lock for the whole call and delegates the
// work to addVertex.
func (hg *HypergraphCRDT) AddVertex(
	txn tries.TreeBackingStoreTransaction,
	v hypergraph.Vertex,
) error {
	hg.mu.Lock()
	defer hg.mu.Unlock()

	return hg.addVertex(txn, v)
}
// addVertex is the lock-free core of AddVertex; it expects the caller to hold
// hg.mu for writing (AddVertex does). It records latency in
// AddVertexDuration and an outcome counter in AddVertexTotal. On success it
// adds v.GetSize() to hg.size.
//
// NOTE(review): the size is added unconditionally. Re-adding a vertex that is
// already present will grow hg.size again unless addSet.Add rejects the
// duplicate — confirm this is intended.
func (hg *HypergraphCRDT) addVertex(
	txn tries.TreeBackingStoreTransaction,
	v hypergraph.Vertex,
) error {
	defer prometheus.NewTimer(AddVertexDuration).ObserveDuration()

	addSet, _ := hg.getOrCreateIdSet(
		hypergraph.GetShardKey(v),
		hg.vertexAdds,
		hg.vertexRemoves,
		hypergraph.VertexAtomType,
		hg.getCoveredPrefix(),
	)

	if err := addSet.Add(txn, v); err != nil {
		AddVertexTotal.WithLabelValues("error").Inc()
		ErrorsTotal.WithLabelValues("add_vertex", "add_error").Inc()
		return errors.Wrap(err, "add vertex")
	}

	hg.size.Add(hg.size, v.GetSize())
	AddVertexTotal.WithLabelValues("success").Inc()
	return nil
}
// RevertAddVertex rolls back a prior AddVertex, for example when a transaction
// fails. If v's ID is in its shard's add set, it is deleted from that set and
// v.GetSize() is subtracted from hg.size. A vertex that is not in the add set
// is treated as already reverted and counted as a success.
//
// NOTE(review): the remove set is not consulted. If v was also removed
// (RemoveVertex already subtracted its size), the size is subtracted a second
// time here — confirm whether that can happen.
func (hg *HypergraphCRDT) RevertAddVertex(
	txn tries.TreeBackingStoreTransaction,
	v hypergraph.Vertex,
) error {
	hg.mu.Lock()
	defer hg.mu.Unlock()

	addSet, _ := hg.getOrCreateIdSet(
		hypergraph.GetShardKey(v),
		hg.vertexAdds,
		hg.vertexRemoves,
		hypergraph.VertexAtomType,
		hg.getCoveredPrefix(),
	)

	if addSet.Has(v.GetID()) {
		if err := addSet.Delete(txn, v); err != nil {
			RevertAddVertexTotal.WithLabelValues("error").Inc()
			ErrorsTotal.WithLabelValues("revert_add_vertex", "delete_error").Inc()
			return errors.Wrap(err, "revert add vertex")
		}
		hg.size.Sub(hg.size, v.GetSize())
	}

	RevertAddVertexTotal.WithLabelValues("success").Inc()
	return nil
}
// RemoveVertex removes a vertex from the hypergraph. In CRDT semantics this
// adds the vertex to its shard's remove set.
//
// If the vertex is currently live (in the add set and not in the remove set),
// v.GetSize() is subtracted from hg.size. If it is not live, it is added to
// both the add and remove sets for future conflict resolution, and the size
// is left unchanged.
//
// All errors are wrapped with "remove vertex".
func (hg *HypergraphCRDT) RemoveVertex(
	txn tries.TreeBackingStoreTransaction,
	v hypergraph.Vertex,
) error {
	hg.mu.Lock()
	defer hg.mu.Unlock()

	timer := prometheus.NewTimer(RemoveVertexDuration)
	defer timer.ObserveDuration()

	shardKey := hypergraph.GetShardKey(v)
	// lookupVertex also records lookup metrics; that side effect is retained.
	live := hg.lookupVertex(v)

	addSet, removeSet := hg.getOrCreateIdSet(
		shardKey,
		hg.vertexAdds,
		hg.vertexRemoves,
		hypergraph.VertexAtomType,
		hg.getCoveredPrefix(),
	)

	if !live {
		// Record the vertex in both sets so a later add of the same vertex
		// still resolves as removed. It was not counted in hg.size, so the
		// size is left untouched.
		if err := addSet.Add(txn, v); err != nil {
			RemoveVertexTotal.WithLabelValues("error").Inc()
			ErrorsTotal.WithLabelValues("remove_vertex", "add_error").Inc()
			return errors.Wrap(err, "remove vertex")
		}
		if err := removeSet.Add(txn, v); err != nil {
			RemoveVertexTotal.WithLabelValues("error").Inc()
			ErrorsTotal.WithLabelValues("remove_vertex", "remove_error").Inc()
			return errors.Wrap(err, "remove vertex")
		}
		RemoveVertexTotal.WithLabelValues("success").Inc()
		return nil
	}

	if err := removeSet.Add(txn, v); err != nil {
		RemoveVertexTotal.WithLabelValues("error").Inc()
		ErrorsTotal.WithLabelValues("remove_vertex", "remove_error").Inc()
		// Wrapped like every other error path in this function.
		return errors.Wrap(err, "remove vertex")
	}

	hg.size.Sub(hg.size, v.GetSize())
	RemoveVertexTotal.WithLabelValues("success").Inc()
	return nil
}
// RevertRemoveVertex rolls back a prior RemoveVertex. It deletes v from its
// shard's remove set, effectively un-deleting it, and adds v.GetSize() back
// to hg.size.
//
// If v is not in the remove set, nothing is changed and nil is returned,
// mirroring RevertAddVertex. Without this guard, a repeated or spurious revert
// would add the size again and inflate hg.size.
//
// Errors are wrapped with "revert remove vertex".
//
// NOTE(review): when RemoveVertex took its not-live path, it put v in the
// remove set without subtracting its size. Reverting that still adds the size
// back, and v becomes live via the add set — confirm this accounting is
// intended.
func (hg *HypergraphCRDT) RevertRemoveVertex(
	txn tries.TreeBackingStoreTransaction,
	v hypergraph.Vertex,
) error {
	hg.mu.Lock()
	defer hg.mu.Unlock()

	shardKey := hypergraph.GetShardKey(v)
	_, removeSet := hg.getOrCreateIdSet(
		shardKey,
		hg.vertexAdds,
		hg.vertexRemoves,
		hypergraph.VertexAtomType,
		hg.getCoveredPrefix(),
	)

	if !removeSet.Has(v.GetID()) {
		RevertRemoveVertexTotal.WithLabelValues("success").Inc()
		return nil
	}

	if err := removeSet.Delete(txn, v); err != nil {
		RevertRemoveVertexTotal.WithLabelValues("error").Inc()
		ErrorsTotal.WithLabelValues("revert_remove_vertex", "delete_error").Inc()
		return errors.Wrap(err, "revert remove vertex")
	}

	hg.size.Add(hg.size, v.GetSize())
	RevertRemoveVertexTotal.WithLabelValues("success").Inc()
	return nil
}
// LookupVertex reports whether v is live in the hypergraph. Its ID must be in
// the add set of v's shard and absent from that shard's remove set. The call
// takes the read lock and delegates to lookupVertex.
func (hg *HypergraphCRDT) LookupVertex(v hypergraph.Vertex) bool {
	hg.mu.RLock()
	defer hg.mu.RUnlock()

	return hg.lookupVertex(v)
}
// lookupVertex is the lock-free core of LookupVertex; the caller must already
// hold hg.mu. It records latency in LookupDuration and increments
// LookupVertexTotal, labeled with boolToString(found).
func (hg *HypergraphCRDT) lookupVertex(v hypergraph.Vertex) bool {
	defer prometheus.NewTimer(LookupDuration.WithLabelValues("vertex")).ObserveDuration()

	addSet, removeSet := hg.getOrCreateIdSet(
		hypergraph.GetShardKey(v),
		hg.vertexAdds,
		hg.vertexRemoves,
		hypergraph.VertexAtomType,
		hg.getCoveredPrefix(),
	)

	id := v.GetID()
	// The remove set is only consulted when the ID is in the add set.
	found := false
	if addSet.Has(id) {
		found = !removeSet.Has(id)
	}

	LookupVertexTotal.WithLabelValues(boolToString(found)).Inc()
	return found
}