Mirror of https://github.com/QuilibriumNetwork/ceremonyclient.git, synced 2026-02-21 18:37:26 +08:00
* experiment: reject bad peer info messages
* v2.1.0.18 preview
* add tagged sync
* Add missing hypergraph changes
* small tweaks to sync
* allow local sync, use it for provers with workers
* missing file
* resolve build error
* resolve sync issue, remove raw sync
* resolve deletion promotion bug
* resolve sync abstraction leak from tree deletion changes
* rearrange prover sync
* remove pruning from sync
* restore removed sync flag
* fix: sync, event stream deadlock, heuristic scoring of better shards
* resolve hanging shutdown + pubsub proxy issue
* further bugfixes: sync (restore old leaf sync), pubsub shutdown, merge events
* fix: clean up rust ffi, background coverage events, and sync tweaks
* fix: linking issue for channel, connectivity test aggression, sync regression, join tests
* fix: disjoint sync, improper application of filter
* resolve sync/reel/validation deadlock
* adjust sync to handle no leaf edge cases, multi-path segment traversal
* use simpler sync
* faster, simpler sync with some debug extras
* migration to recalculate
* don't use batch
* square up the roots
* fix nil pointer
* fix: seniority calculation, sync race condition, migration
* make sync dumber
* fix: tree deletion issue
* fix: missing seniority merge request canonical serialization
* address issues from previous commit test
* stale workers should be cleared
* remove missing gap check
* rearrange collect, reduce sync logging noise
* fix: the disjoint leaf/branch sync case
* nuclear option on sync failures
* v2.1.0.18, finalized
2088 lines
56 KiB
Go
package provers

import (
	"bytes"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"slices"
	"sort"
	"sync"

	"github.com/iden3/go-iden3-crypto/poseidon"
	"github.com/pkg/errors"
	"go.uber.org/zap"
	"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global"
	hgstate "source.quilibrium.com/quilibrium/monorepo/node/execution/state/hypergraph"
	"source.quilibrium.com/quilibrium/monorepo/types/consensus"
	"source.quilibrium.com/quilibrium/monorepo/types/execution/intrinsics"
	"source.quilibrium.com/quilibrium/monorepo/types/execution/state"
	"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
	"source.quilibrium.com/quilibrium/monorepo/types/schema"
	"source.quilibrium.com/quilibrium/monorepo/types/tries"
)

var _ consensus.ProverRegistry = (*ProverRegistry)(nil)

// ProverRegistry is the default implementation of the consensus.ProverRegistry
// interface.
type ProverRegistry struct {
	mu sync.RWMutex
	logger *zap.Logger

	// Hypergraph instance for state queries
	hypergraph hypergraph.Hypergraph

	// Global prover trie
	globalTrie *tries.RollingFrecencyCritbitTrie

	// Per-shard prover tries, keyed by filter
	shardTries map[string]*tries.RollingFrecencyCritbitTrie

	// Prover info cache, keyed by address
	proverCache map[string]*consensus.ProverInfo

	// Filter cache, keyed by filter (as string) to sorted list of ProverInfo
	filterCache map[string][]*consensus.ProverInfo

	// Track which addresses are in which tries for efficient lookup
	addressToFilters map[string][]string

	// Current frame number
	currentFrame uint64

	// RDF reader
	rdfMultiprover *schema.RDFMultiprover
}

// NewProverRegistry creates a new prover registry with the given hypergraph
func NewProverRegistry(logger *zap.Logger, hg hypergraph.Hypergraph) (
	consensus.ProverRegistry,
	error,
) {
	logger.Debug("creating new prover registry")

	registry := &ProverRegistry{
		logger: logger,
		hypergraph: hg,
		globalTrie: &tries.RollingFrecencyCritbitTrie{},
		shardTries: make(map[string]*tries.RollingFrecencyCritbitTrie),
		proverCache: make(map[string]*consensus.ProverInfo),
		filterCache: make(map[string][]*consensus.ProverInfo),
		addressToFilters: make(map[string][]string),
		rdfMultiprover: schema.NewRDFMultiprover(
			&schema.TurtleRDFParser{},
			hg.GetProver(),
		),
	}

	// Initialize from current hypergraph state
	logger.Debug("extracting initial global state from hypergraph")
	if err := registry.extractGlobalState(); err != nil {
		logger.Error("failed to extract global state", zap.Error(err))
		return nil, err
	}

	logger.Debug("prover registry created successfully")
	return registry, nil
}

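// Example: a minimal usage sketch, assuming logger (*zap.Logger) and hg
// (hypergraph.Hypergraph) are already provided by the node's existing wiring:
//
//	registry, err := NewProverRegistry(logger, hg)
//	if err != nil {
//		// handle initialization failure
//	}
//	next, err := registry.GetNextProver(input, filter)
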
// ProcessStateTransition implements ProverRegistry
func (r *ProverRegistry) ProcessStateTransition(
	state state.State,
	frameNumber uint64,
) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.logger.Debug(
		"processing state transition",
		zap.Uint64("frame_number", frameNumber),
		zap.Uint64("previous_frame", r.currentFrame),
	)

	r.currentFrame = frameNumber

	changes := state.Changeset()
	r.logger.Debug("processing changeset", zap.Int("change_count", len(changes)))

	// Process each change
	for _, change := range changes {
		// Check if this is a change to a prover vertex under
		// GLOBAL_INTRINSIC_ADDRESS
		if len(change.Domain) == 32 && bytes.Equal(
			change.Domain,
			intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
		) {
			if err := r.processProverChange(change, frameNumber); err != nil {
				r.logger.Debug(
					"failed to process prover change",
					zap.String("address", fmt.Sprintf("%x", change.Address)),
					zap.Error(err),
				)
				return errors.Wrap(err, "failed to process prover change")
			}
		}
		// For alt fee basis shards, your custom node will want to handle insertions
		// for alt shards here.
	}

	r.logger.Debug(
		"state transition processed successfully",
		zap.Uint64("frame_number", frameNumber),
	)
	return nil
}

// GetProverInfo implements ProverRegistry
func (r *ProverRegistry) GetProverInfo(
	address []byte,
) (*consensus.ProverInfo, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	r.logger.Debug(
		"getting prover info",
		zap.String("address", fmt.Sprintf("%x", address)),
	)

	if info, exists := r.proverCache[string(address)]; exists {
		return copyProverInfo(info), nil
	}

	r.logger.Debug(
		"prover info not found",
		zap.String("address", fmt.Sprintf("%x", address)),
	)
	return nil, nil
}

// copyProverInfo returns a deep copy of a ProverInfo to avoid callers
// holding mutable references into the proverCache.
func copyProverInfo(info *consensus.ProverInfo) *consensus.ProverInfo {
	if info == nil {
		return nil
	}
	cp := &consensus.ProverInfo{
		PublicKey: make([]byte, len(info.PublicKey)),
		Address: make([]byte, len(info.Address)),
		Status: info.Status,
		KickFrameNumber: info.KickFrameNumber,
		AvailableStorage: info.AvailableStorage,
		Seniority: info.Seniority,
		DelegateAddress: make([]byte, len(info.DelegateAddress)),
		Allocations: make([]consensus.ProverAllocationInfo, len(info.Allocations)),
	}
	copy(cp.PublicKey, info.PublicKey)
	copy(cp.Address, info.Address)
	copy(cp.DelegateAddress, info.DelegateAddress)
	for i, a := range info.Allocations {
		cp.Allocations[i] = consensus.ProverAllocationInfo{
			Status: a.Status,
			ConfirmationFilter: make([]byte, len(a.ConfirmationFilter)),
			RejectionFilter: make([]byte, len(a.RejectionFilter)),
			JoinFrameNumber: a.JoinFrameNumber,
			LeaveFrameNumber: a.LeaveFrameNumber,
			PauseFrameNumber: a.PauseFrameNumber,
			ResumeFrameNumber: a.ResumeFrameNumber,
			KickFrameNumber: a.KickFrameNumber,
			JoinConfirmFrameNumber: a.JoinConfirmFrameNumber,
			JoinRejectFrameNumber: a.JoinRejectFrameNumber,
			LeaveConfirmFrameNumber: a.LeaveConfirmFrameNumber,
			LeaveRejectFrameNumber: a.LeaveRejectFrameNumber,
			LastActiveFrameNumber: a.LastActiveFrameNumber,
			VertexAddress: make([]byte, len(a.VertexAddress)),
		}
		copy(cp.Allocations[i].ConfirmationFilter, a.ConfirmationFilter)
		copy(cp.Allocations[i].RejectionFilter, a.RejectionFilter)
		copy(cp.Allocations[i].VertexAddress, a.VertexAddress)
	}
	return cp
}

// GetNextProver implements ProverRegistry
func (r *ProverRegistry) GetNextProver(
	input [32]byte,
	filter []byte,
) ([]byte, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	r.logger.Debug(
		"getting next prover",
		zap.String("input", fmt.Sprintf("%x", input)),
		zap.String("filter", fmt.Sprintf("%x", filter)),
	)

	var trie *tries.RollingFrecencyCritbitTrie
	if len(filter) == 0 {
		trie = r.globalTrie
		r.logger.Debug("using global trie")
	} else {
		if shardTrie, exists := r.shardTries[string(filter)]; exists {
			trie = shardTrie
			r.logger.Debug(
				"using shard trie",
				zap.String("filter", fmt.Sprintf("%x", filter)),
			)
		} else {
			r.logger.Debug(
				"shard trie not found",
				zap.String("filter", fmt.Sprintf("%x", filter)),
			)
			return nil, errors.Wrap(
				errors.New("shard trie not available"),
				"get next prover",
			)
		}
	}

	nearest := trie.FindNearest(input[:])
	if nearest.Key == nil {
		r.logger.Debug("no prover found in trie")
		return nil, errors.Wrap(
			errors.New("shard trie empty"),
			"get next prover",
		)
	}

	r.logger.Debug(
		"next prover found",
		zap.String("prover", fmt.Sprintf("%x", nearest.Key)),
	)
	return nearest.Key, nil
}

func (r *ProverRegistry) GetOrderedProvers(
	input [32]byte,
	filter []byte,
) ([][]byte, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	r.logger.Debug(
		"getting ordered provers",
		zap.String("input", fmt.Sprintf("%x", input)),
		zap.String("filter", fmt.Sprintf("%x", filter)),
	)

	var trie *tries.RollingFrecencyCritbitTrie
	if len(filter) == 0 {
		trie = r.globalTrie
		r.logger.Debug("using global trie for ordered provers")
	} else {
		if shardTrie, exists := r.shardTries[string(filter)]; exists {
			trie = shardTrie
			r.logger.Debug(
				"using shard trie for ordered provers",
				zap.String("filter", fmt.Sprintf("%x", filter)),
			)
		} else {
			r.logger.Debug(
				"shard trie not found for filter",
				zap.String("filter", fmt.Sprintf("%x", filter)),
			)
			return nil, nil
		}
	}

	nearest := trie.FindNearestAndApproximateNeighbors(input[:])
	addresses := [][]byte{}
	for _, leaf := range nearest {
		addresses = append(addresses, leaf.Key)
	}

	r.logger.Debug("ordered provers retrieved", zap.Int("count", len(addresses)))
	return addresses, nil
}

// GetActiveProvers implements ProverRegistry
func (r *ProverRegistry) GetActiveProvers(
	filter []byte,
) ([]*consensus.ProverInfo, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	result, err := r.getProversByStatusInternal(
		filter,
		consensus.ProverStatusActive,
	)
	if err != nil {
		r.logger.Debug("failed to get active provers", zap.Error(err))
		return nil, err
	}

	return result, nil
}

// GetProvers implements ProverRegistry
func (r *ProverRegistry) GetProvers(filter []byte) (
	[]*consensus.ProverInfo,
	error,
) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	r.logger.Debug(
		"getting provers",
		zap.String("filter", fmt.Sprintf("%x", filter)),
	)

	var result []*consensus.ProverInfo
	result = append(result, r.filterCache[string(filter)]...)

	sort.Slice(result, func(i, j int) bool {
		return bytes.Compare(result[i].Address, result[j].Address) == -1
	})

	r.logger.Debug("provers retrieved", zap.Int("count", len(result)))
	return result, nil
}

// GetProverCount implements ProverRegistry
func (r *ProverRegistry) GetProverCount(filter []byte) (int, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	r.logger.Debug(
		"getting prover count",
		zap.String("filter", fmt.Sprintf("%x", filter)),
	)

	var trie *tries.RollingFrecencyCritbitTrie
	if len(filter) == 0 {
		trie = r.globalTrie
		r.logger.Debug("counting provers in global trie")
	} else {
		if shardTrie, exists := r.shardTries[string(filter)]; exists {
			trie = shardTrie
			r.logger.Debug(
				"counting provers in shard trie",
				zap.String("filter", fmt.Sprintf("%x", filter)),
			)
		} else {
			r.logger.Debug(
				"shard trie not found, returning count 0",
				zap.String("filter", fmt.Sprintf("%x", filter)),
			)
			return 0, nil
		}
	}

	count := len(trie.FindNearestAndApproximateNeighbors(make([]byte, 32)))
	r.logger.Debug("prover count retrieved", zap.Int("count", count))

	return count, nil
}

// GetProversByStatus implements ProverRegistry
func (r *ProverRegistry) GetProversByStatus(
	filter []byte,
	status consensus.ProverStatus,
) ([]*consensus.ProverInfo, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	r.logger.Debug(
		"getting provers by status",
		zap.String("filter", fmt.Sprintf("%x", filter)),
		zap.Uint8("status", uint8(status)),
	)

	result, err := r.getProversByStatusInternal(filter, status)
	if err != nil {
		r.logger.Debug("failed to get provers by status", zap.Error(err))
		return nil, err
	}

	r.logger.Debug(
		"provers by status retrieved",
		zap.Uint8("status", uint8(status)),
		zap.Int("count", len(result)),
	)
	return result, nil
}

// UpdateProverActivity implements ProverRegistry
func (r *ProverRegistry) UpdateProverActivity(
	address []byte,
	filter []byte,
	frameNumber uint64,
) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.logger.Debug(
		"updating prover activity",
		zap.String("address", fmt.Sprintf("%x", address)),
		zap.Uint64("frame_number", frameNumber),
	)

	if info, exists := r.proverCache[string(address)]; exists {
		active := 0
		// Update last active frame for all active allocations
		for i := range info.Allocations {
			if info.Allocations[i].Status == consensus.ProverStatusActive {
				if bytes.Equal(
					info.Allocations[i].ConfirmationFilter,
					filter,
				) {
					info.Allocations[i].LastActiveFrameNumber = frameNumber
				}
				active++
			}
		}
		r.logger.Debug(
			"prover activity updated",
			zap.String("address", fmt.Sprintf("%x", address)),
			zap.Int("active_allocations", active),
		)
	} else {
		r.logger.Debug(
			"prover not found for activity update",
			zap.String("address", fmt.Sprintf("%x", address)),
		)
	}

	return nil
}

// PruneOrphanJoins implements ProverRegistry
func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error {
	// Pruning is disabled — it was causing tree divergence between nodes
	// because non-deterministic pruning timing led to different tree states,
	// preventing sync convergence.
	return nil
}

func (r *ProverRegistry) pruneAllocationVertex(
	txn tries.TreeBackingStoreTransaction,
	info *consensus.ProverInfo,
	allocation consensus.ProverAllocationInfo,
) error {
	if info == nil {
		return errors.New("missing info")
	}

	var vertexID [64]byte
	copy(vertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])

	// Use pre-computed VertexAddress if available, otherwise derive from public
	// key
	if len(allocation.VertexAddress) == 32 {
		copy(vertexID[32:], allocation.VertexAddress)
	} else if len(info.PublicKey) == 0 {
		r.logger.Warn(
			"unable to prune allocation without vertex address or public key",
			zap.String("address", hex.EncodeToString(info.Address)),
		)
		return nil
	} else {
		// Fallback: derive vertex address from public key (legacy path)
		allocationHash, err := poseidon.HashBytes(
			slices.Concat(
				[]byte("PROVER_ALLOCATION"),
				info.PublicKey,
				allocation.ConfirmationFilter,
			),
		)
		if err != nil {
			return errors.Wrap(err, "prune allocation hash")
		}
		copy(
			vertexID[32:],
			allocationHash.FillBytes(make([]byte, 32)),
		)
	}

	shardKey := tries.ShardKey{
		L1: [3]byte{0x00, 0x00, 0x00},
		L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
	}

	// Use DeleteVertexAdd which properly handles locking and deletes both
	// the tree entry and the vertex data atomically
	if err := r.hypergraph.DeleteVertexAdd(txn, shardKey, vertexID); err != nil {
		r.logger.Debug(
			"could not delete allocation vertex during prune",
			zap.String("address", hex.EncodeToString(info.Address)),
			zap.String(
				"filter",
				hex.EncodeToString(allocation.ConfirmationFilter),
			),
			zap.String("vertex_id", hex.EncodeToString(vertexID[:])),
			zap.Error(err),
		)
		// Don't return error - the vertex may already be deleted
	} else {
		r.logger.Debug(
			"deleted allocation vertex during prune",
			zap.String("address", hex.EncodeToString(info.Address)),
			zap.String(
				"filter",
				hex.EncodeToString(allocation.ConfirmationFilter),
			),
			zap.String("vertex_id", hex.EncodeToString(vertexID[:])),
		)
	}

	return nil
}

// pruneProverRecord removes a prover's vertex, hyperedge, and associated data
// when all of its allocations have been pruned.
func (r *ProverRegistry) pruneProverRecord(
	txn tries.TreeBackingStoreTransaction,
	shardKey tries.ShardKey,
	info *consensus.ProverInfo,
) error {
	if info == nil || len(info.Address) == 0 {
		return errors.New("missing prover info")
	}

	// Construct the prover vertex ID
	var proverVertexID [64]byte
	copy(proverVertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
	copy(proverVertexID[32:], info.Address)

	// Delete prover vertex using DeleteVertexAdd which properly handles locking
	// and deletes both the tree entry and vertex data atomically
	if err := r.hypergraph.DeleteVertexAdd(txn, shardKey, proverVertexID); err != nil {
		r.logger.Debug(
			"could not delete prover vertex during prune",
			zap.String("address", hex.EncodeToString(info.Address)),
			zap.String("vertex_id", hex.EncodeToString(proverVertexID[:])),
			zap.Error(err),
		)
		// Don't return error - the vertex may already be deleted
	} else {
		r.logger.Debug(
			"deleted prover vertex during prune",
			zap.String("address", hex.EncodeToString(info.Address)),
			zap.String("vertex_id", hex.EncodeToString(proverVertexID[:])),
		)
	}

	// Delete prover hyperedge using DeleteHyperedgeAdd
	if err := r.hypergraph.DeleteHyperedgeAdd(txn, shardKey, proverVertexID); err != nil {
		r.logger.Debug(
			"could not delete prover hyperedge during prune",
			zap.String("address", hex.EncodeToString(info.Address)),
			zap.String("hyperedge_id", hex.EncodeToString(proverVertexID[:])),
			zap.Error(err),
		)
		// Don't return error - the hyperedge may already be deleted
	} else {
		r.logger.Debug(
			"deleted prover hyperedge during prune",
			zap.String("address", hex.EncodeToString(info.Address)),
			zap.String("hyperedge_id", hex.EncodeToString(proverVertexID[:])),
		)
	}

	r.logger.Debug(
		"pruned prover record",
		zap.String("address", hex.EncodeToString(info.Address)),
	)

	return nil
}

func (r *ProverRegistry) cleanupFilterCache(
	info *consensus.ProverInfo,
	filters map[string]struct{},
) {
	if len(filters) == 0 {
		return
	}

	for filterKey := range filters {
		if r.proverHasFilter(info, filterKey) {
			continue
		}
		r.removeFilterCacheEntry(filterKey, info)
	}
}

func (r *ProverRegistry) proverHasFilter(
	info *consensus.ProverInfo,
	filterKey string,
) bool {
	if info == nil {
		return false
	}
	for _, allocation := range info.Allocations {
		if string(allocation.ConfirmationFilter) == filterKey {
			return true
		}
	}
	return false
}

func (r *ProverRegistry) removeFilterCacheEntry(
	filterKey string,
	info *consensus.ProverInfo,
) {
	provers, ok := r.filterCache[filterKey]
	if !ok {
		return
	}
	for i, candidate := range provers {
		if candidate == info {
			r.filterCache[filterKey] = append(
				provers[:i],
				provers[i+1:]...,
			)
			break
		}
	}
	if len(r.filterCache[filterKey]) == 0 {
		delete(r.filterCache, filterKey)
	}
}

// Helper method to get provers by status; results are returned in
// lexicographic order by address
func (r *ProverRegistry) getProversByStatusInternal(
	filter []byte,
	status consensus.ProverStatus,
) ([]*consensus.ProverInfo, error) {
	var result []*consensus.ProverInfo

	for _, info := range r.filterCache[string(filter)] {
		for _, allocation := range info.Allocations {
			if allocation.Status == status && bytes.Equal(
				allocation.ConfirmationFilter,
				filter,
			) {
				result = append(result, info)
				break // Only add each prover once
			}
		}
	}

	sort.Slice(result, func(i, j int) bool {
		return bytes.Compare(result[i].Address, result[j].Address) == -1
	})

	return result, nil
}

// Helper method to add a prover to a trie
func (r *ProverRegistry) addProverToTrie(
	address []byte,
	publicKey []byte,
	filter []byte,
	frameNumber uint64,
) error {
	var trie *tries.RollingFrecencyCritbitTrie
	var filterStr string

	r.logger.Debug(
		"adding prover to trie",
		zap.String("address", fmt.Sprintf("%x", address)),
		zap.String("filter", fmt.Sprintf("%x", filter)),
		zap.Uint64("frame_number", frameNumber),
	)

	if len(filter) == 0 {
		trie = r.globalTrie
		filterStr = ""
		r.logger.Debug("adding to global trie")
	} else {
		filterStr = string(filter)
		if _, exists := r.shardTries[filterStr]; !exists {
			r.logger.Debug(
				"creating new shard trie",
				zap.String("filter", fmt.Sprintf("%x", filter)),
			)
			r.shardTries[filterStr] = &tries.RollingFrecencyCritbitTrie{}
		}
		trie = r.shardTries[filterStr]
		r.logger.Debug(
			"adding to shard trie",
			zap.String("filter", fmt.Sprintf("%x", filter)),
		)
	}

	// Add to trie using address as key
	trie.Add(address, frameNumber)

	// Track which trie this address is in
	if filters, exists := r.addressToFilters[string(address)]; exists {
		// Check if filter is already tracked
		found := false
		for _, f := range filters {
			if f == filterStr {
				found = true
				break
			}
		}
		if !found {
			r.addressToFilters[string(address)] = append(filters, filterStr)
			r.logger.Debug(
				"added filter to address tracking",
				zap.String("address", fmt.Sprintf("%x", address)),
				zap.Int("filter_count", len(r.addressToFilters[string(address)])),
			)
		}
	} else {
		r.addressToFilters[string(address)] = []string{filterStr}
		r.logger.Debug(
			"created new address filter tracking",
			zap.String("address", fmt.Sprintf("%x", address)),
		)
	}

	return nil
}

// Helper method to remove a prover from a trie
func (r *ProverRegistry) removeProverFromTrie(
	address []byte,
	filter []byte,
) error {
	var trie *tries.RollingFrecencyCritbitTrie
	var filterStr string

	r.logger.Debug(
		"removing prover from trie",
		zap.String("address", fmt.Sprintf("%x", address)),
		zap.String("filter", fmt.Sprintf("%x", filter)),
	)

	if len(filter) == 0 {
		trie = r.globalTrie
		filterStr = ""
		r.logger.Debug("removing from global trie")
	} else {
		filterStr = string(filter)
		if shardTrie, exists := r.shardTries[filterStr]; exists {
			trie = shardTrie
			r.logger.Debug(
				"removing from shard trie",
				zap.String("filter", fmt.Sprintf("%x", filter)),
			)
		} else {
			r.logger.Debug(
				"shard trie doesn't exist, nothing to remove",
				zap.String("filter", fmt.Sprintf("%x", filter)),
			)
			return nil
		}
	}

	// Remove from trie
	trie.Remove(address)

	// Update tracking
	if filters, exists := r.addressToFilters[string(address)]; exists {
		newFilters := []string{}
		for _, f := range filters {
			if f != filterStr {
				newFilters = append(newFilters, f)
			}
		}
		if len(newFilters) > 0 {
			r.addressToFilters[string(address)] = newFilters
			r.logger.Debug(
				"updated address filter tracking",
				zap.String("address", fmt.Sprintf("%x", address)),
				zap.Int("remaining_filters", len(newFilters)),
			)
		} else {
			delete(r.addressToFilters, string(address))
			r.logger.Debug(
				"removed address from filter tracking",
				zap.String("address", fmt.Sprintf("%x", address)),
			)
		}
	}

	return nil
}

// Refresh implements ProverRegistry
func (r *ProverRegistry) Refresh() error {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.logger.Debug("refreshing prover registry")

	// Clear existing state
	r.globalTrie = &tries.RollingFrecencyCritbitTrie{}
	r.shardTries = make(map[string]*tries.RollingFrecencyCritbitTrie)
	r.proverCache = make(map[string]*consensus.ProverInfo)
	r.filterCache = make(map[string][]*consensus.ProverInfo)
	r.addressToFilters = make(map[string][]string)

	r.logger.Debug("cleared existing registry state")

	// Re-extract from hypergraph
	if err := r.extractGlobalState(); err != nil {
		r.logger.Debug("failed to refresh registry", zap.Error(err))
		return err
	}

	r.logger.Debug("prover registry refreshed successfully")
	return nil
}

// extractGlobalState reads the current state from the hypergraph
func (r *ProverRegistry) extractGlobalState() error {
	// If no hypergraph is provided (e.g. in tests), skip extraction
	if r.hypergraph == nil {
		r.logger.Warn("no hypergraph provided to registry")
		return nil
	}

	// Use the new iterator to iterate over all vertices under
	// GLOBAL_INTRINSIC_ADDRESS
	iter := r.hypergraph.GetVertexDataIterator(
		intrinsics.GLOBAL_INTRINSIC_ADDRESS,
	)
	defer iter.Close()

	proversFound := 0
	allocationsFound := 0

	// Iterate through all vertices
	for iter.First(); iter.Valid(); iter.Next() {
		// Get the vertex data
		data := iter.Value()
		if data == nil {
			// Vertex has been removed, skip it
			continue
		}

		// Skip vertices with nil roots (e.g., spent merge markers)
		if data.Root == nil {
			continue
		}

		// Get the key which is always 64 bytes (domain + data address)
		key := make([]byte, 64)
		copy(key, iter.Key())

		// Check if this is a Prover or ProverAllocation based on the schema
		typeName, err := r.rdfMultiprover.GetType(
			global.GLOBAL_RDF_SCHEMA,
			intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
			data,
		)
		if err != nil {
			return errors.Wrap(err, "extract global state")
		}

		switch typeName {
case "prover:Prover":
|
|
// Extract the prover address (last 32 bytes of the iterator key)
|
|
proverAddress := key[32:]
|
|
|
|
publicKey, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"prover:Prover",
|
|
"PublicKey",
|
|
data,
|
|
)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// This is a Prover vertex
|
|
statusBytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"prover:Prover",
|
|
"Status",
|
|
data,
|
|
)
|
|
if err != nil || len(statusBytes) == 0 {
|
|
continue
|
|
}
|
|
status := statusBytes[0]
|
|
|
|
// Map internal status to our ProverStatus enum
|
|
var mappedStatus consensus.ProverStatus
|
|
switch status {
|
|
case 0:
|
|
mappedStatus = consensus.ProverStatusJoining
|
|
case 1:
|
|
mappedStatus = consensus.ProverStatusActive
|
|
case 2:
|
|
mappedStatus = consensus.ProverStatusPaused
|
|
case 3:
|
|
mappedStatus = consensus.ProverStatusLeaving
|
|
case 4:
|
|
// Skip left provers
|
|
continue
|
|
default:
|
|
mappedStatus = consensus.ProverStatusUnknown
|
|
}
|
|
|
|
// Extract available storage
|
|
var availableStorage uint64
|
|
storageBytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"prover:Prover",
|
|
"AvailableStorage",
|
|
data,
|
|
)
|
|
if err == nil && len(storageBytes) >= 8 {
|
|
availableStorage = binary.BigEndian.Uint64(storageBytes)
|
|
}
|
|
|
|
// Extract seniority
|
|
var seniority uint64
|
|
seniorityBytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"prover:Prover",
|
|
"Seniority",
|
|
data,
|
|
)
|
|
if err == nil && len(seniorityBytes) >= 8 {
|
|
seniority = binary.BigEndian.Uint64(seniorityBytes)
|
|
}
|
|
|
|
// Extract delegate address
|
|
delegateAddress, _ := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"prover:Prover",
|
|
"DelegateAddress",
|
|
data,
|
|
)
|
|
|
|
// Extract kick frame number
|
|
var kickFrameNumber uint64
|
|
kickFrameBytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"prover:Prover",
|
|
"KickFrameNumber",
|
|
data,
|
|
)
|
|
if err == nil && len(kickFrameBytes) >= 8 {
|
|
kickFrameNumber = binary.BigEndian.Uint64(kickFrameBytes)
|
|
}
|
|
|
|
			// Create or update ProverInfo
			proverInfo, exists := r.proverCache[string(proverAddress)]
			if !exists {
				proverInfo = &consensus.ProverInfo{
					PublicKey: publicKey,
					Address: proverAddress,
					Status: mappedStatus,
					AvailableStorage: availableStorage,
					Seniority: seniority,
					DelegateAddress: delegateAddress,
					KickFrameNumber: kickFrameNumber,
					Allocations: []consensus.ProverAllocationInfo{},
				}
				r.proverCache[string(proverAddress)] = proverInfo
			} else {
				// Update existing prover info
				proverInfo.PublicKey = publicKey
				proverInfo.Status = mappedStatus
				proverInfo.AvailableStorage = availableStorage
				proverInfo.Seniority = seniority
				proverInfo.DelegateAddress = delegateAddress
				proverInfo.KickFrameNumber = kickFrameNumber

				for _, allocation := range proverInfo.Allocations {
					if allocation.Status == consensus.ProverStatusActive {
						if err := r.addProverToTrie(
							proverAddress,
							proverInfo.PublicKey,
							allocation.ConfirmationFilter,
							allocation.LastActiveFrameNumber,
						); err != nil {
							return errors.Wrap(err, "extract global state")
						}
					}

					info, ok := r.filterCache[string(allocation.ConfirmationFilter)]
					if !ok {
						r.filterCache[string(
							allocation.ConfirmationFilter,
						)] = []*consensus.ProverInfo{proverInfo}
					} else {
						index := sort.Search(len(info), func(i int) bool {
							return bytes.Compare(info[i].Address, proverAddress) >= 0
						})
						skipAdd := false
						for _, i := range r.filterCache[string(
							allocation.ConfirmationFilter,
						)] {
							if bytes.Equal(i.Address, proverAddress) {
								skipAdd = true
								break
							}
						}
						if !skipAdd {
							r.filterCache[string(
								allocation.ConfirmationFilter,
							)] = slices.Insert(info, index, proverInfo)
						}
					}
				}
			}
			proversFound++
case "allocation:ProverAllocation":
|
|
// Try to read as ProverAllocation
|
|
proverRef, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"Prover",
|
|
data,
|
|
)
|
|
if err != nil || len(proverRef) == 0 {
|
|
// Neither Prover nor ProverAllocation, skip
|
|
continue
|
|
}
|
|
|
|
// This is a ProverAllocation vertex
|
|
statusBytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"Status",
|
|
data,
|
|
)
|
|
if err != nil || len(statusBytes) == 0 {
|
|
continue
|
|
}
|
|
status := statusBytes[0]
|
|
|
|
// Map allocation status
|
|
var mappedStatus consensus.ProverStatus
|
|
switch status {
|
|
case 0:
|
|
mappedStatus = consensus.ProverStatusJoining
|
|
case 1:
|
|
mappedStatus = consensus.ProverStatusActive
|
|
case 2:
|
|
mappedStatus = consensus.ProverStatusPaused
|
|
case 3:
|
|
mappedStatus = consensus.ProverStatusLeaving
|
|
case 4:
|
|
mappedStatus = consensus.ProverStatusRejected
|
|
case 5:
|
|
mappedStatus = consensus.ProverStatusKicked
|
|
default:
|
|
mappedStatus = consensus.ProverStatusUnknown
|
|
}
|
|
|
|
// Extract filters
|
|
confirmationFilter, _ := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"ConfirmationFilter",
|
|
data,
|
|
)
|
|
rejectionFilter, _ := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"RejectionFilter",
|
|
data,
|
|
)
|
|
|
|
// Extract frame numbers
|
|
var joinFrameNumber,
|
|
leaveFrameNumber,
|
|
pauseFrameNumber,
|
|
resumeFrameNumber,
|
|
kickFrameNumber,
|
|
joinConfirmFrameNumber,
|
|
joinRejectFrameNumber,
|
|
leaveConfirmFrameNumber,
|
|
leaveRejectFrameNumber,
|
|
lastActiveFrameNumber uint64
|
|
|
|
if bytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"JoinFrameNumber",
|
|
data,
|
|
); err == nil && len(bytes) >= 8 {
|
|
joinFrameNumber = binary.BigEndian.Uint64(bytes)
|
|
}
|
|
if bytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"LeaveFrameNumber",
|
|
data,
|
|
); err == nil && len(bytes) >= 8 {
|
|
leaveFrameNumber = binary.BigEndian.Uint64(bytes)
|
|
}
|
|
if bytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"PauseFrameNumber",
|
|
data,
|
|
); err == nil && len(bytes) >= 8 {
|
|
pauseFrameNumber = binary.BigEndian.Uint64(bytes)
|
|
}
|
|
if bytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"ResumeFrameNumber",
|
|
data,
|
|
); err == nil && len(bytes) >= 8 {
|
|
resumeFrameNumber = binary.BigEndian.Uint64(bytes)
|
|
}
|
|
if bytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"KickFrameNumber",
|
|
data,
|
|
); err == nil && len(bytes) >= 8 {
|
|
kickFrameNumber = binary.BigEndian.Uint64(bytes)
|
|
}
|
|
if bytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"JoinConfirmFrameNumber",
|
|
data,
|
|
); err == nil && len(bytes) >= 8 {
|
|
joinConfirmFrameNumber = binary.BigEndian.Uint64(bytes)
|
|
}
|
|
if bytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"JoinRejectFrameNumber",
|
|
data,
|
|
); err == nil && len(bytes) >= 8 {
|
|
joinRejectFrameNumber = binary.BigEndian.Uint64(bytes)
|
|
}
|
|
if bytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"LeaveConfirmFrameNumber",
|
|
data,
|
|
); err == nil && len(bytes) >= 8 {
|
|
leaveConfirmFrameNumber = binary.BigEndian.Uint64(bytes)
|
|
}
|
|
if bytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"LeaveRejectFrameNumber",
|
|
data,
|
|
); err == nil && len(bytes) >= 8 {
|
|
leaveRejectFrameNumber = binary.BigEndian.Uint64(bytes)
|
|
}
|
|
if bytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"LastActiveFrameNumber",
|
|
data,
|
|
); err == nil && len(bytes) >= 8 {
|
|
lastActiveFrameNumber = binary.BigEndian.Uint64(bytes)
|
|
}
|
|
|
|
			// Create allocation info - key[32:] contains the allocation vertex address
			allocationInfo := consensus.ProverAllocationInfo{
				Status: mappedStatus,
				ConfirmationFilter: confirmationFilter,
				RejectionFilter: rejectionFilter,
				JoinFrameNumber: joinFrameNumber,
				LeaveFrameNumber: leaveFrameNumber,
				PauseFrameNumber: pauseFrameNumber,
				ResumeFrameNumber: resumeFrameNumber,
				KickFrameNumber: kickFrameNumber,
				JoinConfirmFrameNumber: joinConfirmFrameNumber,
				JoinRejectFrameNumber: joinRejectFrameNumber,
				LeaveConfirmFrameNumber: leaveConfirmFrameNumber,
				LeaveRejectFrameNumber: leaveRejectFrameNumber,
				LastActiveFrameNumber: lastActiveFrameNumber,
				VertexAddress: append([]byte(nil), key[32:]...),
			}

			// Create or update ProverInfo
			proverInfo, exists := r.proverCache[string(proverRef)]
			if !exists {
				proverInfo = &consensus.ProverInfo{
					Address: proverRef,
					Allocations: []consensus.ProverAllocationInfo{},
				}
				r.proverCache[string(proverRef)] = proverInfo
			}

			// Add this allocation to the prover
			r.proverCache[string(proverRef)].Allocations = append(
				r.proverCache[string(proverRef)].Allocations,
				allocationInfo,
			)
			info, ok := r.filterCache[string(allocationInfo.ConfirmationFilter)]
			if !ok {
				r.filterCache[string(
					allocationInfo.ConfirmationFilter,
				)] = []*consensus.ProverInfo{proverInfo}
			} else {
				index := sort.Search(len(info), func(i int) bool {
					return bytes.Compare(info[i].Address, proverRef) >= 0
				})
				skipAdd := false
				for _, i := range r.filterCache[string(
					allocationInfo.ConfirmationFilter,
				)] {
					if bytes.Equal(i.Address, proverRef) {
						skipAdd = true
						break
					}
				}
				if !skipAdd {
					r.filterCache[string(
						allocationInfo.ConfirmationFilter,
					)] = slices.Insert(info, index, proverInfo)
				}
			}

			// If allocation is active and we can identify them, add to
			// filter-specific trie
			if mappedStatus == consensus.ProverStatusActive &&
				len(r.proverCache[string(proverRef)].PublicKey) != 0 {
				if err := r.addProverToTrie(
					proverRef,
					r.proverCache[string(proverRef)].PublicKey,
					confirmationFilter,
					lastActiveFrameNumber,
				); err != nil {
					return errors.Wrap(err, "extract global state")
				}
			}
			allocationsFound++
		}
	}

	r.logger.Debug(
		"global state extraction completed",
		zap.Int("provers_found", proversFound),
		zap.Int("allocations_found", allocationsFound),
		zap.Int("cached_provers", len(r.proverCache)),
		zap.Int("shard_tries", len(r.shardTries)),
	)

	return nil
}

// ExtractProversFromTransactions processes a list of transactions to discover
// prover addresses. This can be called during initial sync to discover active
// provers from recent transactions.
func (r *ProverRegistry) ExtractProversFromTransactions(
	transactions []state.StateChange,
) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Track unique addresses we've seen
	seenAddresses := make(map[string]bool)

	for _, tx := range transactions {
		// Check if this is a prover-related transaction under
		// GLOBAL_INTRINSIC_ADDRESS
		if len(tx.Domain) == 32 && bytes.Equal(
			tx.Domain,
			intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
		) {
			typeName, err := r.rdfMultiprover.GetType(
				global.GLOBAL_RDF_SCHEMA,
				intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
				tx.Value.DataValue(),
			)
			if err != nil {
				r.logger.Debug(
					"failed to get type name",
					zap.String("address", fmt.Sprintf("%x", tx.Address)),
					zap.Error(err),
				)
			}
			if typeName == "prover:Prover" {
				if !seenAddresses[string(tx.Address)] {
					seenAddresses[string(tx.Address)] = true
					r.logger.Debug(
						"extracting prover from transaction",
						zap.String("address", fmt.Sprintf("%x", tx.Address)),
					)
					// Extract this prover's information
					if err := r.extractProverFromAddress(tx.Address); err != nil {
						// Log error but continue with other provers
						r.logger.Debug(
							"failed to extract prover from address",
							zap.String("address", fmt.Sprintf("%x", tx.Address)),
							zap.Error(err),
						)
						continue
					}
				}
			}
		}
		// For alt-provers, you'd want to cover insertions for your alt basis shard
		// here.
	}

	r.logger.Debug(
		"extracted provers from transactions",
		zap.Int("unique_provers_found", len(seenAddresses)),
	)

	return nil
}

// processProverChange processes a single state change for a prover
func (r *ProverRegistry) processProverChange(
	change state.StateChange,
	frameNumber uint64,
) error {
	// Extract the prover address from the change address
	proverAddress := change.Address

	switch change.StateChange {
	case state.CreateStateChangeEvent, state.UpdateStateChangeEvent:
		if !bytes.Equal(change.Discriminator, hgstate.VertexAddsDiscriminator) {
			return nil
		}

		// A prover was created or updated
		if change.Value != nil && change.Value.DataValue() != nil {
			data := change.Value.DataValue()

			t, err := r.rdfMultiprover.GetType(
				global.GLOBAL_RDF_SCHEMA,
				intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
				data,
			)
			if err != nil {
				return nil
			}

			// Check if this is a Prover or ProverAllocation
			switch t {
			case "prover:Prover":
				publicKey, err := r.rdfMultiprover.Get(
					global.GLOBAL_RDF_SCHEMA,
					"prover:Prover",
					"PublicKey",
					data,
				)
				if err != nil {
					r.logger.Debug("no public key")
					return nil
				}

				statusBytes, err := r.rdfMultiprover.Get(
					global.GLOBAL_RDF_SCHEMA,
					"prover:Prover",
					"Status",
					data,
				)
				if err != nil || len(statusBytes) == 0 {
					r.logger.Debug("no status")
					return nil // Skip if no status
				}
				status := statusBytes[0]

				// Map internal status to our ProverStatus enum
				var mappedStatus consensus.ProverStatus
				switch status {
				case 0:
					mappedStatus = consensus.ProverStatusJoining
				case 1:
					mappedStatus = consensus.ProverStatusActive
				case 2:
					mappedStatus = consensus.ProverStatusPaused
				case 3:
					mappedStatus = consensus.ProverStatusLeaving
				case 4:
					// Left status - remove from registry
					return r.removeProver(proverAddress)
				default:
					mappedStatus = consensus.ProverStatusUnknown
				}

				// Extract available storage
				var availableStorage uint64
				storageBytes, err := r.rdfMultiprover.Get(
					global.GLOBAL_RDF_SCHEMA,
					"prover:Prover",
					"AvailableStorage",
					data,
				)
				if err == nil && len(storageBytes) >= 8 {
					availableStorage = binary.BigEndian.Uint64(storageBytes)
				}

				// Extract seniority
				var seniority uint64
				seniorityBytes, err := r.rdfMultiprover.Get(
					global.GLOBAL_RDF_SCHEMA,
					"prover:Prover",
					"Seniority",
					data,
				)
				if err == nil && len(seniorityBytes) >= 8 {
					seniority = binary.BigEndian.Uint64(seniorityBytes)
				}

				// Extract delegate address
				delegateAddress, _ := r.rdfMultiprover.Get(
					global.GLOBAL_RDF_SCHEMA,
					"prover:Prover",
					"DelegateAddress",
					data,
				)

				// Extract kick frame number
				var kickFrameNumber uint64
				kickFrameBytes, err := r.rdfMultiprover.Get(
					global.GLOBAL_RDF_SCHEMA,
					"prover:Prover",
					"KickFrameNumber",
					data,
				)
				if err == nil && len(kickFrameBytes) >= 8 {
					kickFrameNumber = binary.BigEndian.Uint64(kickFrameBytes)
				}

				// Create or update ProverInfo
				proverInfo, exists := r.proverCache[string(proverAddress)]
				if !exists {
					proverInfo = &consensus.ProverInfo{
						PublicKey: publicKey,
						Address: proverAddress,
						Status: mappedStatus,
						AvailableStorage: availableStorage,
						Seniority: seniority,
						DelegateAddress: delegateAddress,
						KickFrameNumber: kickFrameNumber,
						Allocations: []consensus.ProverAllocationInfo{},
					}
					r.proverCache[string(proverAddress)] = proverInfo
				} else {
					// Update existing prover info
					proverInfo.Status = mappedStatus
					proverInfo.AvailableStorage = availableStorage
					proverInfo.Seniority = seniority
					proverInfo.DelegateAddress = delegateAddress
					proverInfo.KickFrameNumber = kickFrameNumber
				}
case "allocation:ProverAllocation":
|
|
proverRef, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"Prover",
|
|
data,
|
|
)
|
|
if err != nil || len(proverRef) == 0 {
|
|
r.logger.Debug("no prover")
|
|
return nil
|
|
}
|
|
|
|
// This is a ProverAllocation vertex
|
|
statusBytes, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"Status",
|
|
data,
|
|
)
|
|
if err != nil || len(statusBytes) == 0 {
|
|
r.logger.Debug("no status")
|
|
return nil
|
|
}
|
|
status := statusBytes[0]
|
|
|
|
// Map allocation status
|
|
var mappedStatus consensus.ProverStatus
|
|
switch status {
|
|
case 0:
|
|
mappedStatus = consensus.ProverStatusJoining
|
|
case 1:
|
|
mappedStatus = consensus.ProverStatusActive
|
|
case 2:
|
|
mappedStatus = consensus.ProverStatusPaused
|
|
case 3:
|
|
mappedStatus = consensus.ProverStatusLeaving
|
|
case 4:
|
|
mappedStatus = consensus.ProverStatusRejected
|
|
case 5:
|
|
mappedStatus = consensus.ProverStatusKicked
|
|
default:
|
|
mappedStatus = consensus.ProverStatusUnknown
|
|
}
|
|
|
|
// Extract data
|
|
confirmationFilter, err := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"ConfirmationFilter",
|
|
data,
|
|
)
|
|
if err != nil {
|
|
return errors.Wrap(err, "process prover change")
|
|
}
|
|
|
|
rejectionFilter, _ := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"RejectionFilter",
|
|
data,
|
|
)
|
|
|
|
joinFrameNumberBytes, _ := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"JoinFrameNumber",
|
|
data,
|
|
)
|
|
|
|
var joinFrameNumber uint64
|
|
if len(joinFrameNumberBytes) != 0 {
|
|
joinFrameNumber = binary.BigEndian.Uint64(joinFrameNumberBytes)
|
|
}
|
|
|
|
leaveFrameNumberBytes, _ := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"LeaveFrameNumber",
|
|
data,
|
|
)
|
|
|
|
var leaveFrameNumber uint64
|
|
if len(leaveFrameNumberBytes) != 0 {
|
|
leaveFrameNumber = binary.BigEndian.Uint64(leaveFrameNumberBytes)
|
|
}
|
|
|
|
pauseFrameNumberBytes, _ := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"PauseFrameNumber",
|
|
data,
|
|
)
|
|
|
|
var pauseFrameNumber uint64
|
|
if len(pauseFrameNumberBytes) != 0 {
|
|
pauseFrameNumber = binary.BigEndian.Uint64(pauseFrameNumberBytes)
|
|
}
|
|
|
|
resumeFrameNumberBytes, _ := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"ResumeFrameNumber",
|
|
data,
|
|
)
|
|
|
|
var resumeFrameNumber uint64
|
|
if len(resumeFrameNumberBytes) != 0 {
|
|
resumeFrameNumber = binary.BigEndian.Uint64(resumeFrameNumberBytes)
|
|
}
|
|
|
|
kickFrameNumberBytes, _ := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"KickFrameNumber",
|
|
data,
|
|
)
|
|
|
|
var kickFrameNumber uint64
|
|
if len(kickFrameNumberBytes) != 0 {
|
|
kickFrameNumber = binary.BigEndian.Uint64(kickFrameNumberBytes)
|
|
}
|
|
|
|
joinConfirmFrameNumberBytes, _ := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"JoinConfirmFrameNumber",
|
|
data,
|
|
)
|
|
|
|
var joinConfirmFrameNumber uint64
|
|
if len(joinConfirmFrameNumberBytes) != 0 {
|
|
joinConfirmFrameNumber = binary.BigEndian.Uint64(
|
|
joinConfirmFrameNumberBytes,
|
|
)
|
|
}
|
|
|
|
joinRejectFrameNumberBytes, _ := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"JoinRejectFrameNumber",
|
|
data,
|
|
)
|
|
|
|
var joinRejectFrameNumber uint64
|
|
if len(joinRejectFrameNumberBytes) != 0 {
|
|
joinRejectFrameNumber = binary.BigEndian.Uint64(
|
|
joinRejectFrameNumberBytes,
|
|
)
|
|
}
|
|
|
|
leaveConfirmFrameNumberBytes, _ := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"LeaveConfirmFrameNumber",
|
|
data,
|
|
)
|
|
|
|
var leaveConfirmFrameNumber uint64
|
|
if len(leaveConfirmFrameNumberBytes) != 0 {
|
|
leaveConfirmFrameNumber = binary.BigEndian.Uint64(
|
|
leaveConfirmFrameNumberBytes,
|
|
)
|
|
}
|
|
|
|
leaveRejectFrameNumberBytes, _ := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"LeaveRejectFrameNumber",
|
|
data,
|
|
)
|
|
|
|
var leaveRejectFrameNumber uint64
|
|
if len(leaveRejectFrameNumberBytes) != 0 {
|
|
leaveRejectFrameNumber = binary.BigEndian.Uint64(
|
|
leaveRejectFrameNumberBytes,
|
|
)
|
|
}
|
|
|
|
lastActiveFrameNumberBytes, _ := r.rdfMultiprover.Get(
|
|
global.GLOBAL_RDF_SCHEMA,
|
|
"allocation:ProverAllocation",
|
|
"LastActiveFrameNumber",
|
|
data,
|
|
)
|
|
|
|
var lastActiveFrameNumber uint64
|
|
if len(lastActiveFrameNumberBytes) != 0 {
|
|
lastActiveFrameNumber = binary.BigEndian.Uint64(
|
|
lastActiveFrameNumberBytes,
|
|
)
|
|
}
|
|
|
|
				// Find the prover this allocation belongs to
				if proverInfo, exists := r.proverCache[string(proverRef)]; exists {
					found := false
					for i, allocation := range proverInfo.Allocations {
						if bytes.Equal(allocation.ConfirmationFilter, confirmationFilter) {
							proverInfo.Allocations[i].Status = mappedStatus
							proverInfo.Allocations[i].RejectionFilter = rejectionFilter
							proverInfo.Allocations[i].JoinFrameNumber = joinFrameNumber
							proverInfo.Allocations[i].LeaveFrameNumber = leaveFrameNumber
							proverInfo.Allocations[i].PauseFrameNumber = pauseFrameNumber
							proverInfo.Allocations[i].ResumeFrameNumber = resumeFrameNumber
							proverInfo.Allocations[i].KickFrameNumber = kickFrameNumber
							proverInfo.Allocations[i].JoinConfirmFrameNumber =
								joinConfirmFrameNumber
							proverInfo.Allocations[i].JoinRejectFrameNumber =
								joinRejectFrameNumber
							proverInfo.Allocations[i].LeaveConfirmFrameNumber =
								leaveConfirmFrameNumber
							proverInfo.Allocations[i].LeaveRejectFrameNumber =
								leaveRejectFrameNumber
							proverInfo.Allocations[i].LastActiveFrameNumber =
								lastActiveFrameNumber
							// Ensure VertexAddress is set (for backwards compatibility)
							if len(proverInfo.Allocations[i].VertexAddress) == 0 {
								proverInfo.Allocations[i].VertexAddress =
									append([]byte(nil), proverAddress...)
							}
							found = true
						}
					}

					if !found {
						proverInfo.Allocations = append(
							proverInfo.Allocations,
							consensus.ProverAllocationInfo{
								Status: mappedStatus,
								ConfirmationFilter: confirmationFilter,
								RejectionFilter: rejectionFilter,
								JoinFrameNumber: joinFrameNumber,
								LeaveFrameNumber: leaveFrameNumber,
								PauseFrameNumber: pauseFrameNumber,
								ResumeFrameNumber: resumeFrameNumber,
								KickFrameNumber: kickFrameNumber,
								JoinConfirmFrameNumber: joinConfirmFrameNumber,
								JoinRejectFrameNumber: joinRejectFrameNumber,
								LeaveConfirmFrameNumber: leaveConfirmFrameNumber,
								LeaveRejectFrameNumber: leaveRejectFrameNumber,
								LastActiveFrameNumber: lastActiveFrameNumber,
								VertexAddress: append([]byte(nil), proverAddress...),
							},
						)
					}

					// Update tries based on allocation status
					if mappedStatus == consensus.ProverStatusActive &&
						len(confirmationFilter) > 0 {
						if err := r.addProverToTrie(
							proverRef,
							proverInfo.PublicKey,
							confirmationFilter,
							frameNumber,
						); err != nil {
							return errors.Wrap(err, "process prover change")
						}
					} else if mappedStatus == consensus.ProverStatusKicked ||
						mappedStatus == consensus.ProverStatusUnknown {
						// Remove from filter trie if not active
						if err := r.removeProverFromTrie(
							proverRef,
							confirmationFilter,
						); err != nil {
							return errors.Wrap(
								err,
								"process prover change",
							)
						}
					}
				}
			}
		}

	case state.DeleteStateChangeEvent:
		// A prover was deleted
		return r.removeProver(proverAddress)
	}

	return nil
}

// removeProver removes a prover from all internal structures
func (r *ProverRegistry) removeProver(proverAddress []byte) error {
	r.logger.Debug(
		"removing prover from registry",
		zap.String("address", fmt.Sprintf("%x", proverAddress)),
	)

	// Get prover info to know which tries to remove from
	if info, exists := r.proverCache[string(proverAddress)]; exists {
		// Remove from all tries this prover is in
		// First remove from global trie if it's there
		if err := r.removeProverFromTrie(proverAddress, nil); err != nil {
			return errors.Wrap(err, "failed to remove from global trie")
		}

		// Then remove from all filter-specific tries based on allocations
		for _, allocation := range info.Allocations {
			if len(allocation.ConfirmationFilter) > 0 {
				if err := r.removeProverFromTrie(
					proverAddress,
					allocation.ConfirmationFilter,
				); err != nil {
					return errors.Wrap(err, "failed to remove from filter trie")
				}
			}
		}
	}

	// Remove from cache
	delete(r.proverCache, string(proverAddress))

	// Remove from address to filters mapping
	delete(r.addressToFilters, string(proverAddress))

	r.logger.Debug(
		"prover removed successfully",
		zap.String("address", fmt.Sprintf("%x", proverAddress)),
	)

	return nil
}

// extractProverFromAddress extracts a single prover's information from the
// hypergraph
func (r *ProverRegistry) extractProverFromAddress(
	proverAddress []byte,
) error {
	r.logger.Debug(
		"extracting prover from address",
		zap.String("address", fmt.Sprintf("%x", proverAddress)),
	)

	if r.hypergraph == nil {
		r.logger.Debug("no hypergraph available for extraction")
		return nil
	}

	// Create composite address: GLOBAL_INTRINSIC_ADDRESS + prover address
	fullAddress := [64]byte{}
	copy(fullAddress[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
	copy(fullAddress[32:], proverAddress)

	// Try to get the vertex data
	data, err := r.hypergraph.GetVertexData(fullAddress)
	if err != nil {
		// Prover doesn't exist
		r.logger.Debug(
			"prover vertex not found",
			zap.String("address", fmt.Sprintf("%x", proverAddress)),
		)
		return nil
	}

	// Extract public key
	publicKey, err := r.rdfMultiprover.Get(
		global.GLOBAL_RDF_SCHEMA,
		"prover:Prover",
		"PublicKey",
		data,
	)
	if err != nil || len(publicKey) == 0 {
		return errors.Wrap(err, "failed to get public key")
	}

	// Extract status
	statusBytes, err := r.rdfMultiprover.Get(
		global.GLOBAL_RDF_SCHEMA,
		"prover:Prover",
		"Status",
		data,
	)
	if err != nil || len(statusBytes) == 0 {
		return errors.Wrap(err, "failed to get status")
	}
	status := statusBytes[0]

	// Map internal status to our ProverStatus enum
	var mappedStatus consensus.ProverStatus
	switch status {
	case 0:
		mappedStatus = consensus.ProverStatusJoining
	case 1:
		mappedStatus = consensus.ProverStatusActive
	case 2:
		mappedStatus = consensus.ProverStatusPaused
	case 3:
		mappedStatus = consensus.ProverStatusLeaving
	case 4:
		// Skip left provers
		r.logger.Debug(
			"skipping left prover",
			zap.String("address", fmt.Sprintf("%x", proverAddress)),
		)
		return nil
	default:
		mappedStatus = consensus.ProverStatusUnknown
	}

	// Extract available storage
	var availableStorage uint64
	storageBytes, err := r.rdfMultiprover.Get(
		global.GLOBAL_RDF_SCHEMA,
		"prover:Prover",
		"AvailableStorage",
		data,
	)
	if err == nil && len(storageBytes) >= 8 {
		availableStorage = binary.BigEndian.Uint64(storageBytes)
	}

	// Extract seniority
	var seniority uint64
	seniorityBytes, err := r.rdfMultiprover.Get(
		global.GLOBAL_RDF_SCHEMA,
		"prover:Prover",
		"Seniority",
		data,
	)
	if err == nil && len(seniorityBytes) >= 8 {
		seniority = binary.BigEndian.Uint64(seniorityBytes)
	}

	// Extract delegate address
	delegateAddress, _ := r.rdfMultiprover.Get(
		global.GLOBAL_RDF_SCHEMA,
		"prover:Prover",
		"DelegateAddress",
		data,
	)

	// Extract kick frame number
	var kickFrameNumber uint64
	kickFrameBytes, err := r.rdfMultiprover.Get(
		global.GLOBAL_RDF_SCHEMA,
		"prover:Prover",
		"KickFrameNumber",
		data,
	)
	if err == nil && len(kickFrameBytes) >= 8 {
		kickFrameNumber = binary.BigEndian.Uint64(kickFrameBytes)
	}

	r.logger.Debug(
		"extracted prover info",
		zap.String("address", fmt.Sprintf("%x", proverAddress)),
		zap.Uint8("status", uint8(mappedStatus)),
		zap.Uint64("available_storage", availableStorage),
		zap.Uint64("seniority", seniority),
	)

	// Create ProverInfo
	proverInfo := &consensus.ProverInfo{
		PublicKey: publicKey,
		Address: proverAddress, // buildutils:allow-slice-alias slice is static
		Status: mappedStatus,
		AvailableStorage: availableStorage,
		Seniority: seniority,
		DelegateAddress: delegateAddress,
		KickFrameNumber: kickFrameNumber,
		Allocations: []consensus.ProverAllocationInfo{},
	}

	// Add to cache
	r.proverCache[string(proverAddress)] = proverInfo

	// Note: Allocations should be handled separately when iterating through
	// allocation vertices

	return nil
}

// GetAllActiveAppShardProvers implements ProverRegistry
func (r *ProverRegistry) GetAllActiveAppShardProvers() (
	[]*consensus.ProverInfo,
	error,
) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	r.logger.Debug("getting all active app shard provers")

	var result []*consensus.ProverInfo

	// Iterate through all cached provers
	for _, proverInfo := range r.proverCache {
		// Check if this prover has any active allocations (app shard provers)
		hasActiveAllocation := false
		for _, allocation := range proverInfo.Allocations {
			if allocation.Status == consensus.ProverStatusActive &&
				len(allocation.ConfirmationFilter) > 0 {
				hasActiveAllocation = true
				break
			}
		}

		// Only include provers with active allocations
		if hasActiveAllocation {
			r.logger.Debug(
				"copying prover info for status",
				zap.String("address", hex.EncodeToString(proverInfo.Address)),
			)
			// Make a copy to avoid external modification
			proverCopy := &consensus.ProverInfo{
				PublicKey: make([]byte, len(proverInfo.PublicKey)),
				Address: make([]byte, len(proverInfo.Address)),
				Status: proverInfo.Status,
				AvailableStorage: proverInfo.AvailableStorage,
				Seniority: proverInfo.Seniority,
				DelegateAddress: make([]byte, len(proverInfo.DelegateAddress)),
				KickFrameNumber: proverInfo.KickFrameNumber,
				Allocations: make(
					[]consensus.ProverAllocationInfo,
					len(proverInfo.Allocations),
				),
			}
			copy(proverCopy.PublicKey, proverInfo.PublicKey)
			copy(proverCopy.Address, proverInfo.Address)
			copy(proverCopy.DelegateAddress, proverInfo.DelegateAddress)

			// Copy allocations
			for i, allocation := range proverInfo.Allocations {
				r.logger.Debug(
					"copying prover allocation for status",
					zap.String("address", hex.EncodeToString(proverInfo.Address)),
					zap.String(
						"filter",
						hex.EncodeToString(allocation.ConfirmationFilter),
					),
				)
				proverCopy.Allocations[i] = consensus.ProverAllocationInfo{
					Status: allocation.Status,
					ConfirmationFilter: make(
						[]byte,
						len(allocation.ConfirmationFilter),
					),
					RejectionFilter: make(
						[]byte,
						len(allocation.RejectionFilter),
					),
					JoinFrameNumber: allocation.JoinFrameNumber,
					LeaveFrameNumber: allocation.LeaveFrameNumber,
					PauseFrameNumber: allocation.PauseFrameNumber,
					ResumeFrameNumber: allocation.ResumeFrameNumber,
					KickFrameNumber: allocation.KickFrameNumber,
					JoinConfirmFrameNumber: allocation.JoinConfirmFrameNumber,
					JoinRejectFrameNumber: allocation.JoinRejectFrameNumber,
					LeaveConfirmFrameNumber: allocation.LeaveConfirmFrameNumber,
					LeaveRejectFrameNumber: allocation.LeaveRejectFrameNumber,
					LastActiveFrameNumber: allocation.LastActiveFrameNumber,
					VertexAddress: make([]byte, len(allocation.VertexAddress)),
				}
				copy(
					proverCopy.Allocations[i].ConfirmationFilter,
					allocation.ConfirmationFilter,
				)
				copy(
					proverCopy.Allocations[i].RejectionFilter,
					allocation.RejectionFilter,
				)
				copy(
					proverCopy.Allocations[i].VertexAddress,
					allocation.VertexAddress,
				)
			}

			result = append(result, proverCopy)
		}
	}

	r.logger.Debug(
		"retrieved active app shard provers",
		zap.Int("count", len(result)),
	)

	return result, nil
}

// GetProverShardSummaries implements ProverRegistry
func (r *ProverRegistry) GetProverShardSummaries() (
	[]*consensus.ProverShardSummary,
	error,
) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	summaries := make([]*consensus.ProverShardSummary, 0, len(r.filterCache))
	for filterKey, provers := range r.filterCache {
		if len(filterKey) == 0 || len(provers) == 0 {
			continue
		}
		statusCounts := make(map[consensus.ProverStatus]int)
		for _, info := range provers {
			counted := false
			for _, alloc := range info.Allocations {
				if len(alloc.ConfirmationFilter) > 0 &&
					string(alloc.ConfirmationFilter) == filterKey {
					statusCounts[alloc.Status]++
					counted = true
					break
				}
				if len(alloc.RejectionFilter) > 0 &&
					string(alloc.RejectionFilter) == filterKey {
					statusCounts[alloc.Status]++
					counted = true
					break
				}
			}
			if !counted {
				statusCounts[info.Status]++
			}
		}
		summary := &consensus.ProverShardSummary{
			Filter: append([]byte(nil), filterKey...),
			StatusCounts: statusCounts,
		}
		summaries = append(summaries, summary)
	}

	sort.Slice(summaries, func(i, j int) bool {
		return bytes.Compare(summaries[i].Filter, summaries[j].Filter) < 0
	})

	return summaries, nil
}