From 6c6480e84afc5027b84decdc620a0d1ffdca5c0b Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Mon, 22 Dec 2025 05:21:00 -0600 Subject: [PATCH] Add missing hypergraph changes --- hypergraph/hypergraph.go | 8 ++ hypergraph/snapshot_manager.go | 172 +++++++++++++++++++++++++++------ hypergraph/sync.go | 8 +- 3 files changed, 155 insertions(+), 33 deletions(-) diff --git a/hypergraph/hypergraph.go b/hypergraph/hypergraph.go index 1973ec2..7c48a4e 100644 --- a/hypergraph/hypergraph.go +++ b/hypergraph/hypergraph.go @@ -100,6 +100,14 @@ func (hg *HypergraphCRDT) publishSnapshot(root []byte) { hg.snapshotMgr.publish(root) } +// PublishSnapshot announces a new snapshot generation with the given commit root. +// This should be called after Commit() to make the new state available for sync. +// Clients can request sync against this root using the expectedRoot parameter. +// The snapshot manager retains a limited number of historical generations. +func (hg *HypergraphCRDT) PublishSnapshot(root []byte) { + hg.publishSnapshot(root) +} + func (hg *HypergraphCRDT) cloneSetWithStore( set hypergraph.IdSet, store tries.TreeBackingStore, diff --git a/hypergraph/snapshot_manager.go b/hypergraph/snapshot_manager.go index 90a2f6e..4ba90a7 100644 --- a/hypergraph/snapshot_manager.go +++ b/hypergraph/snapshot_manager.go @@ -1,6 +1,7 @@ package hypergraph import ( + "bytes" "encoding/hex" "fmt" "sync" @@ -11,6 +12,11 @@ import ( "source.quilibrium.com/quilibrium/monorepo/types/tries" ) +// maxSnapshotGenerations is the maximum number of historical snapshot +// generations to retain. When a new root is published, older generations +// beyond this limit are released. +const maxSnapshotGenerations = 10 + type snapshotHandle struct { store tries.TreeBackingStore release func() @@ -152,12 +158,20 @@ func (h *snapshotHandle) isLeafMiss(key []byte) bool { return miss } -type snapshotManager struct { - logger *zap.Logger - store tries.TreeBackingStore - mu sync.Mutex +// snapshotGeneration represents a set of shard snapshots for a specific +// commit root. +type snapshotGeneration struct { root []byte - handles map[string]*snapshotHandle + handles map[string]*snapshotHandle // keyed by shard key +} + +type snapshotManager struct { + logger *zap.Logger + store tries.TreeBackingStore + mu sync.Mutex + // generations holds snapshot generations ordered from newest to oldest. + // generations[0] is the current/latest generation. + generations []*snapshotGeneration } func newSnapshotManager( @@ -165,9 +179,9 @@ func newSnapshotManager( store tries.TreeBackingStore, ) *snapshotManager { return &snapshotManager{ - logger: logger, - store: store, - handles: make(map[string]*snapshotHandle), + logger: logger, + store: store, + generations: make([]*snapshotGeneration, 0, maxSnapshotGenerations), } } @@ -175,39 +189,117 @@ func (m *snapshotManager) publish(root []byte) { m.mu.Lock() defer m.mu.Unlock() - // Remove all handles from the map so new syncs get new handles. - // Handles with active refs will be released when their last user calls release(). - // Handles with no active refs (only the initial ref from creation) are released now. - for key, handle := range m.handles { - delete(m.handles, key) - if handle != nil { - // releaseRef decrements the ref count. If this was the last ref - // (i.e., no active sync sessions), the underlying DB is released. - // If there are active sync sessions, they will release it when done. 
- handle.releaseRef(m.logger) - } - } - - m.root = nil - if len(root) != 0 { - m.root = append([]byte{}, root...) - } - rootHex := "" if len(root) != 0 { rootHex = hex.EncodeToString(root) } - m.logger.Debug("reset snapshot state", zap.String("root", rootHex)) + + // Check if this root already matches the current generation + if len(m.generations) > 0 && bytes.Equal(m.generations[0].root, root) { + m.logger.Debug( + "publish called with current root, no change", + zap.String("root", rootHex), + ) + return + } + + // Create a new generation for this root + newGen := &snapshotGeneration{ + handles: make(map[string]*snapshotHandle), + } + if len(root) != 0 { + newGen.root = append([]byte{}, root...) + } + + // Prepend the new generation (newest first) + m.generations = append([]*snapshotGeneration{newGen}, m.generations...) + + // Release generations beyond the limit + for len(m.generations) > maxSnapshotGenerations { + oldGen := m.generations[len(m.generations)-1] + m.generations = m.generations[:len(m.generations)-1] + + // Release all handles in the old generation + for key, handle := range oldGen.handles { + delete(oldGen.handles, key) + if handle != nil { + handle.releaseRef(m.logger) + } + } + + oldRootHex := "" + if len(oldGen.root) != 0 { + oldRootHex = hex.EncodeToString(oldGen.root) + } + m.logger.Debug( + "released old snapshot generation", + zap.String("root", oldRootHex), + ) + } + + m.logger.Debug( + "published new snapshot generation", + zap.String("root", rootHex), + zap.Int("total_generations", len(m.generations)), + ) } +// acquire returns a snapshot handle for the given shard key. If expectedRoot +// is provided and a matching generation has an existing snapshot for this shard, +// that snapshot is returned. Otherwise, a new snapshot is created from the +// current DB state and associated with the latest generation. +// +// Note: Historical snapshots are only available if they were created while that +// generation was current. We cannot create a snapshot of past state retroactively. 
func (m *snapshotManager) acquire( shardKey tries.ShardKey, + expectedRoot []byte, ) *snapshotHandle { key := shardKeyString(shardKey) m.mu.Lock() defer m.mu.Unlock() - if handle, ok := m.handles[key]; ok { + if len(m.generations) == 0 { + m.logger.Warn("no snapshot generations available") + return nil + } + + // If expectedRoot is provided, look for an existing snapshot in that generation + if len(expectedRoot) > 0 { + for _, gen := range m.generations { + if bytes.Equal(gen.root, expectedRoot) { + // Found matching generation, check if it has a snapshot for this shard + if handle, ok := gen.handles[key]; ok { + m.logger.Debug( + "found existing snapshot for expected root", + zap.String("expected_root", hex.EncodeToString(expectedRoot)), + ) + handle.acquire() + return handle + } + // Generation exists but no snapshot for this shard yet + m.logger.Info( + "generation matches expected root but no snapshot exists, using latest", + zap.String("expected_root", hex.EncodeToString(expectedRoot)), + ) + break + } + } + // No matching generation found + if m.logger != nil { + m.logger.Info( + "no snapshot generation matches expected root, using latest", + zap.String("expected_root", hex.EncodeToString(expectedRoot)), + zap.String("latest_root", hex.EncodeToString(m.generations[0].root)), + ) + } + } + + // Use the latest generation for new snapshots + latestGen := m.generations[0] + + // Check if we already have a handle for this shard in the latest generation + if handle, ok := latestGen.handles[key]; ok { handle.acquire() return handle } @@ -226,16 +318,27 @@ func (m *snapshotManager) acquire( return nil } - handle := newSnapshotHandle(key, storeSnapshot, release, m.root) + handle := newSnapshotHandle(key, storeSnapshot, release, latestGen.root) // Acquire a ref for the caller. The handle is created with refs=1 (the owner ref // held by the snapshot manager), and this adds another ref for the sync session. // This ensures publish() can release the owner ref without closing the DB while // a sync is still using it. handle.acquire() - m.handles[key] = handle + latestGen.handles[key] = handle return handle } +// currentRoot returns the commit root of the latest snapshot generation. +func (m *snapshotManager) currentRoot() []byte { + m.mu.Lock() + defer m.mu.Unlock() + + if len(m.generations) == 0 { + return nil + } + return append([]byte{}, m.generations[0].root...) +} + func (m *snapshotManager) release(handle *snapshotHandle) { if handle == nil { return @@ -245,8 +348,13 @@ func (m *snapshotManager) release(handle *snapshotHandle) { } m.mu.Lock() defer m.mu.Unlock() - if current, ok := m.handles[handle.key]; ok && current == handle { - delete(m.handles, handle.key) + + // Search all generations for this handle and remove it + for _, gen := range m.generations { + if current, ok := gen.handles[handle.key]; ok && current == handle { + delete(gen.handles, handle.key) + return + } } } diff --git a/hypergraph/sync.go b/hypergraph/sync.go index 3f659a4..6d16d4f 100644 --- a/hypergraph/sync.go +++ b/hypergraph/sync.go @@ -80,10 +80,15 @@ func (hg *HypergraphCRDT) HyperStream( // The caller (e.g. the client) must initiate the diff from its root. // After that, both sides exchange queries, branch info, and leaf updates until // their local trees are synchronized. +// +// If expectedRoot is provided, the server will attempt to use a snapshot with +// a matching commit root. This allows the client to sync against a specific +// known state rather than whatever the server's current state happens to be. 
func (hg *HypergraphCRDT) Sync( stream protobufs.HypergraphComparisonService_HyperStreamClient, shardKey tries.ShardKey, phaseSet protobufs.HypergraphPhaseSet, + expectedRoot []byte, ) (err error) { const localSyncKey = "local-sync" if !hg.syncController.TryEstablishSyncSession(localSyncKey) { @@ -138,6 +143,7 @@ func (hg *HypergraphCRDT) Sync( Path: toInt32Slice(path), Commitment: set.GetTree().Commit(false), IncludeLeafData: false, + ExpectedRoot: expectedRoot, }, }, }); err != nil { @@ -2355,7 +2361,7 @@ func (hg *HypergraphCRDT) syncTreeServer( } snapshotStart := time.Now() - handle := hg.snapshotMgr.acquire(shardKey) + handle := hg.snapshotMgr.acquire(shardKey, query.ExpectedRoot) if handle == nil { return errors.New("hypergraph shard snapshot unavailable") }
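
Usage sketch (illustrative, not part of the patch): assuming a node that
already holds a freshly committed root (e.g. from Commit(), as referenced
in the PublishSnapshot doc comment) and an open HyperStream client stream,
the new surface fits together as below. The helper names publishAfterCommit
and syncAgainstRoot are hypothetical; the parameter types are the ones this
patch uses.

	// On the committing node (server side): announce the new commit
	// root. The snapshot manager opens a generation for it and retains
	// up to maxSnapshotGenerations (10) historical generations,
	// releasing anything older.
	func publishAfterCommit(hg *HypergraphCRDT, root []byte) {
		hg.PublishSnapshot(root)
	}

	// On the syncing node (client side): pin the sync to a known root.
	// The server resolves ExpectedRoot in snapshotManager.acquire: it
	// serves an existing snapshot from the matching generation when one
	// exists for the shard, and otherwise falls back to a snapshot of
	// its latest generation. Passing nil keeps the old behavior of
	// always using the latest generation.
	func syncAgainstRoot(
		hg *HypergraphCRDT,
		stream protobufs.HypergraphComparisonService_HyperStreamClient,
		shardKey tries.ShardKey,
		phaseSet protobufs.HypergraphPhaseSet,
		expectedRoot []byte,
	) error {
		return hg.Sync(stream, shardKey, phaseSet, expectedRoot)
	}

Because acquire falls back to the latest generation on a miss, a pinned
expectedRoot is advisory rather than a hard guarantee: if the matching
generation has aged out, or never produced a snapshot for this shard, the
sync proceeds against the server's current state instead of failing.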