From b0cb9daebe0d85b5b845de46222d57d9cb3c4471 Mon Sep 17 00:00:00 2001
From: Cassandra Heart
Date: Sun, 22 Feb 2026 20:53:55 -0600
Subject: [PATCH] fix: worker manager refreshes the filter on allocation,
 snapshots blocking close on shutdown

---
 hypergraph/hypergraph.go                         |  7 +++++++
 hypergraph/sync_client_driven.go                 |  1 -
 node/consensus/app/factory.go                    |  9 +++++++++
 node/consensus/global/event_distributor_test.go  |  3 +++
 node/consensus/global/global_consensus_engine.go |  7 +++++++
 node/consensus/provers/proposer.go               | 14 ++++++++++++++
 node/consensus/provers/proposer_test.go          |  3 +++
 node/datarpc/data_worker_ipc_server.go           |  5 +++--
 node/worker/manager.go                           |  4 ++++
 types/worker/manager.go                          |  1 +
 10 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/hypergraph/hypergraph.go b/hypergraph/hypergraph.go
index 7398048..07bbd78 100644
--- a/hypergraph/hypergraph.go
+++ b/hypergraph/hypergraph.go
@@ -140,6 +140,13 @@ func (hg *HypergraphCRDT) SetShutdownContext(ctx context.Context) {
 	}()
 }
 
+// CloseSnapshots synchronously releases all snapshot generations and their DB
+// snapshots. This must be called before closing the underlying Pebble database
+// to avoid dangling snapshot warnings. It is idempotent.
+func (hg *HypergraphCRDT) CloseSnapshots() {
+	hg.snapshotMgr.close()
+}
+
 func (hg *HypergraphCRDT) contextWithShutdown(
 	parent context.Context,
 ) (context.Context, context.CancelFunc) {
diff --git a/hypergraph/sync_client_driven.go b/hypergraph/sync_client_driven.go
index 9eb92ec..232be42 100644
--- a/hypergraph/sync_client_driven.go
+++ b/hypergraph/sync_client_driven.go
@@ -404,7 +404,6 @@ func (hg *HypergraphCRDT) handleGetLeaves(
 			Size: leaf.Size.FillBytes(make([]byte, 32)),
 		}
 
-		// Load underlying vertex tree if available (use snapshot store for consistency)
 		vtree, err := session.store.LoadVertexTree(leaf.Key)
 		if err == nil && vtree != nil {
 			data, err := tries.SerializeNonLazyTree(vtree)
diff --git a/node/consensus/app/factory.go b/node/consensus/app/factory.go
index 8887d7a..8b99867 100644
--- a/node/consensus/app/factory.go
+++ b/node/consensus/app/factory.go
@@ -112,6 +112,15 @@
 	}
 }
 
+// CloseSnapshots synchronously closes the hypergraph snapshot manager. Call
+// this before closing the underlying database to ensure no Pebble snapshots
+// remain open.
+func (f *AppConsensusEngineFactory) CloseSnapshots() {
+	if closer, ok := f.hypergraph.(interface{ CloseSnapshots() }); ok {
+		closer.CloseSnapshots()
+	}
+}
+
 // CreateAppConsensusEngine creates a new AppConsensusEngine
 func (f *AppConsensusEngineFactory) CreateAppConsensusEngine(
 	appAddress []byte,
diff --git a/node/consensus/global/event_distributor_test.go b/node/consensus/global/event_distributor_test.go
index b6bd03f..a59c7ea 100644
--- a/node/consensus/global/event_distributor_test.go
+++ b/node/consensus/global/event_distributor_test.go
@@ -66,6 +66,9 @@ func (m *mockWorkerManager) ProposeAllocations(coreIds []uint, filters [][]byte)
 func (m *mockWorkerManager) DecideAllocations(reject [][]byte, confirm [][]byte) error {
 	return nil
 }
+func (m *mockWorkerManager) RespawnWorker(coreId uint, filter []byte) error {
+	return nil
+}
 func (m *mockWorkerManager) RangeWorkers() ([]*store.WorkerInfo, error) {
 	result := make([]*store.WorkerInfo, 0, len(m.workers))
 	for _, w := range m.workers {
diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go
index 2a71e86..f8b5bd4 100644
--- a/node/consensus/global/global_consensus_engine.go
+++ b/node/consensus/global/global_consensus_engine.go
@@ -1217,6 +1217,13 @@ func (e *GlobalConsensusEngine) Stop(force bool) <-chan error {
 	// fires, so the goroutine will always complete after shutdown.
 	e.coverageWg.Wait()
 
+	// Synchronously close the snapshot manager so no Pebble snapshots remain
+	// open when the database is closed. The async goroutine chain from
+	// SetShutdownContext may not have completed yet.
+	if closer, ok := e.hyperSync.(interface{ CloseSnapshots() }); ok {
+		closer.CloseSnapshots()
+	}
+
 	close(errChan)
 	return errChan
 }
diff --git a/node/consensus/provers/proposer.go b/node/consensus/provers/proposer.go
index d30bb91..2d00426 100644
--- a/node/consensus/provers/proposer.go
+++ b/node/consensus/provers/proposer.go
@@ -301,6 +301,20 @@ func (m *Manager) persistPlannedFilters(
 				zap.Uint("core_id", info.CoreId),
 				zap.Error(err),
 			)
+			continue
+		}
+
+		m.logger.Info(
+			"reassigning worker to new filter",
+			zap.Uint("core_id", info.CoreId),
+			zap.String("filter", hex.EncodeToString(filterCopy)),
+		)
+		if err := m.workerMgr.RespawnWorker(info.CoreId, filterCopy); err != nil {
+			m.logger.Warn(
+				"failed to respawn worker with new filter",
+				zap.Uint("core_id", info.CoreId),
+				zap.Error(err),
+			)
 		}
 	}
 }
diff --git a/node/consensus/provers/proposer_test.go b/node/consensus/provers/proposer_test.go
index a5aba0a..fa27ab4 100644
--- a/node/consensus/provers/proposer_test.go
+++ b/node/consensus/provers/proposer_test.go
@@ -66,6 +66,9 @@ func (m *mockWorkerManager) Start(ctx context.Context) error {
 func (m *mockWorkerManager) Stop() error {
 	panic("unimplemented")
 }
+func (m *mockWorkerManager) RespawnWorker(coreId uint, filter []byte) error {
+	return nil
+}
 func (m *mockWorkerManager) RangeWorkers() ([]*store.WorkerInfo, error) {
 	out := make([]*store.WorkerInfo, len(m.workers))
 
diff --git a/node/datarpc/data_worker_ipc_server.go b/node/datarpc/data_worker_ipc_server.go
index 4f59d09..c019097 100644
--- a/node/datarpc/data_worker_ipc_server.go
+++ b/node/datarpc/data_worker_ipc_server.go
@@ -133,8 +133,8 @@ func (r *DataWorkerIPCServer) Start() error {
 func (r *DataWorkerIPCServer) Stop() error {
 	r.logger.Info("stopping server gracefully")
 
-	// Stop the app consensus engine first so its snapshot manager releases
-	// any open Pebble snapshots before we close the database.
+	// Stop the app consensus engine first, then synchronously close the
+	// snapshot manager so no Pebble snapshots remain when the database closes.
 	if r.appConsensusEngine != nil {
 		if r.cancel != nil {
 			r.cancel()
@@ -142,6 +142,7 @@ func (r *DataWorkerIPCServer) Stop() error {
 		<-r.appConsensusEngine.Stop(false)
 		r.appConsensusEngine = nil
 	}
+	r.appConsensusEngineFactory.CloseSnapshots()
 
 	r.pubsub.Close()
 	if r.server != nil {
diff --git a/node/worker/manager.go b/node/worker/manager.go
index 10ada64..d60a598 100644
--- a/node/worker/manager.go
+++ b/node/worker/manager.go
@@ -466,6 +466,10 @@ func (w *WorkerManager) AllocateWorker(coreId uint, filter []byte) error {
 	return nil
 }
 
+func (w *WorkerManager) RespawnWorker(coreId uint, filter []byte) error {
+	return w.respawnWorker(coreId, filter)
+}
+
 func (w *WorkerManager) DeallocateWorker(coreId uint) error {
 	timer := prometheus.NewTimer(
 		workerOperationDuration.WithLabelValues("deallocate"),
diff --git a/types/worker/manager.go b/types/worker/manager.go
index b9523de..d407843 100644
--- a/types/worker/manager.go
+++ b/types/worker/manager.go
@@ -18,4 +18,5 @@
 	ProposeAllocations(coreIds []uint, filters [][]byte) error
 	DecideAllocations(reject [][]byte, confirm [][]byte) error
 	RangeWorkers() ([]*store.WorkerInfo, error)
+	RespawnWorker(coreId uint, filter []byte) error
 }
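-- 
Not part of the patch: a minimal sketch of the shutdown ordering this change
enforces (stop the engine, synchronously release snapshots, then close the
database). The shutdown helper and its structural interface parameters are
hypothetical stand-ins for the real wiring in DataWorkerIPCServer.Stop; only
Stop(false), CloseSnapshots(), and a Close() error database handle are taken
from the diff above:

	// shutdown is a hypothetical helper, for illustration only. It mirrors
	// the ordering above: the engine stops first, the factory then releases
	// every Pebble snapshot synchronously, and only then does the database
	// close, so no dangling-snapshot warnings can occur.
	func shutdown(
		engine interface{ Stop(force bool) <-chan error },
		factory interface{ CloseSnapshots() },
		db interface{ Close() error },
	) error {
		<-engine.Stop(false)     // block until the engine has fully stopped
		factory.CloseSnapshots() // no Pebble snapshots remain open after this
		return db.Close()        // closes without dangling-snapshot warnings
	}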