ceremonyclient/hypergraph/hypergraph.go
2026-02-08 23:49:28 -06:00

895 lines
23 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package hypergraph
import (
"context"
"math/big"
"slices"
"sync"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/channel"
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p"
)
// HypergraphCRDT implements a CRDT-based 2P2P-Hypergraph. It maintains separate
// sets for additions and removals of vertices and hyperedges, allowing for
// conflict-free merging in distributed systems.
//
// The underlying trees have their own locks, but are atomic the lock is taken
// on single mutations and reads. While the IdSet exposes the trees for methods
// where direct access is critical, like hypersync, do _not_ expect you will
// be safe using the tree directly.
type HypergraphCRDT struct {
protobufs.UnsafeHypergraphComparisonServiceServer
logger *zap.Logger
// size tracks the total size of the hypergraph (adds - removes)
size *big.Int
// vertexAdds maps shard keys to sets of added vertices
vertexAdds map[tries.ShardKey]hypergraph.IdSet
// vertexRemoves maps shard keys to sets of removed vertices
vertexRemoves map[tries.ShardKey]hypergraph.IdSet
// hyperedgeAdds maps shard keys to sets of added hyperedges
hyperedgeAdds map[tries.ShardKey]hypergraph.IdSet
// hyperedgeRemoves maps shard keys to sets of removed hyperedges
hyperedgeRemoves map[tries.ShardKey]hypergraph.IdSet
// store provides persistence for the hypergraph data
store tries.TreeBackingStore
// prover generates cryptographic inclusion proofs
prover crypto.InclusionProver
// the limited prefix of paths we will accept data insertions for
coveredPrefix []int
// handles locking scenarios for transactions
syncController *hypergraph.SyncController
mu sync.RWMutex
setsMu sync.RWMutex
prefixMu sync.RWMutex
snapshotMgr *snapshotManager
// provides context-driven info for client identification
authenticationProvider channel.AuthenticationProvider
shutdownCtx context.Context
}
var _ hypergraph.Hypergraph = (*HypergraphCRDT)(nil)
var _ protobufs.HypergraphComparisonServiceServer = (*HypergraphCRDT)(nil)
// NewHypergraph creates a new CRDT-based hypergraph. The store provides
// persistence and the prover operates over the underlying vector commitment
// trees backing the sets.
func NewHypergraph(
logger *zap.Logger,
store tries.TreeBackingStore,
prover crypto.InclusionProver,
coveredPrefix []int,
authenticationProvider channel.AuthenticationProvider,
maxSyncSessions int,
) *HypergraphCRDT {
hg := &HypergraphCRDT{
logger: logger,
size: big.NewInt(0),
vertexAdds: make(map[tries.ShardKey]hypergraph.IdSet),
vertexRemoves: make(map[tries.ShardKey]hypergraph.IdSet),
hyperedgeAdds: make(map[tries.ShardKey]hypergraph.IdSet),
hyperedgeRemoves: make(map[tries.ShardKey]hypergraph.IdSet),
store: store,
prover: prover,
coveredPrefix: slices.Clone(coveredPrefix),
authenticationProvider: authenticationProvider,
syncController: hypergraph.NewSyncController(maxSyncSessions),
snapshotMgr: newSnapshotManager(logger, store),
}
hg.publishSnapshot(nil)
return hg
}
func (hg *HypergraphCRDT) publishSnapshot(root []byte) {
if hg.store == nil || hg.snapshotMgr == nil {
return
}
hg.logger.Debug("publishing snapshot")
hg.snapshotMgr.publish(root)
}
// PublishSnapshot announces a new snapshot generation with the given commit root.
// This should be called after Commit() to make the new state available for sync.
// Clients can request sync against this root using the expectedRoot parameter.
// The snapshot manager retains a limited number of historical generations.
func (hg *HypergraphCRDT) PublishSnapshot(root []byte) {
hg.publishSnapshot(root)
}
func (hg *HypergraphCRDT) cloneSetWithStore(
set hypergraph.IdSet,
store tries.TreeBackingStore,
) hypergraph.IdSet {
if store == nil {
return set
}
if typed, ok := set.(*idSet); ok {
return typed.cloneWithStore(store)
}
return set
}
// SetSelfPeerID sets the self peer ID on the sync controller. Sessions from
// this peer ID are allowed unlimited concurrency (for workers syncing to master).
func (hg *HypergraphCRDT) SetSelfPeerID(peerID string) {
if hg.syncController != nil {
hg.syncController.SetSelfPeerID(peerID)
}
}
func (hg *HypergraphCRDT) SetShutdownContext(ctx context.Context) {
hg.shutdownCtx = ctx
go func() {
select {
case <-hg.shutdownCtx.Done():
hg.snapshotMgr.publish(nil)
}
}()
}
func (hg *HypergraphCRDT) contextWithShutdown(
parent context.Context,
) (context.Context, context.CancelFunc) {
if hg.shutdownCtx == nil {
return parent, func() {}
}
ctx, cancel := context.WithCancel(parent)
go func() {
select {
case <-ctx.Done():
case <-hg.shutdownCtx.Done():
cancel()
}
}()
return ctx, cancel
}
func (hg *HypergraphCRDT) snapshotSet(
shardKey tries.ShardKey,
targetStore tries.TreeBackingStore,
setMap map[tries.ShardKey]hypergraph.IdSet,
atomType hypergraph.AtomType,
phaseType hypergraph.PhaseType,
) hypergraph.IdSet {
hg.setsMu.RLock()
set := setMap[shardKey]
hg.setsMu.RUnlock()
if set == nil {
// Try to load root from snapshot store since set doesn't exist in memory
var root tries.LazyVectorCommitmentNode
if targetStore != nil {
root, _ = targetStore.GetNodeByPath(
string(atomType),
string(phaseType),
shardKey,
[]int{}, // empty path = root
)
}
set = NewIdSet(
atomType,
phaseType,
shardKey,
targetStore, // Use target store directly since set is new
hg.prover,
root,
hg.getCoveredPrefix(),
)
// Return directly - no need to clone since we already used targetStore
return set
}
return hg.cloneSetWithStore(set, targetStore)
}
func (hg *HypergraphCRDT) snapshotPhaseSet(
shardKey tries.ShardKey,
phaseSet protobufs.HypergraphPhaseSet,
targetStore tries.TreeBackingStore,
) hypergraph.IdSet {
switch phaseSet {
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS:
return hg.snapshotSet(
shardKey,
targetStore,
hg.vertexAdds,
hypergraph.VertexAtomType,
hypergraph.AddsPhaseType,
)
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES:
return hg.snapshotSet(
shardKey,
targetStore,
hg.vertexRemoves,
hypergraph.VertexAtomType,
hypergraph.RemovesPhaseType,
)
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS:
return hg.snapshotSet(
shardKey,
targetStore,
hg.hyperedgeAdds,
hypergraph.HyperedgeAtomType,
hypergraph.AddsPhaseType,
)
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES:
return hg.snapshotSet(
shardKey,
targetStore,
hg.hyperedgeRemoves,
hypergraph.HyperedgeAtomType,
hypergraph.RemovesPhaseType,
)
default:
return nil
}
}
// NewTransaction creates a new transaction for atomic operations.
func (hg *HypergraphCRDT) NewTransaction(indexed bool) (
tries.TreeBackingStoreTransaction,
error,
) {
timer := prometheus.NewTimer(
TransactionDuration.WithLabelValues(boolToString(indexed)),
)
defer timer.ObserveDuration()
txn, err := hg.store.NewTransaction(indexed)
if err != nil {
TransactionTotal.WithLabelValues(boolToString(indexed), "error").Inc()
return nil, err
}
TransactionTotal.WithLabelValues(boolToString(indexed), "success").Inc()
return txn, nil
}
// GetProver returns the inclusion prover used by this hypergraph.
func (hg *HypergraphCRDT) GetProver() crypto.InclusionProver {
return hg.prover
}
// ImportTree imports an existing commitment tree into the hypergraph. This is
// used to load pre-existing hypergraph data from persistent storage. The
// atomType and phaseType determine which set the tree is imported into.
func (hg *HypergraphCRDT) ImportTree(
atomType hypergraph.AtomType,
phaseType hypergraph.PhaseType,
shardKey tries.ShardKey,
root tries.LazyVectorCommitmentNode,
store tries.TreeBackingStore,
prover crypto.InclusionProver,
) error {
hg.mu.Lock()
defer hg.mu.Unlock()
timer := prometheus.NewTimer(ImportTreeDuration)
defer timer.ObserveDuration()
set := NewIdSet(
atomType,
phaseType,
shardKey,
store,
prover,
root,
hg.getCoveredPrefix(),
)
treeSize := set.GetSize()
size, _ := treeSize.Float64()
ImportTreeSize.Observe(size)
switch atomType {
case hypergraph.VertexAtomType:
switch phaseType {
case hypergraph.AddsPhaseType:
hg.setsMu.Lock()
hg.size.Add(hg.size, treeSize)
hg.vertexAdds[shardKey] = set
hg.setsMu.Unlock()
case hypergraph.RemovesPhaseType:
hg.setsMu.Lock()
hg.size.Sub(hg.size, treeSize)
hg.vertexRemoves[shardKey] = set
hg.setsMu.Unlock()
}
case hypergraph.HyperedgeAtomType:
switch phaseType {
case hypergraph.AddsPhaseType:
hg.setsMu.Lock()
hg.size.Add(hg.size, treeSize)
hg.hyperedgeAdds[shardKey] = set
hg.setsMu.Unlock()
case hypergraph.RemovesPhaseType:
hg.setsMu.Lock()
hg.size.Sub(hg.size, treeSize)
hg.hyperedgeRemoves[shardKey] = set
hg.setsMu.Unlock()
}
}
ImportTreeTotal.WithLabelValues(
string(atomType),
string(phaseType),
"success",
).Inc()
return nil
}
// GetSize returns the current total size of the hypergraph. The size is
// calculated as the sum of all added atoms' data minus removed atoms (note:
// removed meaning removed from a set, not the atoms in remove sets).
func (hg *HypergraphCRDT) GetSize(
shardKey *tries.ShardKey,
path []int,
) *big.Int {
hg.mu.Lock()
defer hg.mu.Unlock()
if shardKey == nil {
sk := tries.ShardKey{
L1: [3]byte{0, 0, 0},
L2: [32]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
}
vas := hg.getVertexAddsSet(sk)
has := hg.getHyperedgeAddsSet(sk)
vrs := hg.getVertexRemovesSet(sk)
hrs := hg.getHyperedgeRemovesSet(sk)
size := new(big.Int).Set(hg.size)
size = size.Sub(
size,
new(big.Int).Add(
new(big.Int).Add(vas.GetSize(), has.GetSize()),
new(big.Int).Add(vrs.GetSize(), hrs.GetSize()),
),
)
return size
}
vas := hg.getVertexAddsSet(*shardKey)
has := hg.getHyperedgeAddsSet(*shardKey)
vrs := hg.getVertexRemovesSet(*shardKey)
hrs := hg.getHyperedgeRemovesSet(*shardKey)
if len(path) == 0 {
return new(big.Int).Add(
new(big.Int).Add(vas.GetSize(), has.GetSize()),
new(big.Int).Add(vrs.GetSize(), hrs.GetSize()),
)
} else {
sum := big.NewInt(0)
n, _ := vas.GetTree().GetByPath(path)
if n != nil {
sum = sum.Add(sum, n.GetSize())
}
o, _ := has.GetTree().GetByPath(path)
if o != nil {
sum = sum.Add(sum, o.GetSize())
}
p, _ := vrs.GetTree().GetByPath(path)
if p != nil {
sum = sum.Add(sum, p.GetSize())
}
q, _ := hrs.GetTree().GetByPath(path)
if q != nil {
sum = sum.Add(sum, q.GetSize())
}
return sum
}
}
// TrackChange marks a change for historical notation
func (hg *HypergraphCRDT) TrackChange(
txn tries.TreeBackingStoreTransaction,
key []byte,
oldValue *tries.VectorCommitmentTree,
frameNumber uint64,
phaseType string,
setType string,
shardKey tries.ShardKey,
) error {
hg.mu.Lock()
defer hg.mu.Unlock()
return hg.store.TrackChange(
txn,
key,
oldValue,
frameNumber,
phaseType,
setType,
shardKey,
)
}
// GetChanges returns the series of changes between frames, in reverse
// chronological order.
func (hg *HypergraphCRDT) GetChanges(
frameStart uint64,
frameEnd uint64,
phaseType string,
setType string,
shardKey tries.ShardKey,
) ([]*tries.ChangeRecord, error) {
hg.mu.RLock()
defer hg.mu.RUnlock()
return hg.getChanges(
frameStart,
frameEnd,
phaseType,
setType,
shardKey,
)
}
func (hg *HypergraphCRDT) getChanges(
frameStart uint64,
frameEnd uint64,
phaseType string,
setType string,
shardKey tries.ShardKey,
) ([]*tries.ChangeRecord, error) {
return hg.store.GetChanges(
frameStart,
frameEnd,
phaseType,
setType,
shardKey,
)
}
// RevertChanges reverts the series of changes between frames, in reverse
// chronological order.
func (hg *HypergraphCRDT) RevertChanges(
txn tries.TreeBackingStoreTransaction,
frameStart uint64,
frameEnd uint64,
shardKey tries.ShardKey,
) error {
hg.mu.Lock()
defer hg.mu.Unlock()
// Genesis cannot be reverted
if frameStart == 0 {
frameStart = 1
}
// Get all changes for the frame range
vertexAdds, err := hg.getChanges(
frameStart,
frameEnd,
string(hypergraph.AddsPhaseType),
string(hypergraph.VertexAtomType),
shardKey,
)
if err != nil {
return errors.Wrap(err, "revert changes")
}
vertexRemoves, err := hg.getChanges(
frameStart,
frameEnd,
string(hypergraph.RemovesPhaseType),
string(hypergraph.VertexAtomType),
shardKey,
)
if err != nil {
return errors.Wrap(err, "revert changes")
}
hyperedgeAdds, err := hg.getChanges(
frameStart,
frameEnd,
string(hypergraph.AddsPhaseType),
string(hypergraph.HyperedgeAtomType),
shardKey,
)
if err != nil {
return errors.Wrap(err, "revert changes")
}
hyperedgeRemoves, err := hg.getChanges(
frameStart,
frameEnd,
string(hypergraph.RemovesPhaseType),
string(hypergraph.HyperedgeAtomType),
shardKey,
)
if err != nil {
return errors.Wrap(err, "revert changes")
}
// Create maps indexed by frame number for efficient lookup
vertexAddsMap := make(map[uint64][]*tries.ChangeRecord)
vertexRemovesMap := make(map[uint64][]*tries.ChangeRecord)
hyperedgeAddsMap := make(map[uint64][]*tries.ChangeRecord)
hyperedgeRemovesMap := make(map[uint64][]*tries.ChangeRecord)
for _, change := range vertexAdds {
change := change
vertexAddsMap[change.Frame] = append(vertexAddsMap[change.Frame], change)
}
for _, change := range vertexRemoves {
change := change
vertexRemovesMap[change.Frame] = append(
vertexRemovesMap[change.Frame],
change,
)
}
for _, change := range hyperedgeAdds {
change := change
hyperedgeAddsMap[change.Frame] = append(
hyperedgeAddsMap[change.Frame],
change,
)
}
for _, change := range hyperedgeRemoves {
change := change
hyperedgeRemovesMap[change.Frame] = append(
hyperedgeRemovesMap[change.Frame],
change,
)
}
// Process frames in descending order
for frame := frameEnd; frame >= frameStart; frame-- {
// Revert hyperedge removes for this frame
if hrs, ok := hyperedgeRemovesMap[frame]; ok {
for _, change := range hrs {
// Remove from the hyperedge removes tree
err = hg.hyperedgeRemoves[shardKey].GetTree().Delete(txn, change.Key)
if err != nil {
return errors.Wrap(err, "revert changes")
}
// Clean up change record
err = hg.store.UntrackChange(
txn,
change.Key,
frame,
string(hypergraph.RemovesPhaseType),
string(hypergraph.HyperedgeAtomType),
shardKey,
)
if err != nil {
return errors.Wrap(err, "revert changes")
}
}
}
// Revert vertex removes for this frame
if vrs, ok := vertexRemovesMap[frame]; ok {
for _, change := range vrs {
// Remove from the vertex removes tree
err = hg.vertexRemoves[shardKey].GetTree().Delete(txn, change.Key)
if err != nil {
return errors.Wrap(err, "revert changes")
}
// Clean up change record
err = hg.store.UntrackChange(
txn,
change.Key,
frame,
string(hypergraph.RemovesPhaseType),
string(hypergraph.VertexAtomType),
shardKey,
)
if err != nil {
return errors.Wrap(err, "revert changes")
}
}
}
// Revert hyperedge adds for this frame
if has, ok := hyperedgeAddsMap[frame]; ok {
for _, change := range has {
// Restore the previous hyperedge extrinsic value
if change.OldValue != nil {
// Update the hyperedge adds tree with the old value
err = hg.addHyperedge(txn, &hyperedge{
appAddress: [32]byte(change.Key[:32]),
dataAddress: [32]byte(change.Key[32:]),
extTree: change.OldValue,
})
if err != nil {
return errors.Wrap(err, "revert changes")
}
} else {
// If nil, this was the first add
err = hg.hyperedgeAdds[shardKey].GetTree().Delete(txn, change.Key)
if err != nil {
return errors.Wrap(err, "revert changes")
}
}
// Clean up change record
err = hg.store.UntrackChange(
txn,
change.Key,
frame,
string(hypergraph.AddsPhaseType),
string(hypergraph.HyperedgeAtomType),
shardKey,
)
if err != nil {
return errors.Wrap(err, "revert changes")
}
}
}
// Revert vertex adds for this frame
if vas, ok := vertexAddsMap[frame]; ok {
for _, change := range vas {
// Restore the previous vertex value
if change.OldValue != nil {
// Update the vertex adds with the old value
err = hg.addVertex(txn, NewVertex(
[32]byte(change.Key[:32]),
[32]byte(change.Key[32:]),
change.OldValue.Commit(hg.GetProver(), false),
change.OldValue.GetSize(),
))
if err != nil {
return errors.Wrap(err, "revert changes")
}
} else {
// If nil, this was the first add
err = hg.vertexAdds[shardKey].GetTree().Delete(txn, change.Key)
if err != nil {
return errors.Wrap(err, "revert changes")
}
}
// Clean up change record
err = hg.store.UntrackChange(
txn,
change.Key,
frame,
string(hypergraph.AddsPhaseType),
string(hypergraph.VertexAtomType),
shardKey,
)
if err != nil {
return errors.Wrap(err, "revert changes")
}
}
}
}
return nil
}
// GetMetadataAtKey is a fast path to retrieve metadata information used for
// consensus, avoiding unnecessary recomputation for lookups.
func (hg *HypergraphCRDT) GetMetadataAtKey(pathKey []byte) (
[]hypergraph.ShardMetadata,
error,
) {
hg.mu.Lock()
defer hg.mu.Unlock()
if len(pathKey) < 32 {
return nil, errors.Wrap(
hypergraph.ErrInvalidLocation,
"get metadata at key",
)
}
l1 := up2p.GetBloomFilterIndices(pathKey[:32], 256, 3)
shardKey := tries.ShardKey{
L1: [3]byte(l1),
L2: [32]byte(pathKey[:32]),
}
coveredPrefix := hg.getCoveredPrefix()
vertexAdds, vertexRemoves := hg.getOrCreateIdSet(
shardKey,
hg.vertexAdds,
hg.vertexRemoves,
hypergraph.VertexAtomType,
coveredPrefix,
)
hyperedgeAdds, hyperedgeRemoves := hg.getOrCreateIdSet(
shardKey,
hg.hyperedgeAdds,
hg.hyperedgeRemoves,
hypergraph.HyperedgeAtomType,
coveredPrefix,
)
metadata := []hypergraph.ShardMetadata{}
ipath := tries.GetFullPath(pathKey[:32])
path := []int{}
for _, p := range ipath {
path = append(path, int(p))
}
for _, p := range pathKey[32:] {
path = append(path, int(p))
}
for _, set := range []hypergraph.IdSet{
vertexAdds,
vertexRemoves,
hyperedgeAdds,
hyperedgeRemoves,
} {
node, err := set.GetTree().Store.GetNodeByPath(
set.GetTree().SetType,
set.GetTree().PhaseType,
shardKey,
path,
)
if err != nil {
metadata = append(metadata, hypergraph.ShardMetadata{
Commitment: make([]byte, 64),
LeafCount: 0,
Size: 0,
})
continue
}
if node != nil {
switch t := node.(type) {
case *tries.LazyVectorCommitmentBranchNode:
metadata = append(metadata, hypergraph.ShardMetadata{
Commitment: t.Commitment,
LeafCount: uint64(t.LeafCount),
Size: t.Size.Uint64(),
})
case *tries.LazyVectorCommitmentLeafNode:
metadata = append(metadata, hypergraph.ShardMetadata{
Commitment: t.Commitment,
LeafCount: 1,
Size: t.Size.Uint64(),
})
}
} else {
metadata = append(metadata, hypergraph.ShardMetadata{
Commitment: make([]byte, 64),
LeafCount: 0,
Size: 0,
})
}
}
return metadata, nil
}
// boolToString converts a boolean to string for Prometheus labels.
func boolToString(b bool) string {
if b {
return "true"
}
return "false"
}
// DeleteVertexAdd performs a hard delete of a vertex from the VertexAdds set.
// Unlike RemoveVertex (which adds to VertexRemoves for CRDT semantics), this
// actually removes the entry from VertexAdds and deletes the associated vertex
// data. This is used for pruning stale/orphaned data that should not be synced.
//
// The caller must provide a transaction for atomic deletion of both the tree
// entry and the vertex data.
func (hg *HypergraphCRDT) DeleteVertexAdd(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
vertexID [64]byte,
) error {
hg.mu.Lock()
defer hg.mu.Unlock()
set := hg.getVertexAddsSet(shardKey)
tree := set.GetTree()
// Delete from the tree (removes tree node and path index)
if err := tree.Delete(txn, vertexID[:]); err != nil {
return errors.Wrap(err, "delete vertex add: tree delete")
}
// Delete the vertex data
if err := tree.Store.DeleteVertexTree(txn, vertexID[:]); err != nil {
// Log but don't fail - vertex data may already be gone
hg.logger.Debug(
"delete vertex add: vertex data not found",
zap.String("vertex_id", string(vertexID[:])),
zap.Error(err),
)
}
return nil
}
// DeleteVertexRemove performs a hard delete of a vertex from the VertexRemoves
// set. This removes the entry from VertexRemoves, effectively "un-removing" the
// vertex if it still exists in VertexAdds. This is used for pruning stale data.
func (hg *HypergraphCRDT) DeleteVertexRemove(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
vertexID [64]byte,
) error {
hg.mu.Lock()
defer hg.mu.Unlock()
set := hg.getVertexRemovesSet(shardKey)
tree := set.GetTree()
// Delete from the tree
if err := tree.Delete(txn, vertexID[:]); err != nil {
return errors.Wrap(err, "delete vertex remove: tree delete")
}
return nil
}
// DeleteHyperedgeAdd performs a hard delete of a hyperedge from the
// HyperedgeAdds set. Unlike RemoveHyperedge (which adds to HyperedgeRemoves for
// CRDT semantics), this actually removes the entry from HyperedgeAdds. This is
// used for pruning stale/orphaned data.
func (hg *HypergraphCRDT) DeleteHyperedgeAdd(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
hyperedgeID [64]byte,
) error {
hg.mu.Lock()
defer hg.mu.Unlock()
set := hg.getHyperedgeAddsSet(shardKey)
tree := set.GetTree()
// Delete from the tree
if err := tree.Delete(txn, hyperedgeID[:]); err != nil {
return errors.Wrap(err, "delete hyperedge add: tree delete")
}
return nil
}
// DeleteHyperedgeRemove performs a hard delete of a hyperedge from the
// HyperedgeRemoves set. This is used for pruning stale data.
func (hg *HypergraphCRDT) DeleteHyperedgeRemove(
txn tries.TreeBackingStoreTransaction,
shardKey tries.ShardKey,
hyperedgeID [64]byte,
) error {
hg.mu.Lock()
defer hg.mu.Unlock()
set := hg.getHyperedgeRemovesSet(shardKey)
tree := set.GetTree()
// Delete from the tree
if err := tree.Delete(txn, hyperedgeID[:]); err != nil {
return errors.Wrap(err, "delete hyperedge remove: tree delete")
}
return nil
}