package provers

import (
	"bytes"
	"crypto/rand"
	"encoding/hex"
	"math/big"
	"sort"
	"sync"

	"github.com/pkg/errors"
	"github.com/shopspring/decimal"
	"go.uber.org/zap"

	"source.quilibrium.com/quilibrium/monorepo/node/consensus/reward"
	"source.quilibrium.com/quilibrium/monorepo/types/store"
	"source.quilibrium.com/quilibrium/monorepo/types/worker"
)

type Strategy int

const (
	RewardGreedy Strategy = iota
	DataGreedy
)

// ShardDescriptor describes a candidate shard allocation target.
type ShardDescriptor struct {
	// Confirmation filter for the shard (routing key). Must be non-empty.
	Filter []byte
	// Size in bytes of this shard's state (for reward proportionality).
	Size uint64
	// Ring attenuation factor (reward is divided by 2^Ring). Usually 0 unless
	// you intentionally place on outer rings.
	Ring uint8
	// Logical shard-group participation count for the sqrt divisor (>= 1).
	// If you're assigning a worker to exactly one shard, use 1.
	Shards uint64
}

// Proposal is a plan to allocate a specific worker to a shard filter.
type Proposal struct {
	WorkerId          uint
	Filter            []byte
	ExpectedReward    *big.Int // in base units
	WorldStateBytes   uint64
	ShardSizeBytes    uint64
	Ring              uint8
	ShardsDenominator uint64
}

// scored is an internal struct for ranking proposals.
type scored struct {
	idx   int
	score *big.Int
}

// Manager ranks shards and assigns free workers to the best ones.
type Manager struct {
	logger    *zap.Logger
	store     store.WorkerStore
	workerMgr worker.WorkerManager

	// Static issuance parameters for planning.
	Units    uint64
	Strategy Strategy

	mu         sync.Mutex
	isPlanning bool
}

// NewManager wires up a planning manager.
func NewManager(
	logger *zap.Logger,
	ws store.WorkerStore,
	wm worker.WorkerManager,
	units uint64,
	strategy Strategy,
) *Manager {
	return &Manager{
		logger:    logger.Named("allocation_manager"),
		store:     ws,
		workerMgr: wm,
		Units:     units,
		Strategy:  strategy,
	}
}

// PlanAndAllocate picks up to maxAllocations of the best shard filters and
// updates the filter in the worker manager for each selected free worker.
// If maxAllocations == 0, it will use as many free workers as available.
// frameNumber is recorded so pending joins survive restarts while the network
// processes the request.
func (m *Manager) PlanAndAllocate(
	difficulty uint64,
	shards []ShardDescriptor,
	maxAllocations int,
	worldBytes *big.Int,
	frameNumber uint64,
) ([]Proposal, error) {
	// Check and set the planning flag in a single critical section so two
	// concurrent callers cannot both start planning.
	m.mu.Lock()
	if m.isPlanning {
		m.mu.Unlock()
		m.logger.Debug("planning already in progress")
		return []Proposal{}, nil
	}
	m.isPlanning = true
	m.mu.Unlock()
	defer func() {
		m.mu.Lock()
		m.isPlanning = false
		m.mu.Unlock()
	}()

	if len(shards) == 0 {
		m.logger.Debug("no shards to allocate")
		return nil, nil
	}

	// Enumerate free workers (unallocated).
	all, err := m.workerMgr.RangeWorkers()
	if err != nil {
		return nil, errors.Wrap(err, "plan and allocate")
	}

	free := make([]uint, 0, len(all))
	for _, w := range all {
		if w == nil {
			continue
		}
		if len(w.Filter) == 0 {
			free = append(free, w.CoreId)
		}
	}

	if len(free) == 0 {
		m.logger.Debug("no workers free")
		return nil, nil
	}

	if worldBytes == nil || worldBytes.Sign() == 0 {
		return nil, errors.Wrap(
			errors.New("world size is zero"),
			"plan and allocate",
		)
	}

	// Pre-compute basis (independent of shard specifics).
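	// The basis feeds the per-shard score computed in scoreShards below. As a
	// rough sketch (assuming PomwBasis yields the per-proof issuance basis in
	// base units):
	//
	//	score ≈ basis * Size / worldBytes / 2^Ring / sqrt(Shards)
	//
	// For example, with illustrative numbers basis = 1_000_000, Size = 1 MiB,
	// worldBytes = 1 GiB, Ring = 0, Shards = 4:
	// score ≈ 1_000_000 / 1024 / 1 / 2 ≈ 488 base units.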
	basis := reward.PomwBasis(difficulty, worldBytes.Uint64(), m.Units)
	scores, err := m.scoreShards(shards, basis, worldBytes)
	if err != nil {
		return nil, errors.Wrap(err, "plan and allocate")
	}
	if len(scores) == 0 {
		m.logger.Debug("no scores")
		return nil, nil
	}

	// Sort by score desc, then lexicographically by filter to keep the order
	// stable/deterministic.
	sort.Slice(scores, func(i, j int) bool {
		cmp := scores[i].score.Cmp(scores[j].score)
		if cmp != 0 {
			return cmp > 0
		}
		fi := shards[scores[i].idx].Filter
		fj := shards[scores[j].idx].Filter
		return bytes.Compare(fi, fj) < 0
	})

	// For the reward-greedy strategy, randomize within equal-score groups so
	// equally good shards are chosen fairly instead of skewing toward
	// lexicographically earlier filters.
	if m.Strategy != DataGreedy && len(scores) > 1 {
		for start := 0; start < len(scores); {
			end := start + 1
			// Find the run [start, end) where scores are equal.
			for end < len(scores) &&
				scores[end].score.Cmp(scores[start].score) == 0 {
				end++
			}
			// Shuffle the run with Fisher–Yates.
			if end-start > 1 {
				for i := end - 1; i > start; i-- {
					n := big.NewInt(int64(i - start + 1))
					r, err := rand.Int(rand.Reader, n)
					if err == nil {
						j := start + int(r.Int64())
						scores[i], scores[j] = scores[j], scores[i]
					}
				}
			}
			start = end
		}
	}

	// Determine how many allocations we'll attempt.
	limit := len(free)
	if maxAllocations > 0 && maxAllocations < limit {
		limit = maxAllocations
	}
	if limit > len(scores) {
		limit = len(scores)
	}

	m.logger.Debug(
		"deciding on scored proposals",
		zap.Int("free_workers", len(free)),
		zap.Int("max_allocations", maxAllocations),
		zap.Int("scores", len(scores)),
		zap.Int("limit", limit),
	)

	proposals := make([]Proposal, 0, limit)

	// Assign the top-k scored shards to the first k free workers.
	for k := 0; k < limit; k++ {
		sel := shards[scores[k].idx]

		// Copy the filter so we don't leak underlying slices.
		filterCopy := make([]byte, len(sel.Filter))
		copy(filterCopy, sel.Filter)
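		// The score doubles as the expected reward here. Under the reward-greedy
		// strategy it is an estimate in base units (the basis scaled by shard
		// share, ring, and shard count); under DataGreedy it is simply the shard
		// size in bytes, so ExpectedReward should be read as a ranking key rather
		// than an issuance figure in that mode.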
		expBI := scores[k].score

		proposals = append(proposals, Proposal{
			WorkerId:          free[k],
			Filter:            filterCopy,
			ExpectedReward:    expBI,
			WorldStateBytes:   worldBytes.Uint64(),
			ShardSizeBytes:    sel.Size,
			Ring:              sel.Ring,
			ShardsDenominator: sel.Shards,
		})
	}

	workerLookup := make(map[uint]*store.WorkerInfo, len(all))
	for _, w := range all {
		if w == nil {
			continue
		}
		workerLookup[w.CoreId] = w
	}

	if len(proposals) > 0 {
		m.persistPlannedFilters(proposals, workerLookup, frameNumber)
	}

	// Perform the allocations.
	workerIds := []uint{}
	filters := [][]byte{}
	for _, p := range proposals {
		workerIds = append(workerIds, p.WorkerId)
		filters = append(filters, p.Filter)
	}

	m.logger.Debug("proposals collated", zap.Int("count", len(proposals)))

	err = m.workerMgr.ProposeAllocations(workerIds, filters)
	if err != nil {
		m.logger.Warn(
			"allocate worker failed",
			zap.Error(err),
		)
	}

	return proposals, errors.Wrap(err, "plan and allocate")
}

func (m *Manager) persistPlannedFilters(
	proposals []Proposal,
	workers map[uint]*store.WorkerInfo,
	frameNumber uint64,
) {
	for _, proposal := range proposals {
		info, ok := workers[proposal.WorkerId]
		if !ok {
			var err error
			info, err = m.store.GetWorker(proposal.WorkerId)
			if err != nil {
				m.logger.Warn(
					"failed to load worker for planned allocation",
					zap.Uint("core_id", proposal.WorkerId),
					zap.Error(err),
				)
				continue
			}
			workers[proposal.WorkerId] = info
		}

		if bytes.Equal(info.Filter, proposal.Filter) {
			continue
		}

		filterCopy := make([]byte, len(proposal.Filter))
		copy(filterCopy, proposal.Filter)

		info.Filter = filterCopy
		info.Allocated = false
		info.PendingFilterFrame = frameNumber

		if err := m.workerMgr.RegisterWorker(info); err != nil {
			m.logger.Warn(
				"failed to persist worker filter",
				zap.Uint("core_id", info.CoreId),
				zap.Error(err),
			)
		}
	}
}

func (m *Manager) scoreShards(
	shards []ShardDescriptor,
	basis *big.Int,
	worldBytes *big.Int,
) ([]scored, error) {
	scores := make([]scored, 0, len(shards))
	for i, s := range shards {
		if len(s.Filter) == 0 || s.Size == 0 {
			continue
		}
		if s.Shards == 0 {
			s.Shards = 1
		}

		var score *big.Int
		switch m.Strategy {
		case DataGreedy:
			// Pure data coverage: larger shards first.
			score = new(big.Int).SetUint64(s.Size)
		default:
			// factor = basis * (stateSize / worldBytes)
			factor := decimal.NewFromUint64(s.Size)
			factor = factor.Mul(decimal.NewFromBigInt(basis, 0))
			factor = factor.Div(decimal.NewFromBigInt(worldBytes, 0))

			// Ring divisor = 2^Ring.
			divisor := int64(1)
			for j := uint8(0); j < s.Ring; j++ {
				divisor <<= 1
			}
			// The shard is oversubscribed (the divisor overflowed); treat it as
			// yielding no rewards.
			if divisor <= 0 {
				scores = append(scores, scored{idx: i, score: big.NewInt(0)})
				continue
			}
			ringDiv := decimal.NewFromInt(divisor)

			// Shard factor = sqrt(Shards).
			shardsSqrt, err := decimal.NewFromUint64(s.Shards).PowWithPrecision(
				decimal.NewFromBigRat(big.NewRat(1, 2), 53),
				53,
			)
			if err != nil {
				return nil, errors.Wrap(err, "score shards")
			}
			if shardsSqrt.IsZero() {
				return nil, errors.Wrap(
					errors.New("sqrt of shards denominator is zero"),
					"score shards",
				)
			}

			factor = factor.Div(ringDiv)
			factor = factor.Div(shardsSqrt)
			score = factor.BigInt()
		}

		scores = append(scores, scored{idx: i, score: score})
	}
	return scores, nil
}

// DecideJoins evaluates pending shard joins using the latest shard view. It
// uses the same scoring basis as initial planning. For each pending join:
//   - If the pending shard scores well below the best shard in the latest view
//     (under roughly two thirds of the best score), reject it (this will result
//     in a new join attempt).
//   - Otherwise, confirm it.
func (m *Manager) DecideJoins(
	difficulty uint64,
	shards []ShardDescriptor,
	pending [][]byte,
	worldBytes *big.Int,
) error {
	if len(pending) == 0 {
		return nil
	}

	availableWorkers, err := m.unallocatedWorkerCount()
	if err != nil {
		return errors.Wrap(err, "decide joins")
	}

	// If no shards remain, warn and leave the pending joins untouched.
	if len(shards) == 0 {
		m.logger.Warn("no shards available to decide")
		return nil
	}

	if worldBytes == nil || worldBytes.Sign() == 0 {
		return errors.Wrap(
			errors.New("world size is zero"),
			"decide joins",
		)
	}

	basis := reward.PomwBasis(difficulty, worldBytes.Uint64(), m.Units)
	scores, err := m.scoreShards(shards, basis, worldBytes)
	if err != nil {
		return errors.Wrap(err, "decide joins")
	}

	type srec struct {
		desc  ShardDescriptor
		score *big.Int
	}
	byHex := make(map[string]srec, len(shards))
	var bestScore *big.Int
	for _, sc := range scores {
		s := shards[sc.idx]
		key := hex.EncodeToString(s.Filter)
		byHex[key] = srec{desc: s, score: sc.score}
		if bestScore == nil || sc.score.Cmp(bestScore) > 0 {
			bestScore = sc.score
		}
	}

	// If nothing is valid, reject everything.
	if bestScore == nil {
		reject := make([][]byte, 0, len(pending))
		for _, p := range pending {
			if len(p) == 0 {
				continue
			}
			// Cap the decision batch at 100 filters.
			if len(reject) > 99 {
				break
			}
			pc := make([]byte, len(p))
			copy(pc, p)
			reject = append(reject, pc)
		}
		return m.workerMgr.DecideAllocations(reject, nil)
	}

	reject := make([][]byte, 0, len(pending))
	confirm := make([][]byte, 0, len(pending))

	// Calculate the rejection threshold: only reject if bestScore is
	// significantly better (at least ~50% higher) than the pending shard's
	// score. This prevents churn from minor score fluctuations.
	// threshold = bestScore * 0.67 (i.e., reject if the pending score is below
	// 67% of the best, which means the best is ~50% better than the pending
	// shard).
	rejectThreshold := new(big.Int).Mul(bestScore, big.NewInt(67))
	rejectThreshold.Div(rejectThreshold, big.NewInt(100))

	for _, p := range pending {
		if len(p) == 0 {
			continue
		}
		// Cap both decision batches at 100 filters.
		if len(reject) > 99 {
			break
		}
		if len(confirm) > 99 {
			break
		}
		key := hex.EncodeToString(p)
		rec, ok := byHex[key]
		if !ok {
			// If a pending shard is missing from the latest view, reject it. This
			// could happen if shard-out produces a bunch of divisions.
			pc := make([]byte, len(p))
			copy(pc, p)
			reject = append(reject, pc)
			continue
		}

		// Reject only if the pending shard's score is significantly worse than
		// the best available (below ~67% of the best score). This prevents churn
		// from minor score differences.
		if rec.score.Cmp(rejectThreshold) < 0 {
			pc := make([]byte, len(p))
			copy(pc, p)
			reject = append(reject, pc)
		} else {
			// Otherwise confirm: the score is within an acceptable range of the
			// best.
			pc := make([]byte, len(p))
			copy(pc, p)
			confirm = append(confirm, pc)
		}
	}

	if len(reject) > 0 {
		return m.workerMgr.DecideAllocations(reject, nil)
	}

	if availableWorkers == 0 && len(confirm) > 0 {
		m.logger.Info(
			"skipping confirmations due to lack of available workers",
			zap.Int("pending_confirmations", len(confirm)),
		)
		confirm = nil
	} else if availableWorkers > 0 && len(confirm) > availableWorkers {
		m.logger.Warn(
			"limiting confirmations due to worker capacity",
			zap.Int("pending_confirmations", len(confirm)),
			zap.Int("available_workers", availableWorkers),
		)
		confirm = confirm[:availableWorkers]
	}
	return m.workerMgr.DecideAllocations(nil, confirm)
}

func (m *Manager) unallocatedWorkerCount() (int, error) {
	workers, err := m.workerMgr.RangeWorkers()
	if err != nil {
		return 0, err
	}
	count := 0
	for _, w := range workers {
		if w == nil {
			continue
		}
		if !w.Allocated {
			count++
		}
	}
	return count, nil
}
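// examplePlanAndDecide is a minimal, illustrative sketch (not part of the
// production flow) of how a caller might drive a Manager across a planning
// pass and a later join decision. The unit count, shard view, world size,
// difficulty, frame number, and pending filter below are hypothetical
// placeholder values; real callers obtain them from the current frame and
// coverage data.
func examplePlanAndDecide(
	logger *zap.Logger,
	ws store.WorkerStore,
	wm worker.WorkerManager,
) error {
	mgr := NewManager(logger, ws, wm, 8000000000, RewardGreedy)

	// Hypothetical shard view: two shards of differing size and ring depth.
	shards := []ShardDescriptor{
		{Filter: []byte{0x01, 0x02}, Size: 1 << 20, Ring: 0, Shards: 1},
		{Filter: []byte{0x03, 0x04}, Size: 4 << 20, Ring: 1, Shards: 4},
	}
	worldBytes := big.NewInt(1 << 30)

	// Plan with no cap on allocations (0 = use all free workers).
	proposals, err := mgr.PlanAndAllocate(100000, shards, 0, worldBytes, 42)
	if err != nil {
		return err
	}
	for _, p := range proposals {
		logger.Info(
			"planned allocation",
			zap.Uint("worker", p.WorkerId),
			zap.String("filter", hex.EncodeToString(p.Filter)),
			zap.String("expected_reward", p.ExpectedReward.String()),
		)
	}

	// Later, once joins are pending, re-evaluate them against a fresh view.
	pending := [][]byte{{0x01, 0x02}}
	return mgr.DecideJoins(100000, shards, pending, worldBytes)
}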