Mirror of https://github.com/QuilibriumNetwork/ceremonyclient.git
Synced 2026-02-21 18:37:26 +08:00

Commit 6c6480e84a (parent 78577392c3): Add missing hypergraph changes
@@ -100,6 +100,14 @@ func (hg *HypergraphCRDT) publishSnapshot(root []byte) {
 	hg.snapshotMgr.publish(root)
 }
 
+// PublishSnapshot announces a new snapshot generation with the given commit root.
+// This should be called after Commit() to make the new state available for sync.
+// Clients can request sync against this root using the expectedRoot parameter.
+// The snapshot manager retains a limited number of historical generations.
+func (hg *HypergraphCRDT) PublishSnapshot(root []byte) {
+	hg.publishSnapshot(root)
+}
+
 func (hg *HypergraphCRDT) cloneSetWithStore(
 	set hypergraph.IdSet,
 	store tries.TreeBackingStore,
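For orientation, the intended call sequence looks roughly like the sketch below. Only the ordering (commit first, then publish) comes from the doc comment above; the exact shape of the Commit() call on HypergraphCRDT is an assumption here.

	// Sketch of the intended usage; Commit()'s signature is assumed.
	root := hg.Commit(false)   // assumed: commits state and returns the new commit root
	hg.PublishSnapshot(root)   // announces the root so clients can sync against it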
@@ -1,6 +1,7 @@
 package hypergraph
 
 import (
+	"bytes"
 	"encoding/hex"
 	"fmt"
 	"sync"
@@ -11,6 +12,11 @@ import (
 	"source.quilibrium.com/quilibrium/monorepo/types/tries"
 )
 
+// maxSnapshotGenerations is the maximum number of historical snapshot
+// generations to retain. When a new root is published, older generations
+// beyond this limit are released.
+const maxSnapshotGenerations = 10
+
 type snapshotHandle struct {
 	store   tries.TreeBackingStore
 	release func()
@@ -152,12 +158,20 @@ func (h *snapshotHandle) isLeafMiss(key []byte) bool {
 	return miss
 }
 
-type snapshotManager struct {
-	logger *zap.Logger
-	store  tries.TreeBackingStore
-	mu     sync.Mutex
+// snapshotGeneration represents a set of shard snapshots for a specific
+// commit root.
+type snapshotGeneration struct {
 	root    []byte
-	handles map[string]*snapshotHandle
+	handles map[string]*snapshotHandle // keyed by shard key
+}
+
+type snapshotManager struct {
+	logger *zap.Logger
+	store  tries.TreeBackingStore
+	mu     sync.Mutex
+	// generations holds snapshot generations ordered from newest to oldest.
+	// generations[0] is the current/latest generation.
+	generations []*snapshotGeneration
 }
 
 func newSnapshotManager(
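The newest-first ordering makes root lookups a simple scan from generations[0] toward older entries. A minimal illustrative helper, not part of this commit (the name findGeneration is hypothetical, and the caller would have to hold m.mu):

	// Hypothetical helper: locate the generation for a root, newest first.
	// Returns nil when the root is unknown or has aged out of retention.
	func (m *snapshotManager) findGeneration(root []byte) *snapshotGeneration {
		for _, gen := range m.generations { // generations[0] is the latest
			if bytes.Equal(gen.root, root) {
				return gen
			}
		}
		return nil
	}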
@@ -165,9 +179,9 @@ func newSnapshotManager(
 	store tries.TreeBackingStore,
 ) *snapshotManager {
 	return &snapshotManager{
-		logger:  logger,
-		store:   store,
-		handles: make(map[string]*snapshotHandle),
+		logger:      logger,
+		store:       store,
+		generations: make([]*snapshotGeneration, 0, maxSnapshotGenerations),
 	}
 }
 
@@ -175,39 +189,117 @@ func (m *snapshotManager) publish(root []byte) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
-	// Remove all handles from the map so new syncs get new handles.
-	// Handles with active refs will be released when their last user calls release().
-	// Handles with no active refs (only the initial ref from creation) are released now.
-	for key, handle := range m.handles {
-		delete(m.handles, key)
-		if handle != nil {
-			// releaseRef decrements the ref count. If this was the last ref
-			// (i.e., no active sync sessions), the underlying DB is released.
-			// If there are active sync sessions, they will release it when done.
-			handle.releaseRef(m.logger)
-		}
-	}
-
-	m.root = nil
-	if len(root) != 0 {
-		m.root = append([]byte{}, root...)
-	}
-
 	rootHex := ""
 	if len(root) != 0 {
 		rootHex = hex.EncodeToString(root)
 	}
-	m.logger.Debug("reset snapshot state", zap.String("root", rootHex))
+
+	// Check if this root already matches the current generation
+	if len(m.generations) > 0 && bytes.Equal(m.generations[0].root, root) {
+		m.logger.Debug(
+			"publish called with current root, no change",
+			zap.String("root", rootHex),
+		)
+		return
+	}
+
+	// Create a new generation for this root
+	newGen := &snapshotGeneration{
+		handles: make(map[string]*snapshotHandle),
+	}
+	if len(root) != 0 {
+		newGen.root = append([]byte{}, root...)
+	}
+
+	// Prepend the new generation (newest first)
+	m.generations = append([]*snapshotGeneration{newGen}, m.generations...)
+
+	// Release generations beyond the limit
+	for len(m.generations) > maxSnapshotGenerations {
+		oldGen := m.generations[len(m.generations)-1]
+		m.generations = m.generations[:len(m.generations)-1]
+
+		// Release all handles in the old generation
+		for key, handle := range oldGen.handles {
+			delete(oldGen.handles, key)
+			if handle != nil {
+				handle.releaseRef(m.logger)
+			}
+		}
+
+		oldRootHex := ""
+		if len(oldGen.root) != 0 {
+			oldRootHex = hex.EncodeToString(oldGen.root)
+		}
+		m.logger.Debug(
+			"released old snapshot generation",
+			zap.String("root", oldRootHex),
+		)
+	}
+
+	m.logger.Debug(
+		"published new snapshot generation",
+		zap.String("root", rootHex),
+		zap.Int("total_generations", len(m.generations)),
+	)
 }
 
+// acquire returns a snapshot handle for the given shard key. If expectedRoot
+// is provided and a matching generation has an existing snapshot for this shard,
+// that snapshot is returned. Otherwise, a new snapshot is created from the
+// current DB state and associated with the latest generation.
+//
+// Note: Historical snapshots are only available if they were created while that
+// generation was current. We cannot create a snapshot of past state retroactively.
 func (m *snapshotManager) acquire(
 	shardKey tries.ShardKey,
+	expectedRoot []byte,
 ) *snapshotHandle {
 	key := shardKeyString(shardKey)
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
-	if handle, ok := m.handles[key]; ok {
+	if len(m.generations) == 0 {
+		m.logger.Warn("no snapshot generations available")
+		return nil
+	}
+
+	// If expectedRoot is provided, look for an existing snapshot in that generation
+	if len(expectedRoot) > 0 {
+		for _, gen := range m.generations {
+			if bytes.Equal(gen.root, expectedRoot) {
+				// Found matching generation, check if it has a snapshot for this shard
+				if handle, ok := gen.handles[key]; ok {
+					m.logger.Debug(
+						"found existing snapshot for expected root",
+						zap.String("expected_root", hex.EncodeToString(expectedRoot)),
+					)
+					handle.acquire()
+					return handle
+				}
+				// Generation exists but no snapshot for this shard yet
+				m.logger.Info(
+					"generation matches expected root but no snapshot exists, using latest",
+					zap.String("expected_root", hex.EncodeToString(expectedRoot)),
+				)
+				break
+			}
+		}
+		// No matching generation found
+		if m.logger != nil {
+			m.logger.Info(
+				"no snapshot generation matches expected root, using latest",
+				zap.String("expected_root", hex.EncodeToString(expectedRoot)),
+				zap.String("latest_root", hex.EncodeToString(m.generations[0].root)),
+			)
+		}
+	}
+
+	// Use the latest generation for new snapshots
+	latestGen := m.generations[0]
+
+	// Check if we already have a handle for this shard in the latest generation
+	if handle, ok := latestGen.handles[key]; ok {
 		handle.acquire()
 		return handle
 	}
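The retention behavior follows directly from the prepend-and-trim logic above: publishing an 11th distinct root evicts the oldest generation and releases its handles. A rough illustration, assuming a no-op logger and some tries.TreeBackingStore are available in a test:

	// Illustration only; store is any available tries.TreeBackingStore.
	m := newSnapshotManager(zap.NewNop(), store)
	for i := 0; i < maxSnapshotGenerations+1; i++ {
		m.publish([]byte{byte(i)}) // roots 0x00 .. 0x0a
	}
	// len(m.generations) == maxSnapshotGenerations: root 0x00 was evicted,
	// and m.generations[0].root is []byte{0x0a}, the newest.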
@@ -226,16 +318,27 @@ func (m *snapshotManager) acquire(
 		return nil
 	}
 
-	handle := newSnapshotHandle(key, storeSnapshot, release, m.root)
+	handle := newSnapshotHandle(key, storeSnapshot, release, latestGen.root)
+	// Acquire a ref for the caller. The handle is created with refs=1 (the owner ref
+	// held by the snapshot manager), and this adds another ref for the sync session.
+	// This ensures publish() can release the owner ref without closing the DB while
+	// a sync is still using it.
 	handle.acquire()
-	m.handles[key] = handle
+	latestGen.handles[key] = handle
 	return handle
 }
 
+// currentRoot returns the commit root of the latest snapshot generation.
+func (m *snapshotManager) currentRoot() []byte {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	if len(m.generations) == 0 {
+		return nil
+	}
+	return append([]byte{}, m.generations[0].root...)
+}
+
 func (m *snapshotManager) release(handle *snapshotHandle) {
 	if handle == nil {
 		return
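The ref-count discipline the new comment describes (one owner ref held by the manager, plus one ref per sync session) is easiest to see in a self-contained sketch. This illustrates the pattern only; the actual snapshotHandle internals are not shown in this diff:

	// Generic sketch of the owner-ref + session-ref pattern.
	type refCounted struct {
		mu    sync.Mutex
		refs  int    // starts at 1: the owner ref
		close func() // releases the underlying snapshot DB
	}

	func (r *refCounted) acquire() { r.mu.Lock(); r.refs++; r.mu.Unlock() }

	func (r *refCounted) releaseRef() {
		r.mu.Lock()
		r.refs--
		last := r.refs == 0
		r.mu.Unlock()
		if last && r.close != nil {
			r.close() // only the final release closes the resource
		}
	}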
@@ -245,8 +348,13 @@ func (m *snapshotManager) release(handle *snapshotHandle) {
 	}
 	m.mu.Lock()
 	defer m.mu.Unlock()
-	if current, ok := m.handles[handle.key]; ok && current == handle {
-		delete(m.handles, handle.key)
+
+	// Search all generations for this handle and remove it
+	for _, gen := range m.generations {
+		if current, ok := gen.handles[handle.key]; ok && current == handle {
+			delete(gen.handles, handle.key)
+			return
+		}
 	}
 }
 
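On the caller side, acquire and release are meant to be paired around a sync session. A plausible server-side pattern, consistent with the syncTreeServer change below (whether the real code uses defer is not shown in this diff):

	// Sketch: pair acquire with release around the session.
	handle := hg.snapshotMgr.acquire(shardKey, query.ExpectedRoot)
	if handle == nil {
		return errors.New("hypergraph shard snapshot unavailable")
	}
	defer hg.snapshotMgr.release(handle)
	// ... serve the sync session from the pinned snapshot ...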
@@ -80,10 +80,15 @@ func (hg *HypergraphCRDT) HyperStream(
 // The caller (e.g. the client) must initiate the diff from its root.
 // After that, both sides exchange queries, branch info, and leaf updates until
 // their local trees are synchronized.
+//
+// If expectedRoot is provided, the server will attempt to use a snapshot with
+// a matching commit root. This allows the client to sync against a specific
+// known state rather than whatever the server's current state happens to be.
 func (hg *HypergraphCRDT) Sync(
 	stream protobufs.HypergraphComparisonService_HyperStreamClient,
 	shardKey tries.ShardKey,
 	phaseSet protobufs.HypergraphPhaseSet,
+	expectedRoot []byte,
 ) (err error) {
 	const localSyncKey = "local-sync"
 	if !hg.syncController.TryEstablishSyncSession(localSyncKey) {
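From the client's perspective the new parameter is opt-in. A hedged call-site sketch (stream, shardKey, and phaseSet are assumed to be in scope; how the client learns the announced root is outside this diff):

	// Pin the sync to a previously announced root; pass nil to accept
	// whatever the server's current state is (the old behavior).
	var expectedRoot []byte = announcedRoot // assumed: learned out of band
	if err := hg.Sync(stream, shardKey, phaseSet, expectedRoot); err != nil {
		return err
	}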
@@ -138,6 +143,7 @@ func (hg *HypergraphCRDT) Sync(
 				Path:            toInt32Slice(path),
 				Commitment:      set.GetTree().Commit(false),
 				IncludeLeafData: false,
+				ExpectedRoot:    expectedRoot,
 			},
 		},
 	}); err != nil {
@@ -2355,7 +2361,7 @@ func (hg *HypergraphCRDT) syncTreeServer(
 	}
 
 	snapshotStart := time.Now()
-	handle := hg.snapshotMgr.acquire(shardKey)
+	handle := hg.snapshotMgr.acquire(shardKey, query.ExpectedRoot)
 	if handle == nil {
 		return errors.New("hypergraph shard snapshot unavailable")
 	}
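Note that acquire falls back to the latest generation when expectedRoot cannot be matched, so this call never fails a sync merely because a root aged out; the client may simply end up diffing against newer state. If a caller needed to detect the fallback, something like the following would work, assuming the handle records the root it was created with (the field name root is an assumption based on newSnapshotHandle's final argument):

	// Sketch: detect that the requested generation was unavailable.
	if len(query.ExpectedRoot) > 0 && !bytes.Equal(handle.root, query.ExpectedRoot) {
		// Serving the latest snapshot instead of the requested one;
		// the client should be prepared to diff against newer state.
	}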