ceremonyclient/node/consensus/fees/dynamic_fee_manager.go
Cassandra Heart dbd95bd9e9
v2.1.0 (#439)
* v2.1.0 [omit consensus and adjacent] - this commit will be amended with the full release after the file copy is complete

* 2.1.0 main node rollup
2025-09-30 02:48:15 -05:00

317 lines
7.9 KiB
Go

package fees
import (
"encoding/hex"
"sync"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/types/consensus"
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
)
const (
// maxWindowSize is the maximum number of frames to keep in the sliding window
maxWindowSize = 360
// defaultFeeMultiplier is used when there are no votes yet
defaultFeeMultiplier = 100
)
// feeVote represents a single fee multiplier vote from a frame
type feeVote struct {
frameNumber uint64
feeMultiplierVote uint64
}
// filterFeeData tracks fee votes for a specific filter/shard
type filterFeeData struct {
votes []feeVote // Sliding window of votes (newest at end)
sumVotes uint64 // Running sum to avoid recalculation
lastUpdated time.Time // For pruning inactive filters
}
// DynamicFeeManager implements the DynamicFeeManager interface
type DynamicFeeManager struct {
logger *zap.Logger
mu sync.RWMutex
// Map from filter (as hex string) to fee data
filterData map[string]*filterFeeData
inclusionProver crypto.InclusionProver
}
// NewDynamicFeeManager creates a new dynamic fee manager
func NewDynamicFeeManager(
logger *zap.Logger,
inclusionProver crypto.InclusionProver,
) consensus.DynamicFeeManager {
return &DynamicFeeManager{
logger: logger,
filterData: make(map[string]*filterFeeData),
inclusionProver: inclusionProver,
}
}
// AddFrameFeeVote adds a fee multiplier vote from a frame to the sliding window
func (d *DynamicFeeManager) AddFrameFeeVote(
filter []byte,
frameNumber uint64,
feeMultiplierVote uint64,
) error {
d.mu.Lock()
defer d.mu.Unlock()
filterKey := hex.EncodeToString(filter)
// Get or create filter data
data, exists := d.filterData[filterKey]
if !exists {
data = &filterFeeData{
votes: make([]feeVote, 0, maxWindowSize),
sumVotes: 0,
lastUpdated: time.Now(),
}
d.filterData[filterKey] = data
filtersTracked.Set(float64(len(d.filterData)))
}
// Check if this is a duplicate or out-of-order frame
if len(data.votes) > 0 {
lastFrame := data.votes[len(data.votes)-1]
if frameNumber <= lastFrame.frameNumber {
return errors.Errorf(
"frame %d is not newer than last frame %d",
frameNumber,
lastFrame.frameNumber,
)
}
}
// Add the new vote
newVote := feeVote{
frameNumber: frameNumber,
feeMultiplierVote: feeMultiplierVote,
}
data.votes = append(data.votes, newVote)
data.sumVotes += feeMultiplierVote
data.lastUpdated = time.Now()
// Record metrics
feeVotesAdded.WithLabelValues(filterKey).Inc()
feeVoteDistribution.WithLabelValues(filterKey).Observe(
float64(feeMultiplierVote),
)
// Maintain sliding window size
if len(data.votes) > maxWindowSize {
// Remove oldest vote
oldVote := data.votes[0]
data.votes = data.votes[1:]
data.sumVotes -= oldVote.feeMultiplierVote
feeVotesDropped.WithLabelValues(filterKey).Inc()
}
// Update metrics
slidingWindowSize.WithLabelValues(filterKey).Set(float64(len(data.votes)))
// Calculate and update current fee multiplier metric
if len(data.votes) > 0 {
avgFee := data.sumVotes / uint64(len(data.votes))
currentFeeMultiplier.WithLabelValues(filterKey).Set(float64(avgFee))
}
d.logger.Debug(
"added fee vote",
zap.String("filter", filterKey),
zap.Uint64("frame_number", frameNumber),
zap.Uint64("fee_multiplier_vote", feeMultiplierVote),
zap.Int("window_size", len(data.votes)),
)
return nil
}
// GetNextFeeMultiplier returns the calculated fee multiplier based on the
// average.
func (d *DynamicFeeManager) GetNextFeeMultiplier(
filter []byte,
) (uint64, error) {
timer := prometheus.NewTimer(
feeCalculationDuration.WithLabelValues(hex.EncodeToString(filter)),
)
defer timer.ObserveDuration()
d.mu.RLock()
defer d.mu.RUnlock()
filterKey := hex.EncodeToString(filter)
data, exists := d.filterData[filterKey]
if !exists || len(data.votes) == 0 {
d.logger.Debug(
"no fee votes for filter, using default",
zap.String("filter", filterKey),
zap.Uint64("default_fee", defaultFeeMultiplier),
)
return defaultFeeMultiplier, nil
}
// Calculate average using the maintained sum
// This avoids floating point and minimizes rounding errors
avgFee := data.sumVotes / uint64(len(data.votes))
d.logger.Debug(
"calculated fee multiplier",
zap.String("filter", filterKey),
zap.Uint64("average_fee", avgFee),
zap.Int("votes_count", len(data.votes)),
zap.Uint64("sum_votes", data.sumVotes),
)
return avgFee, nil
}
// GetVoteHistory returns the current sliding window of fee votes for a filter.
func (d *DynamicFeeManager) GetVoteHistory(
filter []byte,
) ([]uint64, error) {
d.mu.RLock()
defer d.mu.RUnlock()
filterKey := hex.EncodeToString(filter)
data, exists := d.filterData[filterKey]
if !exists {
return []uint64{}, nil
}
history := make([]uint64, len(data.votes))
for i, vote := range data.votes {
history[i] = vote.feeMultiplierVote
}
return history, nil
}
// GetAverageWindowSize returns the current number of votes in the sliding
// window.
func (d *DynamicFeeManager) GetAverageWindowSize(
filter []byte,
) (int, error) {
d.mu.RLock()
defer d.mu.RUnlock()
filterKey := hex.EncodeToString(filter)
data, exists := d.filterData[filterKey]
if !exists {
return 0, nil
}
return len(data.votes), nil
}
// PruneOldData removes fee vote data for filters that haven't been updated
// recently.
func (d *DynamicFeeManager) PruneOldData(maxAge uint64) error {
d.mu.Lock()
defer d.mu.Unlock()
cutoffTime := time.Now().Add(-time.Duration(maxAge) * time.Millisecond)
prunedCount := 0
for filterKey, data := range d.filterData {
if data.lastUpdated.Before(cutoffTime) {
delete(d.filterData, filterKey)
prunedCount++
// Clear metrics for this filter
slidingWindowSize.DeleteLabelValues(filterKey)
currentFeeMultiplier.DeleteLabelValues(filterKey)
}
}
if prunedCount > 0 {
filtersPruned.Add(float64(prunedCount))
filtersTracked.Set(float64(len(d.filterData)))
d.logger.Info(
"pruned old filter data",
zap.Int("pruned_count", prunedCount),
zap.Int("remaining_filters", len(d.filterData)),
)
}
return nil
}
// RewindToFrame removes all votes newer than the specified frame number.
func (d *DynamicFeeManager) RewindToFrame(
filter []byte,
frameNumber uint64,
) (int, error) {
d.mu.Lock()
defer d.mu.Unlock()
filterKey := hex.EncodeToString(filter)
data, exists := d.filterData[filterKey]
if !exists || len(data.votes) == 0 {
return 0, nil
}
// Find the index of the first vote to remove
// We want to keep all votes with frameNumber <= the specified frameNumber
keepIndex := 0
for i := 0; i < len(data.votes); i++ {
if data.votes[i].frameNumber > frameNumber {
keepIndex = i
break
}
}
// If all votes are <= frameNumber, keep all
if keepIndex == 0 && len(data.votes) > 0 &&
data.votes[len(data.votes)-1].frameNumber <= frameNumber {
keepIndex = len(data.votes)
}
// If nothing to remove
if keepIndex == len(data.votes) {
return 0, nil
}
// Calculate how many votes we're removing
removedCount := len(data.votes) - keepIndex
// Update the sum by subtracting the removed votes
for i := keepIndex; i < len(data.votes); i++ {
data.sumVotes -= data.votes[i].feeMultiplierVote
feeVotesDropped.WithLabelValues(filterKey).Inc()
}
// Truncate the votes slice
data.votes = data.votes[:keepIndex]
// Update metrics
slidingWindowSize.WithLabelValues(filterKey).Set(float64(len(data.votes)))
// Update current fee multiplier metric
if len(data.votes) > 0 {
avgFee := data.sumVotes / uint64(len(data.votes))
currentFeeMultiplier.WithLabelValues(filterKey).Set(float64(avgFee))
} else {
currentFeeMultiplier.WithLabelValues(filterKey).Set(
float64(defaultFeeMultiplier),
)
}
d.logger.Debug(
"rewound to frame",
zap.String("filter", filterKey),
zap.Uint64("frame_number", frameNumber),
zap.Int("removed_count", removedCount),
zap.Int("remaining_count", len(data.votes)),
)
return removedCount, nil
}