fix: panic on shutdown, libp2p discovery picking inaccessible peers, coverage check goroutine not tracked by shutdown logic; amend app shard worker behavior to mirror global for prover root reconciliation

Cassandra Heart 2026-02-20 18:53:24 -06:00
parent 5733047c3b
commit 706e28c1da
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
8 changed files with 216 additions and 70 deletions

View File

@ -135,7 +135,7 @@ func (hg *HypergraphCRDT) SetShutdownContext(ctx context.Context) {
go func() {
select {
case <-hg.shutdownCtx.Done():
hg.snapshotMgr.publish(nil)
hg.snapshotMgr.close()
}
}()
}
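
For context, a minimal sketch of the wiring this hunk relies on (simplified, hypothetical types; not the actual HypergraphCRDT): a caller-supplied shutdown context is watched in a goroutine, and cancellation closes the owned resource instead of publishing a nil snapshot.

package main

import (
	"context"
	"fmt"
)

// closer stands in for the snapshot manager owned by the CRDT.
type closer struct{ done chan struct{} }

func (c *closer) close() {
	fmt.Println("snapshot manager closed")
	close(c.done)
}

// watchShutdown mirrors the goroutine above: it blocks on the shutdown
// context and closes the resource exactly when the node shuts down.
func watchShutdown(ctx context.Context, c *closer) {
	go func() {
		<-ctx.Done()
		c.close()
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	c := &closer{done: make(chan struct{})}
	watchShutdown(ctx, c)
	cancel()  // simulate node shutdown
	<-c.done  // block until the watcher has run
}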

View File

@ -170,6 +170,7 @@ type snapshotManager struct {
logger *zap.Logger
store tries.TreeBackingStore
mu sync.Mutex
closed bool
// generations holds snapshot generations ordered from newest to oldest.
// generations[0] is the current/latest generation.
generations []*snapshotGeneration
@ -190,6 +191,10 @@ func (m *snapshotManager) publish(root []byte) {
m.mu.Lock()
defer m.mu.Unlock()
if m.closed {
return
}
rootHex := ""
if len(root) != 0 {
rootHex = hex.EncodeToString(root)
@ -287,7 +292,7 @@ func (m *snapshotManager) acquire(
m.mu.Lock()
defer m.mu.Unlock()
if len(m.generations) == 0 {
if m.closed || len(m.generations) == 0 {
m.logger.Warn("no snapshot generations available")
return nil
}
@ -425,6 +430,38 @@ func (m *snapshotManager) release(handle *snapshotHandle) {
}
}
// close releases all snapshot generations and their DB snapshots. After close,
// publish and acquire become no-ops. Shard snapshot handles held by active sync
// sessions remain valid (they are self-contained in-memory DBs) and will be
// released when the session ends.
func (m *snapshotManager) close() {
m.mu.Lock()
defer m.mu.Unlock()
if m.closed {
return
}
m.closed = true
for _, gen := range m.generations {
for key, handle := range gen.handles {
delete(gen.handles, key)
if handle != nil {
handle.releaseRef(m.logger)
}
}
if gen.dbSnapshot != nil {
if err := gen.dbSnapshot.Close(); err != nil {
m.logger.Warn("failed to close DB snapshot during shutdown", zap.Error(err))
}
gen.dbSnapshot = nil
}
}
m.generations = nil
m.logger.Debug("snapshot manager closed")
}
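
A minimal sketch of the lifecycle guard the new close method introduces (simplified fields and names, not the real snapshotManager): the closed flag is checked under the same mutex, so publish and acquire calls that race with shutdown become harmless no-ops.

package main

import (
	"fmt"
	"sync"
)

// guardedManager sketches the closed-flag pattern: once close runs,
// publish and acquire return early instead of touching released state.
type guardedManager struct {
	mu     sync.Mutex
	closed bool
	latest []byte
}

func (m *guardedManager) publish(root []byte) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return // late publish after shutdown is dropped
	}
	m.latest = root
}

func (m *guardedManager) acquire() []byte {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed || m.latest == nil {
		return nil
	}
	return m.latest
}

func (m *guardedManager) close() {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return
	}
	m.closed = true
	m.latest = nil // release held resources exactly once
}

func main() {
	m := &guardedManager{}
	m.publish([]byte{0x01})
	m.close()
	m.publish([]byte{0x02})  // no-op after close
	fmt.Println(m.acquire()) // prints [] (nil)
}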
func shardKeyString(sk tries.ShardKey) string {
buf := make([]byte, 0, len(sk.L1)+len(sk.L2))
buf = append(buf, sk.L1[:]...)

View File

@ -155,7 +155,6 @@ type AppConsensusEngine struct {
globalProverRootVerifiedFrame atomic.Uint64
globalProverRootSynced atomic.Bool
globalProverSyncInProgress atomic.Bool
lastGlobalFrameHeader *protobufs.GlobalFrameHeader // previous frame for deferred root check
// Genesis initialization
genesisInitialized atomic.Bool
@ -986,22 +985,24 @@ func (e *AppConsensusEngine) handleGlobalProverRoot(
return
}
// Defer root check by one frame: when frame N arrives, check frame N-1's
// root. This matches the GlobalConsensusEngine which checks the parent
// frame's root during materialize(N-1), triggered when frame N certifies
// frame N-1. By the time frame N arrives, the master has had time to
// materialize N-2 (triggered when N-1 arrived), so the worker's tree
// should reflect post-materialize(N-2) state — exactly what frame N-1's
// ProverTreeCommitment was computed against.
prevHeader := e.lastGlobalFrameHeader
e.lastGlobalFrameHeader = frame.Header
frameNumber := frame.Header.FrameNumber
expectedProverRoot := frame.Header.ProverTreeCommitment
if prevHeader == nil {
if len(expectedProverRoot) == 0 {
return
}
frameNumber := prevHeader.FrameNumber
expectedProverRoot := prevHeader.ProverTreeCommitment
// Match the GlobalConsensusEngine's ordering: commit the tree first as a
// standalone step, then extract and verify the prover root. The global
// engine calls Commit(N) at the start of materialize(N) before checking
// the root. We mirror this by committing first, then extracting.
if _, err := e.hypergraph.Commit(frameNumber); err != nil {
e.logger.Warn(
"failed to commit hypergraph for global prover root check",
zap.Uint64("frame_number", frameNumber),
zap.Error(err),
)
}
localRoot, err := e.computeLocalGlobalProverRoot(frameNumber)
if err != nil {
@ -1012,11 +1013,11 @@ func (e *AppConsensusEngine) handleGlobalProverRoot(
)
e.globalProverRootSynced.Store(false)
e.globalProverRootVerifiedFrame.Store(0)
e.performBlockingGlobalHypersync(prevHeader.Prover, expectedProverRoot)
e.performBlockingGlobalHypersync(frame.Header.Prover, expectedProverRoot)
return
}
if len(localRoot) == 0 || len(expectedProverRoot) == 0 {
if len(localRoot) == 0 {
return
}
@ -1029,7 +1030,35 @@ func (e *AppConsensusEngine) handleGlobalProverRoot(
)
e.globalProverRootSynced.Store(false)
e.globalProverRootVerifiedFrame.Store(0)
e.performBlockingGlobalHypersync(prevHeader.Prover, expectedProverRoot)
e.performBlockingGlobalHypersync(frame.Header.Prover, expectedProverRoot)
// Re-compute local root after sync to verify convergence, matching
// the global engine's post-sync verification pattern.
newLocalRoot, newRootErr := e.computeLocalGlobalProverRoot(frameNumber)
if newRootErr != nil {
e.logger.Warn(
"failed to compute local global prover root after sync",
zap.Uint64("frame_number", frameNumber),
zap.Error(newRootErr),
)
} else if bytes.Equal(newLocalRoot, expectedProverRoot) {
e.logger.Info(
"global prover root converged after sync",
zap.Uint64("frame_number", frameNumber),
)
e.globalProverRootSynced.Store(true)
e.globalProverRootVerifiedFrame.Store(frameNumber)
if err := e.proverRegistry.Refresh(); err != nil {
e.logger.Warn("failed to refresh prover registry", zap.Error(err))
}
} else {
e.logger.Warn(
"global prover root still mismatched after sync",
zap.Uint64("frame_number", frameNumber),
zap.String("expected_root", hex.EncodeToString(expectedProverRoot)),
zap.String("post_sync_root", hex.EncodeToString(newLocalRoot)),
)
}
return
}
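
A condensed sketch of the reconciliation flow above, with hypothetical function values standing in for the engine's hypergraph commit, local root computation, and blocking hypersync. It is not the engine's exact control flow, but it shows the same ordering: commit first, compare against the frame's commitment, and on mismatch sync and re-verify.

package main

import (
	"bytes"
	"encoding/hex"
	"fmt"
)

// reconcile shows the ordering only: commit the tree for the frame, compute
// the local root, compare against the frame's commitment, and if they do
// not match, sync and re-check for convergence.
func reconcile(
	frameNumber uint64,
	expectedRoot []byte,
	commit func(uint64) error,
	localRoot func(uint64) ([]byte, error),
	syncFn func(expected []byte),
) bool {
	if len(expectedRoot) == 0 {
		return true // nothing to verify against
	}
	if err := commit(frameNumber); err != nil {
		fmt.Println("commit failed:", err) // warn and continue, as above
	}
	root, err := localRoot(frameNumber)
	if err != nil || len(root) == 0 || !bytes.Equal(root, expectedRoot) {
		syncFn(expectedRoot)
		// Re-check after syncing to confirm convergence.
		root, err = localRoot(frameNumber)
		return err == nil && bytes.Equal(root, expectedRoot)
	}
	return true
}

func main() {
	expected, _ := hex.DecodeString("0a0b")
	ok := reconcile(
		42,
		expected,
		func(uint64) error { return nil },
		func(uint64) ([]byte, error) { return expected, nil },
		func([]byte) {},
	)
	fmt.Println("converged:", ok)
}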

View File

@ -61,7 +61,9 @@ func (e *GlobalConsensusEngine) triggerCoverageCheckAsync(
return
}
e.coverageWg.Add(1)
go func() {
defer e.coverageWg.Done()
defer e.coverageCheckInProgress.Store(false)
if err := e.checkShardCoverage(frameNumber, frameProver); err != nil {

View File

@ -122,6 +122,7 @@ func (e *GlobalConsensusEngine) eventDistributorLoop(
// prover allocations in the registry.
e.reconcileWorkerAllocations(data.Frame.Header.FrameNumber, self)
e.checkExcessPendingJoins(self, data.Frame.Header.FrameNumber)
e.checkAndSubmitSeniorityMerge(self, data.Frame.Header.FrameNumber)
e.logAllocationStatusOnly(ctx, data, self, effectiveSeniority)
}
}
@ -668,47 +669,8 @@ func (e *GlobalConsensusEngine) evaluateForProposals(
)
}
// Standalone seniority merge: when no join was proposed this cycle but the
// prover exists with incorrect seniority, submit a seniority merge to fix
// it. This covers the case where all worker slots are filled and no new
// joins are being proposed.
if !joinProposedThisCycle && self != nil {
frameNum := data.Frame.Header.FrameNumber
mergeSeniority := e.estimateSeniorityFromConfig()
if mergeSeniority > self.Seniority {
lastJoin := e.lastJoinAttemptFrame.Load()
lastMerge := e.lastSeniorityMergeFrame.Load()
joinCooldownOk := lastJoin == 0 || frameNum-lastJoin >= 10
mergeCooldownOk := lastMerge == 0 || frameNum-lastMerge >= 10
if joinCooldownOk && mergeCooldownOk {
frame := e.GetFrame()
if frame != nil {
helpers, peerIds := e.buildMergeHelpers()
err := e.submitSeniorityMerge(
frame, helpers, mergeSeniority, peerIds,
)
if err != nil {
e.logger.Error(
"could not submit seniority merge",
zap.Error(err),
)
} else {
e.lastSeniorityMergeFrame.Store(frameNum)
}
}
} else {
e.logger.Debug(
"seniority merge deferred due to cooldown",
zap.Uint64("merge_seniority", mergeSeniority),
zap.Uint64("existing_seniority", self.Seniority),
zap.Uint64("last_join_frame", lastJoin),
zap.Uint64("last_merge_frame", lastMerge),
zap.Uint64("current_frame", frameNum),
)
}
}
if !joinProposedThisCycle {
e.checkAndSubmitSeniorityMerge(self, data.Frame.Header.FrameNumber)
}
if len(pendingFilters) != 0 {
@ -1082,12 +1044,23 @@ func (e *GlobalConsensusEngine) collectAllocationSnapshot(
decideDescriptors := []provers.ShardDescriptor{}
for _, shardInfo := range shards {
resp, err := e.getAppShardsFromProver(
client,
slices.Concat(shardInfo.L1, shardInfo.L2),
)
shardKey := slices.Concat(shardInfo.L1, shardInfo.L2)
var resp *protobufs.GetAppShardsResponse
var err error
for attempt := 0; attempt < 3; attempt++ {
resp, err = e.getAppShardsFromProver(client, shardKey)
if err == nil {
break
}
e.logger.Debug(
"retrying app shard retrieval",
zap.Int("attempt", attempt+1),
zap.Error(err),
)
time.Sleep(time.Duration(attempt+1) * 500 * time.Millisecond)
}
if err != nil {
e.logger.Debug("could not get app shards from prover", zap.Error(err))
e.logger.Debug("could not get app shards from prover after retries", zap.Error(err))
return nil, false
}
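
The retry loop above uses a linear backoff of (attempt+1) * 500ms over three attempts; a generic sketch of that pattern (hypothetical helper, not part of this commit's code):

package main

import (
	"errors"
	"fmt"
	"time"
)

// retry runs op up to attempts times, sleeping (i+1)*step after each
// failure, and returns the last error (nil on success).
func retry(attempts int, step time.Duration, op func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		time.Sleep(time.Duration(i+1) * step)
	}
	return err
}

func main() {
	calls := 0
	err := retry(3, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err) // calls: 3 err: <nil>
}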
@ -1262,6 +1235,56 @@ func (e *GlobalConsensusEngine) logAllocationStatusOnly(
e.logAllocationStatus(snapshot)
}
// checkAndSubmitSeniorityMerge submits a seniority merge if the prover exists
// with incorrect seniority and cooldowns have elapsed. This is called both from
// evaluateForProposals (when no join was proposed) and from the "all workers
// allocated" path, ensuring seniority is corrected regardless of allocation state.
func (e *GlobalConsensusEngine) checkAndSubmitSeniorityMerge(
self *typesconsensus.ProverInfo,
frameNumber uint64,
) {
if self == nil {
return
}
mergeSeniority := e.estimateSeniorityFromConfig()
if mergeSeniority <= self.Seniority {
return
}
lastJoin := e.lastJoinAttemptFrame.Load()
lastMerge := e.lastSeniorityMergeFrame.Load()
joinCooldownOk := lastJoin == 0 || frameNumber-lastJoin >= 10
mergeCooldownOk := lastMerge == 0 || frameNumber-lastMerge >= 10
if joinCooldownOk && mergeCooldownOk {
frame := e.GetFrame()
if frame != nil {
helpers, peerIds := e.buildMergeHelpers()
err := e.submitSeniorityMerge(
frame, helpers, mergeSeniority, peerIds,
)
if err != nil {
e.logger.Error(
"could not submit seniority merge",
zap.Error(err),
)
} else {
e.lastSeniorityMergeFrame.Store(frameNumber)
}
}
} else {
e.logger.Debug(
"seniority merge deferred due to cooldown",
zap.Uint64("merge_seniority", mergeSeniority),
zap.Uint64("existing_seniority", self.Seniority),
zap.Uint64("last_join_frame", lastJoin),
zap.Uint64("last_merge_frame", lastMerge),
zap.Uint64("current_frame", frameNumber),
)
}
}
func (e *GlobalConsensusEngine) allocationContext() (
*typesconsensus.ProverInfo,
uint64,

View File

@ -206,6 +206,7 @@ type GlobalConsensusEngine struct {
lastShardActionFrame map[string]uint64
lastShardActionFrameMu sync.Mutex
coverageCheckInProgress atomic.Bool
coverageWg sync.WaitGroup
peerInfoDigestCache map[string]struct{}
peerInfoDigestCacheMu sync.Mutex
keyRegistryDigestCache map[string]struct{}
@ -1206,6 +1207,10 @@ func (e *GlobalConsensusEngine) Stop(force bool) <-chan error {
}
}
// Wait for any in-flight coverage check goroutine to finish before
// returning, so callers can safely close the Pebble DB.
e.coverageWg.Wait()
close(errChan)
return errChan
}
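
Together with the Add/Done calls in triggerCoverageCheckAsync, this gives a single-flight-plus-WaitGroup shutdown pattern. A minimal self-contained sketch (hypothetical names, assuming the in-progress flag is set with CompareAndSwap before the goroutine starts):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// singleFlight gates a background check so only one runs at a time and
// stop can wait for an in-flight check before tearing shared state down.
type singleFlight struct {
	inProgress atomic.Bool
	wg         sync.WaitGroup
}

func (s *singleFlight) trigger(check func()) {
	// Skip this trigger entirely if a check is already running.
	if !s.inProgress.CompareAndSwap(false, true) {
		return
	}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		defer s.inProgress.Store(false)
		check()
	}()
}

func (s *singleFlight) stop() {
	// Block until any in-flight check finishes, so the caller can then
	// close shared resources (e.g. the backing store) safely.
	s.wg.Wait()
}

func main() {
	var s singleFlight
	s.trigger(func() { fmt.Println("coverage check running") })
	s.trigger(func() { fmt.Println("second check (skipped if the first is still running)") })
	s.stop()
}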

View File

@ -95,6 +95,7 @@ type BlossomSub struct {
manualReachability atomic.Pointer[bool]
p2pConfig config.P2PConfig
dht *dht.IpfsDHT
routingDiscovery *routing.RoutingDiscovery
coreId uint
configDir ConfigDir
}
@ -317,6 +318,7 @@ func NewBlossomSubWithHost(
peerID := host.ID()
bs.dht = kademliaDHT
bs.routingDiscovery = routingDiscovery
bs.ps = pubsub
bs.peerID = peerID
bs.h = host
@ -767,6 +769,7 @@ func NewBlossomSub(
peerID := h.ID()
bs.dht = kademliaDHT
bs.routingDiscovery = routingDiscovery
bs.ps = pubsub
bs.peerID = peerID
bs.h = h
@ -890,24 +893,60 @@ func (b *BlossomSub) background(ctx context.Context) {
func (b *BlossomSub) checkAndReconnectPeers(ctx context.Context) {
peerCount := len(b.h.Network().Peers())
if peerCount > 1 {
if peerCount >= b.p2pConfig.MinBootstrapPeers {
return
}
b.logger.Warn(
"no peers connected, attempting to re-bootstrap and discover",
zap.Duration("check_interval", b.p2pConfig.PeerReconnectCheckInterval),
"low peer count, attempting to re-bootstrap and discover",
zap.Int("current_peers", peerCount),
zap.Int("min_bootstrap_peers", b.p2pConfig.MinBootstrapPeers),
)
// Re-bootstrap the DHT to refresh the routing table. At startup,
// kademliaDHT.Bootstrap() populates the routing table by connecting to
// bootstrap peers. Without calling it again here, the routing table can
// go empty after all peers disconnect, making FindPeers unable to
// discover anyone — leaving the node permanently stuck.
if b.dht != nil {
if err := b.dht.Bootstrap(ctx); err != nil {
b.logger.Error("DHT re-bootstrap failed", zap.Error(err))
}
}
// Re-advertise so other peers can find us through the DHT.
if b.routingDiscovery != nil {
util.Advertise(
ctx,
b.routingDiscovery,
getNetworkNamespace(b.p2pConfig.Network),
)
}
// Clear peerstore addresses for disconnected peers so we don't keep
// dialing stale/invalid addresses that were added in previous attempts.
for _, p := range b.h.Peerstore().Peers() {
if p == b.h.ID() {
continue
}
if b.h.Network().Connectedness(p) != network.Connected &&
b.h.Network().Connectedness(p) != network.Limited {
b.h.Peerstore().ClearAddrs(p)
}
}
if err := b.DiscoverPeers(ctx); err != nil {
b.logger.Error("peer reconnect failed", zap.Error(err))
}
newCount := len(b.h.Network().Peers())
if newCount > 1 {
if newCount >= b.p2pConfig.MinBootstrapPeers {
b.logger.Info("peer reconnect succeeded", zap.Int("peers", newCount))
} else {
b.logger.Warn("peer reconnect: still no peers found, will retry at next interval")
b.logger.Warn(
"peer reconnect: still low peer count, will retry at next interval",
zap.Int("peers", newCount),
)
}
}
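
A minimal, self-contained sketch of the recovery sequence the comments describe (not BlossomSub itself; the namespace string is a placeholder): re-bootstrap the DHT so the routing table is repopulated, re-advertise, then query for peers again.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
	dutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	kad, err := dht.New(ctx, h)
	if err != nil {
		panic(err)
	}

	// Step 1: refresh the routing table (repopulates it if it went empty).
	if err := kad.Bootstrap(ctx); err != nil {
		fmt.Println("bootstrap failed:", err)
	}

	// Step 2: re-advertise so other peers can rediscover this node.
	disc := drouting.NewRoutingDiscovery(kad)
	dutil.Advertise(ctx, disc, "example-namespace")

	// Step 3: query the DHT for peers in the same namespace.
	peers, err := dutil.FindPeers(ctx, disc, "example-namespace")
	if err != nil {
		fmt.Println("find peers failed:", err)
		return
	}
	fmt.Println("discovered", len(peers), "peers")
}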

View File

@ -11,6 +11,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"go.uber.org/zap"
)
@ -84,7 +86,16 @@ func (pc *peerConnector) connectToPeer(
return
}
pc.host.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.AddressTTL)
routable := ma.FilterAddrs(p.Addrs, func(a ma.Multiaddr) bool {
return manet.IsPublicAddr(a)
})
if len(routable) == 0 {
atomic.AddUint32(failure, 1)
return
}
pc.host.Peerstore().AddAddrs(p.ID, routable, peerstore.AddressTTL)
conn, err := pc.host.Network().DialPeer(ctx, p.ID)
if err != nil {