ceremonyclient/go-libp2p-blossomsub/backoff.go
Cassandra Heart c797d482f9
v2.1.0.5 (#457)
* wip: conversion of hotstuff from flow into Q-oriented model

* bulk of tests

* remaining non-integration tests

* add integration test, adjust log interface, small tweaks

* further adjustments, restore full pacemaker shape

* add component lifecycle management+supervisor

* further refinements

* resolve timeout hanging

* mostly finalized state for consensus

* bulk of engine swap out

* lifecycle-ify most types

* wiring nearly complete, missing needed hooks for proposals

* plugged in, vetting message validation paths

* global consensus, plugged in and verified

* app shard now wired in too

* do not decode empty keys.yml (#456)

* remove obsolete engine.maxFrames config parameter (#454)

* default to Info log level unless debug is enabled (#453)

* respect config's  "logging" section params, remove obsolete single-file logging (#452)

* Trivial code cleanup aiming to reduce Go compiler warnings (#451)

* simplify range traversal

* simplify channel read for single select case

* delete rand.Seed() deprecated in Go 1.20 and no-op as of Go 1.24

* simplify range traversal

* simplify channel read for single select case

* remove redundant type from array

* simplify range traversal

* simplify channel read for single select case

* RC slate

* finalize 2.1.0.5

* Update comments in StrictMonotonicCounter

Fix comment formatting and clarify description.

---------

Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
2025-11-11 05:00:17 -06:00

109 lines
2.3 KiB
Go

package blossomsub
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/peer"
)
// Tuning parameters for the per-peer backoff tracker.
const (
	// MinBackoffDelay is the delay updateAndGet assigns when a peer's current
	// delay is still below this floor (i.e. on its first real backoff step).
	MinBackoffDelay = 100 * time.Millisecond

	// MaxBackoffDelay is the ceiling applied to a peer's delay after each
	// multiplicative increase.
	MaxBackoffDelay = 10 * time.Second

	// TimeToLive is how long an entry stays valid after its last recorded
	// attempt. Older entries are treated as unknown by updateAndGet and are
	// deleted by cleanup.
	TimeToLive = 10 * time.Minute

	// BackoffCleanupInterval is not referenced in this file; presumably it is
	// the cleanupInterval callers pass to newBackoff — TODO confirm.
	BackoffCleanupInterval = 1 * time.Minute

	// BackoffMultiplier is the factor by which updateAndGet grows a peer's
	// delay on each step between MinBackoffDelay and MaxBackoffDelay.
	BackoffMultiplier = 2

	// MaxBackoffJitterCoff is the exclusive upper bound, in milliseconds, of
	// the random jitter added to each multiplied delay.
	MaxBackoffJitterCoff = 100

	// MaxBackoffAttempts is not referenced in this file; presumably it is the
	// maxAttempts value callers pass to newBackoff — TODO confirm.
	MaxBackoffAttempts = 4
)
// backoffHistory is the backoff state kept for a single peer.
type backoffHistory struct {
	// duration is the delay most recently computed for the peer by
	// updateAndGet; it is zero for a fresh entry.
	duration time.Duration

	// lastTried is the time of the peer's most recent recorded attempt. It is
	// compared against TimeToLive to decide whether the entry has expired.
	lastTried time.Time

	// attempts counts the attempts recorded for the peer since the entry was
	// created or last reset.
	attempts int
}
// backoff tracks per-peer backoff state and periodically purges entries that
// have not been touched for longer than TimeToLive.
type backoff struct {
	// mu is held by updateAndGet and cleanup while they access info.
	mu sync.Mutex

	// info maps each peer to its backoff history.
	info map[peer.ID]*backoffHistory

	// ct is the size threshold supplied to newBackoff.
	// NOTE(review): nothing visible in this file reads ct; cleanup is driven
	// solely by the ticker in cleanupLoop — confirm whether it is still needed.
	ct int

	// ci is the interval between cleanup passes (the ticker period used by
	// cleanupLoop).
	ci time.Duration

	// maxAttempts is the number of recorded attempts after which
	// updateAndGet returns an error for the peer instead of a delay.
	maxAttempts int
}
// newBackoff builds an empty backoff tracker and starts its background
// cleanup goroutine, which keeps running until ctx is cancelled.
//
// sizeThreshold, cleanupInterval and maxAttempts are stored unchanged in the
// tracker's ct, ci and maxAttempts fields respectively.
func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Duration, maxAttempts int) *backoff {
	// new() leaves the mutex at its zero value, which is an unlocked mutex.
	tracker := new(backoff)
	tracker.info = make(map[peer.ID]*backoffHistory)
	tracker.ct = sizeThreshold
	tracker.ci = cleanupInterval
	tracker.maxAttempts = maxAttempts

	go tracker.cleanupLoop(ctx)
	return tracker
}
// updateAndGet records a new attempt for peer id and returns the delay that
// applies to it.
//
// The delay sequence for a peer is: zero for the first attempt (or the first
// one after its entry is older than TimeToLive), then MinBackoffDelay, then
// BackoffMultiplier times the previous delay plus up to MaxBackoffJitterCoff
// milliseconds of random jitter, capped at MaxBackoffDelay. Once the delay
// has reached MaxBackoffDelay it stays there.
//
// If the peer already has b.maxAttempts recorded attempts and its entry has
// not expired, a non-nil error is returned and the entry is left untouched
// (so it can still age out via TimeToLive).
func (b *backoff) updateAndGet(id peer.ID) (time.Duration, error) {
	// Keep the lock until the function returns: h is shared through b.info,
	// so even the final read of h.duration must happen under the mutex or it
	// races with a concurrent update for the same peer.
	b.mu.Lock()
	defer b.mu.Unlock()

	h, ok := b.info[id]
	switch {
	case !ok || time.Since(h.lastTried) > TimeToLive:
		// Unknown or expired peer: start over; the first request goes
		// immediately.
		h = &backoffHistory{
			duration: time.Duration(0),
			attempts: 0,
		}
	case h.attempts >= b.maxAttempts:
		return 0, fmt.Errorf("peer %s has reached its maximum backoff attempts", id)
	case h.duration < MinBackoffDelay:
		h.duration = MinBackoffDelay
	case h.duration < MaxBackoffDelay:
		jitter := rand.Intn(MaxBackoffJitterCoff)
		h.duration = (BackoffMultiplier * h.duration) + time.Duration(jitter)*time.Millisecond
		// Clamp to the ceiling; the negative check guards against overflow
		// of the multiplication.
		if h.duration > MaxBackoffDelay || h.duration < 0 {
			h.duration = MaxBackoffDelay
		}
	}

	h.attempts++
	h.lastTried = time.Now()
	b.info[id] = h
	return h.duration, nil
}
// cleanup deletes every peer entry whose last recorded attempt is older than
// TimeToLive. The mutex is held for the duration of the sweep.
func (b *backoff) cleanup() {
	b.mu.Lock()
	defer b.mu.Unlock()

	for peerID, hist := range b.info {
		if time.Since(hist.lastTried) <= TimeToLive {
			continue // entry is still within its lifetime
		}
		delete(b.info, peerID)
	}
}
// cleanupLoop runs cleanup once per b.ci tick and returns when ctx is
// cancelled, stopping the ticker on the way out.
func (b *backoff) cleanupLoop(ctx context.Context) {
	ticker := time.NewTicker(b.ci)
	defer ticker.Stop()

	done := ctx.Done()
	for {
		select {
		case <-ticker.C:
			b.cleanup()
		case <-done:
			return // pubsub shutting down
		}
	}
}