ceremonyclient/lifecycle/common.go
Cassandra Heart c797d482f9
v2.1.0.5 (#457)
* wip: conversion of hotstuff from flow into Q-oriented model

* bulk of tests

* remaining non-integration tests

* add integration test, adjust log interface, small tweaks

* further adjustments, restore full pacemaker shape

* add component lifecycle management+supervisor

* further refinements

* resolve timeout hanging

* mostly finalized state for consensus

* bulk of engine swap out

* lifecycle-ify most types

* wiring nearly complete, missing needed hooks for proposals

* plugged in, vetting message validation paths

* global consensus, plugged in and verified

* app shard now wired in too

* do not decode empty keys.yml (#456)

* remove obsolete engine.maxFrames config parameter (#454)

* default to Info log level unless debug is enabled (#453)

* respect config's  "logging" section params, remove obsolete single-file logging (#452)

* Trivial code cleanup aiming to reduce Go compiler warnings (#451)

* simplify range traversal

* simplify channel read for single select case

* delete rand.Seed() deprecated in Go 1.20 and no-op as of Go 1.24

* simplify range traversal

* simplify channel read for single select case

* remove redundant type from array

* simplify range traversal

* simplify channel read for single select case

* RC slate

* finalize 2.1.0.5

* Update comments in StrictMonotonicCounter

Fix comment formatting and clarify description.

---------

Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
2025-11-11 05:00:17 -06:00

216 lines
5.9 KiB
Go

package lifecycle
import (
"context"
"math"
"reflect"
)
// AllReady invokes Ready on every supplied component and returns a channel
// that is closed once each of the collected ready channels has been closed.
// With no components the returned channel is already closed.
func AllReady(components ...Component) <-chan struct{} {
	pending := make([]<-chan struct{}, 0, len(components))
	for _, component := range components {
		pending = append(pending, component.Ready())
	}
	return AllClosed(pending...)
}
// AllDone invokes Done on every supplied component and returns a channel that
// is closed once each of the collected done channels has been closed. With no
// components the returned channel is already closed.
func AllDone(components ...Component) <-chan struct{} {
	pending := make([]<-chan struct{}, 0, len(components))
	for _, component := range components {
		pending = append(pending, component.Done())
	}
	return AllClosed(pending...)
}
// AllClosed returns a channel that is closed after a receive has completed on
// every input channel.
//
// When no channels are given, the returned channel is closed immediately.
// Otherwise a goroutine is started which receives from the inputs one after
// another, in argument order, and closes the result when the last receive
// returns; that goroutine stays blocked for as long as any input remains open.
func AllClosed(channels ...<-chan struct{}) <-chan struct{} {
	allClosed := make(chan struct{})
	if len(channels) > 0 {
		go func() {
			defer close(allClosed)
			for i := range channels {
				<-channels[i]
			}
		}()
		return allClosed
	}
	close(allClosed)
	return allClosed
}
// WaitClosed blocks until either ch is signalled/closed or ctx is cancelled.
// It returns nil when ch was signalled/closed, and ctx.Err() otherwise.
//
// If the context fires first, ch is polled one more time before giving up.
// This covers the corner case where both become ready at the same moment and
// the select happens to pick the context: the signal on ch still wins. It is
// meant for callers where ignoring a signal could cause safety issues.
func WaitClosed(ctx context.Context, ch <-chan struct{}) error {
	select {
	case <-ch:
		return nil
	case <-ctx.Done():
	}

	// The context is done; give the channel a final, non-blocking chance.
	select {
	case <-ch:
		return nil
	default:
		return ctx.Err()
	}
}
// CheckClosed performs a non-blocking receive on done and reports whether it
// succeeded, i.e. whether the channel had a pending signal or was closed.
//
// It exists to cut down on boilerplate where several channels must be polled
// because a missed signal could cause safety issues. Note that a pending
// signal (as opposed to a close) is consumed by the check.
func CheckClosed(done <-chan struct{}) bool {
	signalled := false
	select {
	case <-done:
		signalled = true
	default:
	}
	return signalled
}
// MergeChannels merges a slice or array of channels into a single channel.
//
// channels must be a slice or array whose element type is a channel that can
// be received from (bidirectional or receive-only), e.g. []chan T or
// []<-chan T. The result is a receive-only channel of the same element type,
// returned as any; callers type-assert it to <-chan T.
//
// A goroutine forwards every value received from any input to the unbuffered
// output. An input is dropped once it is closed, and the output is closed
// after all inputs have been closed. Because the output is unbuffered, the
// goroutine blocks until the consumer receives each value.
//
// MergeChannels panics if the argument is nil or not a slice/array, if its
// elements are not channels, or if the channels are send-only.
func MergeChannels(channels any) any {
	sliceType := reflect.TypeOf(channels)
	// reflect.TypeOf returns nil for a nil interface value; guard it so the
	// caller gets the intended message rather than a nil dereference.
	if sliceType == nil ||
		(sliceType.Kind() != reflect.Slice && sliceType.Kind() != reflect.Array) {
		panic("argument must be an array or slice")
	}

	chanType := sliceType.Elem()
	// ChanDir itself panics on non-channel types; fail with a clear message.
	if chanType.Kind() != reflect.Chan {
		panic("elements must be channels")
	}
	if chanType.ChanDir() == reflect.SendDir {
		panic("channels cannot be send-only")
	}

	c := reflect.ValueOf(channels)
	cases := make([]reflect.SelectCase, 0, c.Len())
	for i := 0; i < c.Len(); i++ {
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: c.Index(i),
		})
	}

	elemType := chanType.Elem()
	out := reflect.MakeChan(reflect.ChanOf(reflect.BothDir, elemType), 0)

	go func() {
		for len(cases) > 0 {
			i, v, ok := reflect.Select(cases)
			if !ok {
				// Input i was closed: swap it to the end and shrink the
				// slice so it is no longer selected on.
				lastIndex := len(cases) - 1
				cases[i], cases[lastIndex] = cases[lastIndex], cases[i]
				cases = cases[:lastIndex]
				continue
			}
			out.Send(v)
		}
		out.Close()
	}()

	// Hand out a receive-only view so callers cannot send on or close it.
	return out.Convert(reflect.ChanOf(reflect.RecvDir, elemType)).Interface()
}
// WaitError blocks until a value arrives on errChan or done is closed. It
// returns the value received from errChan if there is one, and nil otherwise.
//
// The done channel may have been closed as the consequence of a fatal error,
// in which case both channels can be readable by the time this goroutine is
// scheduled. Since select picks among ready cases at random, the done case
// could win; to avoid returning nil and allowing unsafe continuation, errChan
// is polled once more after done fires.
func WaitError(errChan <-chan error, done <-chan struct{}) error {
	select {
	case <-done:
	case err := <-errChan:
		return err
	}

	// done fired; make sure a simultaneously delivered error is not lost.
	select {
	case err := <-errChan:
		return err
	default:
		return nil
	}
}
// componentMerger is a utility structure which implements lifecycle.Component
// and is used to present a list of Components as a single Component.
type componentMerger struct {
	// components are the merged components, started in slice order.
	components []Component
}

// Start calls Start on each merged component in order, passing the same
// signaler context to all of them. It stops at, and returns, the first error;
// components after the failing one are not started. Nil entries are skipped.
func (m componentMerger) Start(signalerContext SignalerContext) error {
	for _, component := range m.components {
		// Skip nil interface values rather than panicking on the call.
		if component == nil {
			continue
		}
		if err := component.Start(signalerContext); err != nil {
			return err
		}
	}
	return nil
}

// Ready returns a channel that is closed once every merged component is ready.
func (m componentMerger) Ready() <-chan struct{} {
	return AllReady(m.components...)
}

// Done returns a channel that is closed once every merged component is done.
func (m componentMerger) Done() <-chan struct{} {
	return AllDone(m.components...)
}

// Compile-time check that componentMerger satisfies Component.
var _ Component = (*componentMerger)(nil)
// MergeComponents wraps the given components in a single Component whose
// Start, Ready and Done operate on all of them.
func MergeComponents(components ...Component) Component {
	merged := componentMerger{components: components}
	return merged
}
// DetypeSlice copies the elements of a typed slice into a freshly allocated
// []any of the same length, dropping the static element type. This is handy
// for APIs such as structpb.NewValue that accept []any but not []T for a
// specific T. The result is always non-nil, even for a nil or empty input.
func DetypeSlice[T any](typedSlice []T) []any {
	boxed := make([]any, 0, len(typedSlice))
	for _, elem := range typedSlice {
		boxed = append(boxed, elem)
	}
	return boxed
}
// SampleN computes a percentage of the given number 'n' and returns the
// result, rounded up, as an unsigned integer. If the calculated sample is
// greater than the provided 'max' value, it returns the ceil of 'max'. If 'n'
// is less than or equal to 0, or the resulting sample is not positive (for
// example because 'percentage' or 'max' is negative or NaN), it returns 0.
//
// Parameters:
//   - n: The input number, used as the base to compute the percentage.
//   - max: The maximum value that the computed sample should not exceed.
//   - percentage: The percentage (in range 0.0 to 1.0) to be applied to 'n'.
//
// Returns:
//   - The computed sample as an unsigned integer, with consideration to the
//     given constraints.
func SampleN(n int, max, percentage float64) uint {
	if n <= 0 {
		return 0
	}
	sample := float64(n) * percentage
	if sample > max {
		sample = max
	}
	// Converting a negative or NaN float to uint is implementation-dependent
	// in Go, so clamp those cases to zero explicitly.
	if math.IsNaN(sample) || sample <= 0 {
		return 0
	}
	return uint(math.Ceil(sample))
}