fix: worker manager refreshes the filter on allocation; snapshots close synchronously on shutdown

Cassandra Heart 2026-02-22 20:53:55 -06:00
parent 06beed7511
commit b0cb9daebe
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
10 changed files with 51 additions and 3 deletions


@@ -140,6 +140,13 @@ func (hg *HypergraphCRDT) SetShutdownContext(ctx context.Context) {
}()
}
// CloseSnapshots synchronously releases all snapshot generations and their DB
// snapshots. This must be called before closing the underlying Pebble database
// to avoid dangling snapshot warnings. It is idempotent.
func (hg *HypergraphCRDT) CloseSnapshots() {
hg.snapshotMgr.close()
}
func (hg *HypergraphCRDT) contextWithShutdown(
parent context.Context,
) (context.Context, context.CancelFunc) {
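
The snapshot manager itself is not part of this diff. As a rough illustration of what an idempotent, synchronous close could look like, assuming a manager that tracks open Pebble snapshots (all names below are invented, not the project's):

// Hypothetical sketch of an idempotent snapshot-manager close; the real
// snapshotMgr is not shown in this commit.
package hypergraph

import (
	"sync"

	"github.com/cockroachdb/pebble"
)

type snapshotManager struct {
	mu        sync.Mutex
	closeOnce sync.Once
	snapshots []*pebble.Snapshot // one open DB snapshot per generation (assumed)
}

// close releases every open Pebble snapshot exactly once; a second call is
// a no-op, so the async shutdown path and a synchronous CloseSnapshots can
// both invoke it safely.
func (m *snapshotManager) close() {
	m.closeOnce.Do(func() {
		m.mu.Lock()
		defer m.mu.Unlock()
		for _, s := range m.snapshots {
			_ = s.Close() // errors ignored on shutdown
		}
		m.snapshots = nil
	})
}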


@@ -404,7 +404,6 @@ func (hg *HypergraphCRDT) handleGetLeaves(
Size: leaf.Size.FillBytes(make([]byte, 32)),
}
// Load underlying vertex tree if available (use snapshot store for consistency)
vtree, err := session.store.LoadVertexTree(leaf.Key)
if err == nil && vtree != nil {
data, err := tries.SerializeNonLazyTree(vtree)


@@ -112,6 +112,15 @@ func NewAppConsensusEngineFactory(
}
}
// CloseSnapshots synchronously closes the hypergraph snapshot manager. Call
// this before closing the underlying database to ensure no Pebble snapshots
// remain open.
func (f *AppConsensusEngineFactory) CloseSnapshots() {
if closer, ok := f.hypergraph.(interface{ CloseSnapshots() }); ok {
closer.CloseSnapshots()
}
}
// CreateAppConsensusEngine creates a new AppConsensusEngine
func (f *AppConsensusEngineFactory) CreateAppConsensusEngine(
appAddress []byte,
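
The type assertion against interface{ CloseSnapshots() } lets the factory probe for the method without widening whatever hypergraph interface it already holds. A self-contained illustration of this optional-interface pattern, with types invented for the example:

package main

import "fmt"

// Store is the narrow interface a caller normally depends on.
type Store interface {
	Get(key string) (string, bool)
}

// snapshottingStore opts in to an extra lifecycle method.
type snapshottingStore struct{ data map[string]string }

func (s *snapshottingStore) Get(k string) (string, bool) {
	v, ok := s.data[k]
	return v, ok
}

func (s *snapshottingStore) CloseSnapshots() { fmt.Println("snapshots closed") }

// shutdown probes the value for the optional method; implementations that
// do not declare CloseSnapshots are simply skipped.
func shutdown(s Store) {
	if closer, ok := s.(interface{ CloseSnapshots() }); ok {
		closer.CloseSnapshots()
	}
}

func main() {
	shutdown(&snapshottingStore{data: map[string]string{}})
}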


@@ -66,6 +66,9 @@ func (m *mockWorkerManager) ProposeAllocations(coreIds []uint, filters [][]byte)
func (m *mockWorkerManager) DecideAllocations(reject [][]byte, confirm [][]byte) error {
return nil
}
func (m *mockWorkerManager) RespawnWorker(coreId uint, filter []byte) error {
return nil
}
func (m *mockWorkerManager) RangeWorkers() ([]*store.WorkerInfo, error) {
result := make([]*store.WorkerInfo, 0, len(m.workers))
for _, w := range m.workers {


@@ -1217,6 +1217,13 @@ func (e *GlobalConsensusEngine) Stop(force bool) <-chan error {
// fires, so the goroutine will always complete after shutdown.
e.coverageWg.Wait()
// Synchronously close the snapshot manager so no Pebble snapshots remain
// open when the database is closed. The async goroutine chain from
// SetShutdownContext may not have completed yet.
if closer, ok := e.hyperSync.(interface{ CloseSnapshots() }); ok {
closer.CloseSnapshots()
}
close(errChan)
return errChan
}
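
The comment captures a real ordering hazard: SetShutdownContext releases snapshots from a goroutine that reacts to context cancellation, so nothing guarantees it has run before the caller closes the database. A toy model of the race and the fix, assuming the close is idempotent (names invented):

package main

import (
	"context"
	"sync"
)

type engine struct{ once sync.Once }

// closeSnapshots stands in for the snapshot manager's close; sync.Once
// makes repeated calls harmless.
func (e *engine) closeSnapshots() {
	e.once.Do(func() { /* release Pebble snapshots here */ })
}

// watchShutdown mirrors the SetShutdownContext pattern: cleanup happens on
// a goroutine, with no ordering guarantee against a subsequent db.Close().
func (e *engine) watchShutdown(ctx context.Context) {
	go func() {
		<-ctx.Done()
		e.closeSnapshots()
	}()
}

// Stop closes snapshots synchronously, so the release is guaranteed to be
// done before the caller closes the database; the async watcher may still
// fire later, but idempotency makes that a no-op.
func (e *engine) Stop() {
	e.closeSnapshots()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	e := &engine{}
	e.watchShutdown(ctx)
	cancel()
	e.Stop() // safe to close the database after this returns
}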


@@ -301,6 +301,20 @@ func (m *Manager) persistPlannedFilters(
zap.Uint("core_id", info.CoreId),
zap.Error(err),
)
continue
}
m.logger.Info(
"reassigning worker to new filter",
zap.Uint("core_id", info.CoreId),
zap.String("filter", hex.EncodeToString(filterCopy)),
)
if err := m.workerMgr.RespawnWorker(info.CoreId, filterCopy); err != nil {
m.logger.Warn(
"failed to respawn worker with new filter",
zap.Uint("core_id", info.CoreId),
zap.Error(err),
)
}
}
}
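
The control flow of the new loop body: persist the planned filter, skip the respawn when persistence fails (continue), and treat a respawn failure as a warning so the remaining workers are still processed. A condensed, self-contained sketch of the same flow, with types and helpers invented for the example:

package main

import (
	"encoding/hex"
	"log"
)

type workerMgr interface {
	RespawnWorker(coreId uint, filter []byte) error
}

type plannedFilter struct {
	CoreId uint
	Filter []byte
}

func applyPlannedFilters(mgr workerMgr, persist func(plannedFilter) error, plans []plannedFilter) {
	for _, p := range plans {
		if err := persist(p); err != nil {
			log.Printf("failed to persist planned filter: core_id=%d err=%v", p.CoreId, err)
			continue // no respawn without a persisted filter
		}
		log.Printf("reassigning worker to new filter: core_id=%d filter=%s",
			p.CoreId, hex.EncodeToString(p.Filter))
		if err := mgr.RespawnWorker(p.CoreId, p.Filter); err != nil {
			// Warn and move on: one stuck worker should not block the rest.
			log.Printf("failed to respawn worker: core_id=%d err=%v", p.CoreId, err)
		}
	}
}

type nopMgr struct{}

func (nopMgr) RespawnWorker(uint, []byte) error { return nil }

func main() {
	persist := func(plannedFilter) error { return nil }
	applyPlannedFilters(nopMgr{}, persist, []plannedFilter{{CoreId: 1, Filter: []byte{0xab}}})
}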


@@ -66,6 +66,9 @@ func (m *mockWorkerManager) Start(ctx context.Context) error {
func (m *mockWorkerManager) Stop() error {
panic("unimplemented")
}
func (m *mockWorkerManager) RespawnWorker(coreId uint, filter []byte) error {
return nil
}
func (m *mockWorkerManager) RangeWorkers() ([]*store.WorkerInfo, error) {
out := make([]*store.WorkerInfo, len(m.workers))


@@ -133,8 +133,8 @@ func (r *DataWorkerIPCServer) Start() error {
func (r *DataWorkerIPCServer) Stop() error {
r.logger.Info("stopping server gracefully")
// Stop the app consensus engine first so its snapshot manager releases
// any open Pebble snapshots before we close the database.
// Stop the app consensus engine first, then synchronously close the
// snapshot manager so no Pebble snapshots remain when the database closes.
if r.appConsensusEngine != nil {
if r.cancel != nil {
r.cancel()
@@ -142,6 +142,7 @@ func (r *DataWorkerIPCServer) Stop() error {
<-r.appConsensusEngine.Stop(false)
r.appConsensusEngine = nil
}
r.appConsensusEngineFactory.CloseSnapshots()
r.pubsub.Close()
if r.server != nil {
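
The ordering this hunk makes explicit: cancel and drain the app consensus engine, close snapshots synchronously through the factory, and only then tear down the resources behind them. A compilable sketch of that sequence, with all types invented for the example:

package ipc

import "context"

type consensusEngine interface{ Stop(force bool) <-chan error }
type engineFactory interface{ CloseSnapshots() }
type database interface{ Close() error }

type server struct {
	engine  consensusEngine
	factory engineFactory
	db      database
	cancel  context.CancelFunc
}

func (s *server) Stop() error {
	if s.engine != nil {
		if s.cancel != nil {
			s.cancel()
		}
		<-s.engine.Stop(false) // drain: the channel closes once Stop finishes
		s.engine = nil
	}
	s.factory.CloseSnapshots() // no Pebble snapshots remain open past here
	return s.db.Close()        // safe: nothing references a DB snapshot now
}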


@@ -466,6 +466,10 @@ func (w *WorkerManager) AllocateWorker(coreId uint, filter []byte) error {
return nil
}
func (w *WorkerManager) RespawnWorker(coreId uint, filter []byte) error {
return w.respawnWorker(coreId, filter)
}
func (w *WorkerManager) DeallocateWorker(coreId uint) error {
timer := prometheus.NewTimer(
workerOperationDuration.WithLabelValues("deallocate"),


@@ -18,4 +18,5 @@ type WorkerManager interface {
ProposeAllocations(coreIds []uint, filters [][]byte) error
DecideAllocations(reject [][]byte, confirm [][]byte) error
RangeWorkers() ([]*store.WorkerInfo, error)
RespawnWorker(coreId uint, filter []byte) error
}
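
Widening this interface is what forced the two mockWorkerManager updates earlier in the diff. A common companion idiom is a compile-time assertion, so a concrete type that falls behind the interface breaks the build rather than a test run; a toy sketch with invented types:

package worker

// WorkerManager is a trimmed stand-in for the real interface.
type WorkerManager interface {
	RespawnWorker(coreId uint, filter []byte) error
}

type manager struct{}

func (m *manager) RespawnWorker(coreId uint, filter []byte) error { return nil }

// The build fails on this line whenever *manager stops satisfying
// WorkerManager, e.g. after a method like RespawnWorker is added.
var _ WorkerManager = (*manager)(nil)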