diff --git a/hypergraph/hypergraph.go b/hypergraph/hypergraph.go index 07bbd78..0ff615c 100644 --- a/hypergraph/hypergraph.go +++ b/hypergraph/hypergraph.go @@ -132,6 +132,9 @@ func (hg *HypergraphCRDT) SetSelfPeerID(peerID string) { func (hg *HypergraphCRDT) SetShutdownContext(ctx context.Context) { hg.shutdownCtx = ctx + // Reopen the snapshot manager in case it was closed by a previous + // shutdown context (i.e. during in-process engine respawn). + hg.snapshotMgr.reopen() go func() { select { case <-hg.shutdownCtx.Done(): diff --git a/hypergraph/snapshot_manager.go b/hypergraph/snapshot_manager.go index 3f0c2ac..ed63a07 100644 --- a/hypergraph/snapshot_manager.go +++ b/hypergraph/snapshot_manager.go @@ -462,6 +462,21 @@ func (m *snapshotManager) close() { m.logger.Debug("snapshot manager closed") } +// reopen resets the closed flag so the snapshot manager can accept new +// snapshots after a respawn. Any previously held snapshots were already +// released by close(), so we start with an empty generation list. +func (m *snapshotManager) reopen() { + m.mu.Lock() + defer m.mu.Unlock() + + if !m.closed { + return + } + m.closed = false + m.generations = make([]*snapshotGeneration, 0, maxSnapshotGenerations) + m.logger.Debug("snapshot manager reopened") +} + func shardKeyString(sk tries.ShardKey) string { buf := make([]byte, 0, len(sk.L1)+len(sk.L2)) buf = append(buf, sk.L1[:]...) diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index d2c2558..e14f217 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -984,6 +984,8 @@ func (e *AppConsensusEngine) Stop(force bool) <-chan error { e.pubsub.UnregisterValidator(e.getFrameMessageBitmask()) e.pubsub.Unsubscribe(e.getGlobalFrameMessageBitmask(), false) e.pubsub.UnregisterValidator(e.getGlobalFrameMessageBitmask()) + e.pubsub.Unsubscribe(e.getGlobalProverMessageBitmask(), false) + e.pubsub.UnregisterValidator(e.getGlobalProverMessageBitmask()) e.pubsub.Unsubscribe(e.getGlobalAlertMessageBitmask(), false) e.pubsub.UnregisterValidator(e.getGlobalAlertMessageBitmask()) e.pubsub.Unsubscribe(e.getGlobalPeerInfoMessageBitmask(), false) diff --git a/node/datarpc/data_worker_ipc_server.go b/node/datarpc/data_worker_ipc_server.go index 183d2dd..3e2d69b 100644 --- a/node/datarpc/data_worker_ipc_server.go +++ b/node/datarpc/data_worker_ipc_server.go @@ -180,16 +180,13 @@ func (r *DataWorkerIPCServer) Respawn( func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error { if r.appConsensusEngine != nil { - // Re-respawn: gracefully shut down the process for a clean restart. - // The master's spawn loop (manager.go) detects the exit and - // immediately restarts the worker process, giving it fresh - // memory, a clean pubsub mesh, and a ProverRegistry built - // from the current on-disk hypergraph state. - r.logger.Info("re-respawn requested, shutting down worker for clean restart", - zap.Uint32("core_id", r.coreId), - ) - r.Stop() - return nil + r.logger.Info("respawning worker: stopping old engine") + if r.cancel != nil { + r.cancel() + } + <-r.appConsensusEngine.Stop(false) + r.appConsensusEngine = nil + r.logger.Info("respawning worker: old engine stopped") } if r.server != nil { r.logger.Info("stopping server for respawn")