fix: restore proper respawn logic, fix frozen hypergraph post-respawn, add previously missing bitmask unsubscribe

This commit is contained in:
Cassandra Heart 2026-02-25 21:28:16 -06:00
parent e8856d5042
commit d55e045f15
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
4 changed files with 27 additions and 10 deletions

View File

@ -132,6 +132,9 @@ func (hg *HypergraphCRDT) SetSelfPeerID(peerID string) {
func (hg *HypergraphCRDT) SetShutdownContext(ctx context.Context) {
hg.shutdownCtx = ctx
// Reopen the snapshot manager in case it was closed by a previous
// shutdown context (i.e. during in-process engine respawn).
hg.snapshotMgr.reopen()
go func() {
select {
case <-hg.shutdownCtx.Done():

View File

@ -462,6 +462,21 @@ func (m *snapshotManager) close() {
m.logger.Debug("snapshot manager closed")
}
// reopen clears the closed flag so the snapshot manager can accept new
// snapshots after a respawn. close() already released any snapshots that
// were held, so the generation list is rebuilt empty. Calling reopen on a
// manager that is not closed is a no-op.
func (m *snapshotManager) reopen() {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		m.closed = false
		m.generations = make([]*snapshotGeneration, 0, maxSnapshotGenerations)
		m.logger.Debug("snapshot manager reopened")
	}
}
func shardKeyString(sk tries.ShardKey) string {
buf := make([]byte, 0, len(sk.L1)+len(sk.L2))
buf = append(buf, sk.L1[:]...)

View File

@ -984,6 +984,8 @@ func (e *AppConsensusEngine) Stop(force bool) <-chan error {
e.pubsub.UnregisterValidator(e.getFrameMessageBitmask())
e.pubsub.Unsubscribe(e.getGlobalFrameMessageBitmask(), false)
e.pubsub.UnregisterValidator(e.getGlobalFrameMessageBitmask())
e.pubsub.Unsubscribe(e.getGlobalProverMessageBitmask(), false)
e.pubsub.UnregisterValidator(e.getGlobalProverMessageBitmask())
e.pubsub.Unsubscribe(e.getGlobalAlertMessageBitmask(), false)
e.pubsub.UnregisterValidator(e.getGlobalAlertMessageBitmask())
e.pubsub.Unsubscribe(e.getGlobalPeerInfoMessageBitmask(), false)

View File

@ -180,16 +180,13 @@ func (r *DataWorkerIPCServer) Respawn(
func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
if r.appConsensusEngine != nil {
// Re-respawn: gracefully shut down the process for a clean restart.
// The master's spawn loop (manager.go) detects the exit and
// immediately restarts the worker process, giving it fresh
// memory, a clean pubsub mesh, and a ProverRegistry built
// from the current on-disk hypergraph state.
r.logger.Info("re-respawn requested, shutting down worker for clean restart",
zap.Uint32("core_id", r.coreId),
)
r.Stop()
return nil
r.logger.Info("respawning worker: stopping old engine")
if r.cancel != nil {
r.cancel()
}
<-r.appConsensusEngine.Stop(false)
r.appConsensusEngine = nil
r.logger.Info("respawning worker: old engine stopped")
}
if r.server != nil {
r.logger.Info("stopping server for respawn")