ceremonyclient/node/consensus/provers/proposer.go
Cassandra Heart 12996487c3
v2.1.0.18 (#508)
* experiment: reject bad peer info messages

* v2.1.0.18 preview

* add tagged sync

* Add missing hypergraph changes

* small tweaks to sync

* allow local sync, use it for provers with workers

* missing file

* resolve build error

* resolve sync issue, remove raw sync

* resolve deletion promotion bug

* resolve sync abstraction leak from tree deletion changes

* rearrange prover sync

* remove pruning from sync

* restore removed sync flag

* fix: sync, event stream deadlock, heuristic scoring of better shards

* resolve hanging shutdown + pubsub proxy issue

* further bugfixes: sync (restore old leaf sync), pubsub shutdown, merge events

* fix: clean up rust ffi, background coverage events, and sync tweaks

* fix: linking issue for channel, connectivity test aggression, sync regression, join tests

* fix: disjoint sync, improper application of filter

* resolve sync/reel/validation deadlock

* adjust sync to handle no leaf edge cases, multi-path segment traversal

* use simpler sync

* faster, simpler sync with some debug extras

* migration to recalculate

* don't use batch

* square up the roots

* fix nil pointer

* fix: seniority calculation, sync race condition, migration

* make sync dumber

* fix: tree deletion issue

* fix: missing seniority merge request canonical serialization

* address issues from previous commit test

* stale workers should be cleared

* remove missing gap check

* rearrange collect, reduce sync logging noise

* fix: the disjoint leaf/branch sync case

* nuclear option on sync failures

* v2.1.0.18, finalized
2026-02-08 23:51:51 -06:00

package provers
import (
"bytes"
"crypto/rand"
"encoding/hex"
"math/big"
"sort"
"sync"
"github.com/pkg/errors"
"github.com/shopspring/decimal"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/reward"
"source.quilibrium.com/quilibrium/monorepo/types/store"
"source.quilibrium.com/quilibrium/monorepo/types/worker"
)
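// Strategy selects how candidate shards are ranked: RewardGreedy orders them
// by expected reward derived from the reward basis, while DataGreedy orders
// them purely by shard state size.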
type Strategy int
const (
RewardGreedy Strategy = iota
DataGreedy
)
// ShardDescriptor describes a candidate shard allocation target.
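// A minimal single-shard descriptor (illustrative field values) looks like
// ShardDescriptor{Filter: routingKey, Size: stateBytes, Ring: 0, Shards: 1}.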
type ShardDescriptor struct {
// Confirmation filter for the shard (routing key). Must be non-empty.
Filter []byte
// Size in bytes of this shard's state (for reward proportionality).
Size uint64
// Ring attenuation factor (reward is divided by 2^Ring). Usually 0 unless
// you intentionally place on outer rings.
Ring uint8
// Logical shard-group participation count for sqrt divisor (>=1).
// If you're assigning a worker to exactly one shard, use 1.
Shards uint64
}
// Proposal is a plan to allocate a specific worker to a shard filter.
type Proposal struct {
WorkerId uint
Filter []byte
ExpectedReward *big.Int // in base units
WorldStateBytes uint64
ShardSizeBytes uint64
Ring uint8
ShardsDenominator uint64
}
// scored is an internal struct for ranking proposals
type scored struct {
idx int
score *big.Int
}
// Manager ranks shards and assigns free workers to the best ones.
type Manager struct {
logger *zap.Logger
store store.WorkerStore
workerMgr worker.WorkerManager
// Static issuance parameters for planning
Units uint64
Strategy Strategy
mu sync.Mutex
isPlanning bool
}
// NewManager wires up a planning manager
func NewManager(
logger *zap.Logger,
ws store.WorkerStore,
wm worker.WorkerManager,
units uint64,
strategy Strategy,
) *Manager {
return &Manager{
logger: logger.Named("allocation_manager"),
store: ws,
workerMgr: wm,
Units: units,
Strategy: strategy,
}
}
// PlanAndAllocate picks up to maxAllocations of the best shard filters and
// updates the filter in the worker manager for each selected free worker.
// If maxAllocations == 0, it will use as many free workers as available.
// frameNumber is recorded so pending joins survive restarts while the network
// processes the request.
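//
// A minimal call (names are illustrative) looks like:
//
//	proposals, err := mgr.PlanAndAllocate(
//		difficulty,      // current difficulty
//		candidateShards, // latest []ShardDescriptor view
//		0,               // 0 = use every free worker
//		worldBytes,      // total world state size as *big.Int
//		frameNumber,     // recorded on pending joins
//	)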
func (m *Manager) PlanAndAllocate(
difficulty uint64,
shards []ShardDescriptor,
maxAllocations int,
worldBytes *big.Int,
frameNumber uint64,
) ([]Proposal, error) {
m.mu.Lock()
isPlanning := m.isPlanning
m.mu.Unlock()
if isPlanning {
m.logger.Debug("planning already in progress")
return []Proposal{}, nil
}
m.mu.Lock()
m.isPlanning = true
m.mu.Unlock()
defer func() {
m.mu.Lock()
m.isPlanning = false
m.mu.Unlock()
}()
if len(shards) == 0 {
m.logger.Debug("no shards to allocate")
return nil, nil
}
// Enumerate free workers (unallocated).
all, err := m.workerMgr.RangeWorkers()
if err != nil {
return nil, errors.Wrap(err, "plan and allocate")
}
free := make([]uint, 0, len(all))
for _, w := range all {
if len(w.Filter) == 0 {
free = append(free, w.CoreId)
}
}
if len(free) == 0 {
m.logger.Debug("no workers free")
return nil, nil
}
if worldBytes.Cmp(big.NewInt(0)) == 0 {
return nil, errors.Wrap(
errors.New("world size is zero"),
"plan and allocate",
)
}
// Pre-compute basis (independent of shard specifics).
basis := reward.PomwBasis(difficulty, worldBytes.Uint64(), m.Units)
scores, err := m.scoreShards(shards, basis, worldBytes)
if err != nil {
return nil, errors.Wrap(err, "plan and allocate")
}
if len(scores) == 0 {
m.logger.Debug("no scores")
return nil, nil
}
// Sort by score desc, then lexicographically by filter to keep order
// stable/deterministic.
sort.Slice(scores, func(i, j int) bool {
cmp := scores[i].score.Cmp(scores[j].score)
if cmp != 0 {
return cmp > 0
}
fi := shards[scores[i].idx].Filter
fj := shards[scores[j].idx].Filter
return bytes.Compare(fi, fj) < 0
})
// For reward-greedy strategy, randomize within equal-score groups so equally
// good shards are chosen fairly instead of skewing toward lexicographically
// earlier filters.
if m.Strategy != DataGreedy && len(scores) > 1 {
for start := 0; start < len(scores); {
end := start + 1
// Find run [start,end) where scores are equal.
for end < len(scores) && scores[end].score.Cmp(scores[start].score) == 0 {
end++
}
// Shuffle the run with Fisher-Yates:
if end-start > 1 {
for i := end - 1; i > start; i-- {
n := big.NewInt(int64(i - start + 1))
r, err := rand.Int(rand.Reader, n)
if err == nil {
j := start + int(r.Int64())
scores[i], scores[j] = scores[j], scores[i]
}
}
}
start = end
}
}
// Determine how many allocations we'll attempt.
limit := len(free)
if maxAllocations > 0 && maxAllocations < limit {
limit = maxAllocations
}
if limit > len(scores) {
limit = len(scores)
}
m.logger.Debug(
"deciding on scored proposals",
zap.Int("free_workers", len(free)),
zap.Int("max_allocations", maxAllocations),
zap.Int("scores", len(scores)),
zap.Int("limit", limit),
)
proposals := make([]Proposal, 0, limit)
// Assign top-k scored shards to the first k free workers.
for k := 0; k < limit; k++ {
sel := shards[scores[k].idx]
// Copy filter so we don't leak underlying slices.
filterCopy := make([]byte, len(sel.Filter))
copy(filterCopy, sel.Filter)
// The expected reward from scoring is already a *big.Int in base units.
expBI := scores[k].score
proposals = append(proposals, Proposal{
WorkerId: free[k],
Filter: filterCopy,
ExpectedReward: expBI,
WorldStateBytes: worldBytes.Uint64(),
ShardSizeBytes: sel.Size,
Ring: sel.Ring,
ShardsDenominator: sel.Shards,
})
}
workerLookup := make(map[uint]*store.WorkerInfo, len(all))
for _, w := range all {
workerLookup[w.CoreId] = w
}
if len(proposals) > 0 {
m.persistPlannedFilters(proposals, workerLookup, frameNumber)
}
// Perform allocations
workerIds := []uint{}
filters := [][]byte{}
for _, p := range proposals {
workerIds = append(workerIds, p.WorkerId)
filters = append(filters, p.Filter)
}
m.logger.Debug("proposals collated", zap.Int("count", len(proposals)))
err = m.workerMgr.ProposeAllocations(workerIds, filters)
if err != nil {
m.logger.Warn("allocate worker failed",
zap.Error(err),
)
}
return proposals, errors.Wrap(err, "plan and allocate")
}
func (m *Manager) persistPlannedFilters(
proposals []Proposal,
workers map[uint]*store.WorkerInfo,
frameNumber uint64,
) {
for _, proposal := range proposals {
info, ok := workers[proposal.WorkerId]
if !ok {
var err error
info, err = m.store.GetWorker(proposal.WorkerId)
if err != nil {
m.logger.Warn(
"failed to load worker for planned allocation",
zap.Uint("core_id", proposal.WorkerId),
zap.Error(err),
)
continue
}
workers[proposal.WorkerId] = info
}
if bytes.Equal(info.Filter, proposal.Filter) {
continue
}
filterCopy := make([]byte, len(proposal.Filter))
copy(filterCopy, proposal.Filter)
info.Filter = filterCopy
info.Allocated = false
info.PendingFilterFrame = frameNumber
if err := m.workerMgr.RegisterWorker(info); err != nil {
m.logger.Warn(
"failed to persist worker filter",
zap.Uint("core_id", info.CoreId),
zap.Error(err),
)
}
}
}
func (m *Manager) scoreShards(
shards []ShardDescriptor,
basis *big.Int,
worldBytes *big.Int,
) ([]scored, error) {
scores := make([]scored, 0, len(shards))
for i, s := range shards {
if len(s.Filter) == 0 || s.Size == 0 {
continue
}
if s.Shards == 0 {
s.Shards = 1
}
var score *big.Int
switch m.Strategy {
case DataGreedy:
// Pure data coverage: larger shards first.
score = new(big.Int).SetUint64(s.Size)
default:
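// Reward-greedy score: basis * (Size / worldBytes), attenuated by the ring
// divisor and by sqrt(Shards), then truncated to an integer.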
// factor = basis * (stateSize / worldBytes)
factor := decimal.NewFromUint64(s.Size)
factor = factor.Mul(decimal.NewFromBigInt(basis, 0))
factor = factor.Div(decimal.NewFromBigInt(worldBytes, 0))
// ring divisor = 2^Ring
divisor := int64(1)
for j := uint8(0); j < s.Ring; j++ {
divisor <<= 1
}
// shard is oversubscribed, treat as no rewards
if divisor == 0 {
scores = append(scores, scored{idx: i, score: big.NewInt(0)})
continue
}
ringDiv := decimal.NewFromInt(divisor)
// shard factor = sqrt(Shards)
shardsSqrt, err := decimal.NewFromUint64(s.Shards).PowWithPrecision(
decimal.NewFromBigRat(big.NewRat(1, 2), 53),
53,
)
if err != nil {
return nil, errors.Wrap(err, "score shards")
}
if shardsSqrt.IsZero() {
return nil, errors.New("score shards")
}
factor = factor.Div(ringDiv)
factor = factor.Div(shardsSqrt)
score = factor.BigInt()
}
scores = append(scores, scored{idx: i, score: score})
}
return scores, nil
}
// DecideJoins evaluates pending shard joins using the latest shard view. It
// uses the same scoring basis as initial planning. For each pending join:
// - If the shard is missing from the latest view, or its score falls
// significantly below the best available shard's, reject the existing one
// (this will result in a new join attempt).
// - Otherwise, confirm the existing one, subject to available worker capacity.
func (m *Manager) DecideJoins(
difficulty uint64,
shards []ShardDescriptor,
pending [][]byte,
worldBytes *big.Int,
) error {
if len(pending) == 0 {
return nil
}
availableWorkers, err := m.unallocatedWorkerCount()
if err != nil {
return errors.Wrap(err, "decide joins")
}
// If no shards remain, warn and leave the pending joins undecided.
if len(shards) == 0 {
m.logger.Warn("no shards available to decide")
return nil
}
basis := reward.PomwBasis(difficulty, worldBytes.Uint64(), m.Units)
scores, err := m.scoreShards(shards, basis, worldBytes)
if err != nil {
return errors.Wrap(err, "decide joins")
}
type srec struct {
desc ShardDescriptor
score *big.Int
}
byHex := make(map[string]srec, len(shards))
var bestScore *big.Int
for _, sc := range scores {
s := shards[sc.idx]
key := hex.EncodeToString(s.Filter)
byHex[key] = srec{desc: s, score: sc.score}
if bestScore == nil || sc.score.Cmp(bestScore) > 0 {
bestScore = sc.score
}
}
// If nothing scored as valid, reject all pending joins (capped at 100).
if bestScore == nil {
reject := make([][]byte, 0, len(pending))
for _, p := range pending {
if len(p) == 0 {
continue
}
if len(reject) > 99 {
break
}
pc := make([]byte, len(p))
copy(pc, p)
reject = append(reject, pc)
}
return m.workerMgr.DecideAllocations(reject, nil)
}
reject := make([][]byte, 0, len(pending))
confirm := make([][]byte, 0, len(pending))
// Calculate rejection threshold: only reject if bestScore is significantly
// better (at least 50% higher) than the pending shard's score. This prevents
// churn from minor score fluctuations.
// threshold = bestScore * 0.67 (i.e., reject if pending score < 67% of best,
// which means best is ~50% better than pending)
rejectThreshold := new(big.Int).Mul(bestScore, big.NewInt(67))
rejectThreshold.Div(rejectThreshold, big.NewInt(100))
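// For example, with bestScore = 300 the threshold is 201: a pending shard
// scoring 150 is rejected, while one scoring 250 is confirmed.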
for _, p := range pending {
if len(p) == 0 {
continue
}
if len(reject) > 99 {
break
}
if len(confirm) > 99 {
break
}
key := hex.EncodeToString(p)
rec, ok := byHex[key]
if !ok {
// If a pending shard is no longer in the latest view, reject it. This can
// happen when a shard-out splits the shard into new divisions.
pc := make([]byte, len(p))
copy(pc, p)
reject = append(reject, pc)
continue
}
// Reject only if the pending shard's score is significantly worse than
// the best available (below ~67% of the best score, per rejectThreshold
// above). This prevents churn from minor score differences.
if rec.score.Cmp(rejectThreshold) < 0 {
pc := make([]byte, len(p))
copy(pc, p)
reject = append(reject, pc)
} else {
// Otherwise confirm - score is within acceptable range of best
pc := make([]byte, len(p))
copy(pc, p)
confirm = append(confirm, pc)
}
}
if len(reject) > 0 {
return m.workerMgr.DecideAllocations(reject, nil)
} else {
if availableWorkers == 0 && len(confirm) > 0 {
m.logger.Info(
"skipping confirmations due to lack of available workers",
zap.Int("pending_confirmations", len(confirm)),
)
confirm = nil
} else if availableWorkers > 0 && len(confirm) > availableWorkers {
m.logger.Warn(
"limiting confirmations due to worker capacity",
zap.Int("pending_confirmations", len(confirm)),
zap.Int("available_workers", availableWorkers),
)
confirm = confirm[:availableWorkers]
}
return m.workerMgr.DecideAllocations(nil, confirm)
}
}
func (m *Manager) unallocatedWorkerCount() (int, error) {
workers, err := m.workerMgr.RangeWorkers()
if err != nil {
return 0, err
}
count := 0
for _, worker := range workers {
if worker == nil {
continue
}
if !worker.Allocated {
count++
}
}
return count, nil
}