ceremonyclient/lifecycle/supervisor.go
2025-11-20 21:46:58 -06:00

269 lines
6.0 KiB
Go

package lifecycle
import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"go.uber.org/atomic"
)
// Node describes one component in the graph.
type Node struct {
Name string
Deps []string // names this node depends on (parents)
Factory ComponentFactory
OnError OnError // the handler for this node
}
// Supervisor runs a DAG of nodes with policy-aware error propagation.
type Supervisor struct {
nodes map[string]*Node
parents map[string][]string
kids map[string][]string
// runtime
cancels map[string]context.CancelFunc
wg sync.WaitGroup
// decision requests from node wrappers
requests chan decisionReq
// suppress events that are just the fallout of our own cancels
suppress sync.Map // name -> struct{}
}
type decisionReq struct {
from string
err error
want ErrorHandlingBehavior // node's own OnError verdict
reply chan ErrorHandlingBehavior
}
func NewSupervisor(nodes []*Node) (*Supervisor, error) {
s := &Supervisor{
nodes: map[string]*Node{},
parents: map[string][]string{},
kids: map[string][]string{},
cancels: map[string]context.CancelFunc{},
requests: make(chan decisionReq, 64),
}
for _, n := range nodes {
if _, dup := s.nodes[n.Name]; dup {
return nil, fmt.Errorf("dup node %q", n.Name)
}
s.nodes[n.Name] = n
}
// build edges
for name, n := range s.nodes {
for _, p := range n.Deps {
if _, ok := s.nodes[p]; !ok {
return nil, fmt.Errorf("%s depends on unknown %s", name, p)
}
s.parents[name] = append(s.parents[name], p)
s.kids[p] = append(s.kids[p], name)
}
}
// cycle check via Kahn
if _, err := topoOrder(s.nodes, s.parents); err != nil {
return nil, err
}
return s, nil
}
func (s *Supervisor) Start(ctx context.Context) error {
ctx, stopSignals := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stopSignals()
order, _ := topoOrder(s.nodes, s.parents)
// start in topo order so deps come up first
for _, name := range order {
n := s.nodes[name]
cctx, cancel := context.WithCancel(ctx)
s.cancels[name] = cancel
s.wg.Add(1)
go func(name string, n *Node, cctx context.Context) {
defer s.wg.Done()
// Wrap node's OnError to route decisions through supervisor.
handler := func(err error) ErrorHandlingBehavior {
want := ErrorShouldRestart
if n.OnError != nil {
want = n.OnError(err)
}
// ignore events we ourselves triggered via cancel
if errors.Is(err, context.Canceled) {
return ErrorShouldStop
}
reply := make(chan ErrorHandlingBehavior, 1)
s.requests <- decisionReq{
from: name,
err: err,
want: want,
reply: reply,
}
return <-reply
}
_ = RunComponent(cctx, n.Factory, handler)
}(name, n, cctx)
}
// coordinator loop
var shutdownAll atomic.Bool
for {
select {
case <-ctx.Done():
s.stopAll()
s.wg.Wait()
return ctx.Err()
case req := <-s.requests:
// Dedup if this node was targeted by a prior cascade
if _, silenced := s.suppress.Load(req.from); silenced {
req.reply <- ErrorShouldStop
continue
}
switch req.want {
case ErrorShouldRestart:
// no graph action; let RunComponent restart it
req.reply <- ErrorShouldRestart
case ErrorShouldStop:
s.stopSubtree(req.from) // stop node + descendants
req.reply <- ErrorShouldStop
case ErrorShouldStopParents:
s.stopAncestorsAndDesc(req.from)
req.reply <- ErrorShouldStop
case ErrorShouldShutdown:
// Let the child return promptly, then synchronously wait and exit.
req.reply <- ErrorShouldStop
// Return the precipitating error so callers can log/act on it.
return req.err
case ErrorShouldSpinHalt:
shutdownAll.Store(true)
s.stopAll()
req.reply <- ErrorShouldStop // child returns promptly
// Block the supervisor until SIGTERM (local wait). Ignore SIGINT.
term := make(chan os.Signal, 1)
signal.Notify(term, syscall.SIGTERM)
<-term
// After SIGTERM, join everything and return the original error.
s.wg.Wait()
return req.err
}
}
}
}
func topoOrder(
nodes map[string]*Node,
parents map[string][]string,
) ([]string, error) {
indeg := map[string]int{}
for name := range nodes {
indeg[name] = 0
}
for name := range nodes {
for range parents[name] {
indeg[name]++
}
}
q := make([]string, 0)
for n, d := range indeg {
if d == 0 {
q = append(q, n)
}
}
var order []string
for len(q) > 0 {
n := q[0]
q = q[1:]
order = append(order, n)
for _, kid := range kidsOf(n, nodes, parents) {
indeg[kid]--
if indeg[kid] == 0 {
q = append(q, kid)
}
}
}
if len(order) != len(nodes) {
return nil, fmt.Errorf("dependency cycle")
}
return order, nil
}
func kidsOf(n string, nodes map[string]*Node, parents map[string][]string) []string {
// build once in NewSupervisor; simplified here:
var out []string
for name := range nodes {
for _, p := range parents[name] {
if p == n {
out = append(out, name)
}
}
}
return out
}
func (s *Supervisor) collectDesc(start string, acc map[string]struct{}) {
for _, k := range s.kids[start] {
if _, seen := acc[k]; seen {
continue
}
acc[k] = struct{}{}
s.collectDesc(k, acc)
}
}
func (s *Supervisor) collectAnc(start string, acc map[string]struct{}) {
for _, p := range s.parents[start] {
if _, seen := acc[p]; seen {
continue
}
acc[p] = struct{}{}
s.collectAnc(p, acc)
}
}
func (s *Supervisor) stopAll() {
for name := range s.cancels {
s.suppress.Store(name, struct{}{})
}
for _, cancel := range s.cancels {
cancel()
}
}
func (s *Supervisor) stopSubtree(root string) {
victims := map[string]struct{}{root: {}}
s.collectDesc(root, victims)
for v := range victims {
s.suppress.Store(v, struct{}{})
}
for v := range victims {
if c := s.cancels[v]; c != nil {
c()
}
}
}
func (s *Supervisor) stopAncestorsAndDesc(root string) {
victims := map[string]struct{}{root: {}}
s.collectDesc(root, victims)
s.collectAnc(root, victims)
for v := range victims {
s.suppress.Store(v, struct{}{})
}
for v := range victims {
if c := s.cancels[v]; c != nil {
c()
}
}
}