Mirror of https://github.com/QuilibriumNetwork/ceremonyclient.git (synced 2026-02-21 10:27:26 +08:00)

Commit 7d02708f4f (parent e8eba1fbe1)

resolve sync issue, remove raw sync
@@ -277,22 +277,23 @@ func (m *snapshotManager) acquire(
handle.acquire()
return handle
}
// Generation exists but no snapshot for this shard yet
m.logger.Info(
"generation matches expected root but no snapshot exists, using latest",
// Generation exists but no snapshot for this shard yet - reject
m.logger.Warn(
"generation matches expected root but no snapshot exists, rejecting sync request",
zap.String("expected_root", hex.EncodeToString(expectedRoot)),
)
break
return nil
}
}
// No matching generation found
// No matching generation found - reject instead of falling back to latest
if m.logger != nil {
m.logger.Info(
"no snapshot generation matches expected root, using latest",
m.logger.Warn(
"no snapshot generation matches expected root, rejecting sync request",
zap.String("expected_root", hex.EncodeToString(expectedRoot)),
zap.String("latest_root", hex.EncodeToString(m.generations[0].root)),
)
}
return nil
}

// Use the latest generation for new snapshots
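As a reading aid for the hunk above: the snapshot lookup now fails closed when no generation matches the expected root, instead of silently falling back to the latest generation. A minimal sketch of that contract, using hypothetical stand-in types rather than the repository's actual snapshotManager internals:

package sketch

import "bytes"

// Hypothetical stand-ins for the snapshot manager's internal types.
type snapshotHandle struct{}

type snapshotGeneration struct {
	root   []byte
	handle *snapshotHandle
}

// findSnapshot sketches the new fail-closed behavior: if no generation's root
// matches expectedRoot, return nil (reject the sync request) rather than
// falling back to the newest generation as the previous code did.
func findSnapshot(gens []snapshotGeneration, expectedRoot []byte) *snapshotHandle {
	for _, g := range gens {
		if bytes.Equal(g.root, expectedRoot) {
			return g.handle
		}
	}
	return nil
}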
@@ -563,14 +563,6 @@ func (s *streamManager) updateActivity() {
s.lastSent = time.Now()
}

type rawVertexSaver interface {
SaveVertexTreeRaw(
txn tries.TreeBackingStoreTransaction,
id []byte,
data []byte,
) error
}

type vertexTreeDeleter interface {
DeleteVertexTree(
txn tries.TreeBackingStoreTransaction,
@@ -603,488 +595,6 @@ func leafAckTimeout(count uint64) time.Duration {
return timeout
}

func shouldUseRawSync(phaseSet protobufs.HypergraphPhaseSet) bool {
return phaseSet == protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS
}

func keyWithinCoveredPrefix(key []byte, prefix []int) bool {
if len(prefix) == 0 {
return true
}
path := tries.GetFullPath(key)
if len(path) < len(prefix) {
return false
}
for i, nib := range prefix {
if path[i] != nib {
return false
}
}
return true
}

// rawShardSync performs a full raw sync of all leaves from server to client.
// This iterates directly over the database, bypassing in-memory tree caching
// to ensure all leaves are sent even if the in-memory tree is stale.
func (s *streamManager) rawShardSync(
shardKey tries.ShardKey,
phaseSet protobufs.HypergraphPhaseSet,
incomingLeaves <-chan *protobufs.HypergraphComparison,
coveredPrefix []int32,
) error {
shardHex := hex.EncodeToString(shardKey.L2[:])
s.logger.Info(
"SERVER: starting raw shard sync (direct DB iteration)",
zap.String("shard_key", shardHex),
)
start := time.Now()
prefix := toIntSlice(coveredPrefix)

// Determine set and phase type strings
setType := string(hypergraph.VertexAtomType)
phaseType := string(hypergraph.AddsPhaseType)
switch phaseSet {
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS:
setType = string(hypergraph.VertexAtomType)
phaseType = string(hypergraph.AddsPhaseType)
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES:
setType = string(hypergraph.VertexAtomType)
phaseType = string(hypergraph.RemovesPhaseType)
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS:
setType = string(hypergraph.HyperedgeAtomType)
phaseType = string(hypergraph.AddsPhaseType)
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES:
setType = string(hypergraph.HyperedgeAtomType)
phaseType = string(hypergraph.RemovesPhaseType)
}

// Get raw leaf iterator from the database
iter, err := s.hypergraphStore.IterateRawLeaves(setType, phaseType, shardKey)
if err != nil {
s.logger.Error(
"SERVER: failed to create raw leaf iterator",
zap.String("shard_key", shardHex),
zap.Error(err),
)
return errors.Wrap(err, "raw shard sync")
}
defer iter.Close()

// First pass: count leaves
var count uint64
for valid := iter.First(); valid; valid = iter.Next() {
leaf, err := iter.Leaf()
if err != nil {
// Skip non-leaf nodes (branches)
continue
}
if leaf != nil && keyWithinCoveredPrefix(leaf.Key, prefix) {
count++
}
}

s.logger.Info(
"SERVER: raw sync sending metadata",
zap.String("shard_key", shardHex),
zap.Uint64("leaf_count", count),
)

// Send metadata with leaf count
if err := s.stream.Send(&protobufs.HypergraphComparison{
Payload: &protobufs.HypergraphComparison_Metadata{
Metadata: &protobufs.HypersyncMetadata{Leaves: count},
},
}); err != nil {
return errors.Wrap(err, "raw shard sync: send metadata")
}

// Create new iterator for sending (previous one is exhausted)
iter.Close()
iter, err = s.hypergraphStore.IterateRawLeaves(setType, phaseType, shardKey)
if err != nil {
return errors.Wrap(err, "raw shard sync: recreate iterator")
}
defer iter.Close()

// Second pass: send leaves
var sent uint64
for valid := iter.First(); valid; valid = iter.Next() {
select {
case <-s.ctx.Done():
return s.ctx.Err()
default:
}

leaf, err := iter.Leaf()
if err != nil {
// Skip non-leaf nodes
continue
}
if leaf == nil {
continue
}
if !keyWithinCoveredPrefix(leaf.Key, prefix) {
continue
}

update := &protobufs.LeafData{
Key: leaf.Key,
Value: leaf.Value,
HashTarget: leaf.HashTarget,
Size: leaf.Size,
UnderlyingData: leaf.UnderlyingData,
}

msg := &protobufs.HypergraphComparison{
Payload: &protobufs.HypergraphComparison_LeafData{
LeafData: update,
},
}

if err := s.stream.Send(msg); err != nil {
return errors.Wrap(err, "raw shard sync: send leaf")
}

sent++
// Update activity periodically to prevent idle timeout
if sent%100 == 0 {
s.updateActivity()
}
if sent%1000 == 0 {
s.logger.Debug(
"SERVER: raw sync progress",
zap.Uint64("sent", sent),
zap.Uint64("total", count),
)
}
}

s.logger.Info(
"SERVER: raw sync sent all leaves, waiting for ack",
zap.String("shard_key", shardHex),
zap.Uint64("sent", sent),
)

// Wait for acknowledgment
timeoutTimer := time.NewTimer(leafAckTimeout(count))
defer timeoutTimer.Stop()

select {
case <-s.ctx.Done():
return errors.Wrap(s.ctx.Err(), "raw shard sync: wait ack")
case msg, ok := <-incomingLeaves:
if !ok {
return errors.Wrap(errors.New("channel closed"), "raw shard sync: wait ack")
}
meta := msg.GetMetadata()
if meta == nil {
return errors.Wrap(errors.New("expected metadata ack"), "raw shard sync: wait ack")
}
if meta.Leaves != count {
return errors.Wrap(
fmt.Errorf("ack mismatch: expected %d, got %d", count, meta.Leaves),
"raw shard sync: wait ack",
)
}
case <-timeoutTimer.C:
return errors.Wrap(errors.New("timeout waiting for ack"), "raw shard sync")
}

s.logger.Info(
"SERVER: raw shard sync completed",
zap.String("shard_key", shardHex),
zap.Uint64("leaves_sent", sent),
zap.Duration("duration", time.Since(start)),
)
return nil
}
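For readers skimming the raw-sync path being removed above, the server side boils down to: send a leaf count, stream every covered leaf, then require an acknowledgment that echoes the count. A minimal sketch of that sequence, with simplified stand-in types instead of the real protobufs and stream:

package sketch

import "fmt"

// Simplified stand-ins for the hypersync protobuf messages.
type rawSyncMetadata struct{ Leaves uint64 }

type rawSyncLeaf struct{ Key, Value []byte }

// serveRawSync mirrors the removed server-side flow: metadata, leaves, ack.
func serveRawSync(
	send func(any) error,
	recvAck func() (rawSyncMetadata, error),
	leaves []rawSyncLeaf,
) error {
	if err := send(rawSyncMetadata{Leaves: uint64(len(leaves))}); err != nil {
		return err
	}
	for _, l := range leaves {
		if err := send(l); err != nil {
			return err
		}
	}
	ack, err := recvAck()
	if err != nil {
		return err
	}
	if ack.Leaves != uint64(len(leaves)) {
		return fmt.Errorf("ack mismatch: expected %d, got %d", len(leaves), ack.Leaves)
	}
	return nil
}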

// receiveRawShardSync receives a full raw sync of all leaves from server.
// It uses tree insertion to properly build the tree structure on the client.
func (s *streamManager) receiveRawShardSync(
incomingLeaves <-chan *protobufs.HypergraphComparison,
) error {
start := time.Now()
s.logger.Info("CLIENT: starting receiveRawShardSync")

expectedLeaves, err := s.awaitRawLeafMetadata(incomingLeaves)
if err != nil {
s.logger.Error("CLIENT: failed to receive metadata", zap.Error(err))
return err
}

s.logger.Info(
"CLIENT: received metadata",
zap.Uint64("expected_leaves", expectedLeaves),
)

var txn tries.TreeBackingStoreTransaction
var processed uint64
seenKeys := make(map[string]struct{})
for processed < expectedLeaves {
if processed%100 == 0 {
if txn != nil {
if err := txn.Commit(); err != nil {
return errors.Wrap(err, "receive raw shard sync")
}
}
txn, err = s.hypergraphStore.NewTransaction(false)
if err != nil {
return errors.Wrap(err, "receive raw shard sync")
}
}

leafMsg, err := s.awaitLeafData(incomingLeaves)
if err != nil {
if txn != nil {
txn.Abort()
}
s.logger.Error(
"CLIENT: failed to receive leaf",
zap.Uint64("processed", processed),
zap.Uint64("expected", expectedLeaves),
zap.Error(err),
)
return err
}

// Deserialize the atom from the raw value
theirs := AtomFromBytes(leafMsg.Value)
if theirs == nil {
if txn != nil {
txn.Abort()
}
return errors.Wrap(
errors.New("invalid atom"),
"receive raw shard sync",
)
}

// Persist underlying vertex tree data if present
if len(leafMsg.UnderlyingData) > 0 {
if saver, ok := s.hypergraphStore.(rawVertexSaver); ok {
if err := saver.SaveVertexTreeRaw(
txn,
leafMsg.Key,
leafMsg.UnderlyingData,
); err != nil {
txn.Abort()
return errors.Wrap(err, "receive raw shard sync: save vertex tree")
}
}
}

// Track key so we can prune anything absent from the authoritative list.
seenKeys[string(append([]byte(nil), leafMsg.Key...))] = struct{}{}

// Use Add to properly build tree structure
if err := s.localSet.Add(txn, theirs); err != nil {
txn.Abort()
return errors.Wrap(err, "receive raw shard sync: add atom")
}

processed++
if processed%1000 == 0 {
s.logger.Debug(
"CLIENT: raw sync progress",
zap.Uint64("processed", processed),
zap.Uint64("expected", expectedLeaves),
)
}
}

if txn != nil {
if err := txn.Commit(); err != nil {
return errors.Wrap(err, "receive raw shard sync")
}
}

// Send acknowledgment
if err := s.sendLeafMetadata(expectedLeaves); err != nil {
return errors.Wrap(err, "receive raw shard sync")
}

if err := s.pruneRawSyncExtras(seenKeys); err != nil {
return errors.Wrap(err, "receive raw shard sync")
}

s.logger.Info(
"CLIENT: raw shard sync completed",
zap.Uint64("leaves_received", expectedLeaves),
zap.Duration("duration", time.Since(start)),
)
return nil
}

func (s *streamManager) pruneRawSyncExtras(seen map[string]struct{}) error {
start := time.Now()
setType := s.localTree.SetType
phaseType := s.localTree.PhaseType
shardKey := s.localTree.ShardKey

iter, err := s.hypergraphStore.IterateRawLeaves(setType, phaseType, shardKey)
if err != nil {
return errors.Wrap(err, "prune raw sync extras: iterator")
}
defer iter.Close()

var txn tries.TreeBackingStoreTransaction
var pruned uint64

commitTxn := func() error {
if txn == nil {
return nil
}
if err := txn.Commit(); err != nil {
txn.Abort()
return err
}
txn = nil
return nil
}

for valid := iter.First(); valid; valid = iter.Next() {
leaf, err := iter.Leaf()
if err != nil || leaf == nil {
continue
}
if _, ok := seen[string(leaf.Key)]; ok {
continue
}

if txn == nil {
txn, err = s.hypergraphStore.NewTransaction(false)
if err != nil {
return errors.Wrap(err, "prune raw sync extras")
}
}

atom := AtomFromBytes(leaf.Value)
if atom == nil {
s.logger.Warn("CLIENT: skipping stale leaf with invalid atom", zap.String("key", hex.EncodeToString(leaf.Key)))
continue
}

if err := s.localSet.Delete(txn, atom); err != nil {
txn.Abort()
return errors.Wrap(err, "prune raw sync extras")
}
if err := s.deleteVertexTreeIfNeeded(txn, atom, leaf.Key); err != nil {
txn.Abort()
return errors.Wrap(err, "prune raw sync extras")
}

pruned++
if pruned%pruneTxnChunk == 0 {
if err := commitTxn(); err != nil {
return errors.Wrap(err, "prune raw sync extras")
}
}
}

if err := commitTxn(); err != nil {
return errors.Wrap(err, "prune raw sync extras")
}

if pruned > 0 {
s.logger.Info(
"CLIENT: pruned stale leaves after raw sync",
zap.Uint64("count", pruned),
zap.Duration("duration", time.Since(start)),
)
} else {
s.logger.Info(
"CLIENT: no stale leaves found after raw sync",
zap.Duration("duration", time.Since(start)),
)
}

return nil
}
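The pruning loop above batches deletions into transactions of pruneTxnChunk entries, lazily opening a transaction only when there is something to delete and committing whatever remains at the end. A minimal sketch of that chunked-commit pattern under a hypothetical transaction interface:

package sketch

// txnLike is a hypothetical stand-in for tries.TreeBackingStoreTransaction.
type txnLike interface {
	Commit() error
	Abort()
}

// deleteInChunks lazily opens a transaction, deletes keys, and commits every
// `chunk` deletions (chunk must be non-zero), mirroring the commitTxn pattern
// used above.
func deleteInChunks(
	newTxn func() (txnLike, error),
	del func(txnLike, []byte) error,
	keys [][]byte,
	chunk uint64,
) error {
	var txn txnLike
	var n uint64
	commit := func() error {
		if txn == nil {
			return nil
		}
		err := txn.Commit()
		txn = nil
		return err
	}
	for _, k := range keys {
		if txn == nil {
			t, err := newTxn()
			if err != nil {
				return err
			}
			txn = t
		}
		if err := del(txn, k); err != nil {
			txn.Abort()
			return err
		}
		n++
		if n%chunk == 0 {
			if err := commit(); err != nil {
				return err
			}
		}
	}
	return commit()
}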

func (s *streamManager) awaitRawLeafMetadata(
incomingLeaves <-chan *protobufs.HypergraphComparison,
) (uint64, error) {
s.logger.Debug("CLIENT: awaitRawLeafMetadata waiting...")
select {
case <-s.ctx.Done():
return 0, errors.Wrap(
errors.New("context canceled"),
"await raw leaf metadata",
)
case msg, ok := <-incomingLeaves:
if !ok {
s.logger.Error("CLIENT: incomingLeaves channel closed")
return 0, errors.Wrap(
errors.New("channel closed"),
"await raw leaf metadata",
)
}
meta := msg.GetMetadata()
if meta == nil {
s.logger.Error(
"CLIENT: received non-metadata message while waiting for metadata",
zap.String("payload_type", fmt.Sprintf("%T", msg.Payload)),
)
return 0, errors.Wrap(
errors.New("invalid message: expected metadata"),
"await raw leaf metadata",
)
}
s.logger.Debug(
"CLIENT: received metadata",
zap.Uint64("leaves", meta.Leaves),
)
return meta.Leaves, nil
case <-time.After(leafAckTimeout(1)):
s.logger.Error("CLIENT: timeout waiting for metadata")
return 0, errors.Wrap(
errors.New("timed out waiting for metadata"),
"await raw leaf metadata",
)
}
}

func (s *streamManager) awaitLeafData(
incomingLeaves <-chan *protobufs.HypergraphComparison,
) (*protobufs.LeafData, error) {
select {
case <-s.ctx.Done():
return nil, errors.Wrap(
errors.New("context canceled"),
"await leaf data",
)
case msg, ok := <-incomingLeaves:
if !ok {
return nil, errors.Wrap(
errors.New("channel closed"),
"await leaf data",
)
}
if leaf := msg.GetLeafData(); leaf != nil {
return leaf, nil
}
return nil, errors.Wrap(
errors.New("invalid message: expected leaf data"),
"await leaf data",
)
case <-time.After(leafAckTimeout(1)):
return nil, errors.Wrap(
errors.New("timed out waiting for leaf data"),
"await leaf data",
)
}
}

func (s *streamManager) sendLeafMetadata(leaves uint64) error {
s.logger.Debug("sending leaf metadata ack", zap.Uint64("leaves", leaves))
return s.stream.Send(&protobufs.HypergraphComparison{
Payload: &protobufs.HypergraphComparison_Metadata{
Metadata: &protobufs.HypersyncMetadata{Leaves: leaves},
},
})
}

// sendLeafData builds a LeafData message (with the full leaf data) for the
// node at the given path in the local tree and sends it over the stream.
func (s *streamManager) sendLeafData(
@@ -1797,6 +1307,14 @@ func (s *streamManager) pruneLocalSubtree(path []int32) (uint64, error) {

node, err := s.localTree.GetByPath(intPath)
if err != nil {
// "item not found" means the tree is empty at this path - nothing to prune
if strings.Contains(err.Error(), "item not found") {
s.logger.Debug(
"CLIENT: prune skipped, item not found",
zap.String("path", pathHex),
)
return 0, nil
}
return 0, errors.Wrap(err, "prune local subtree")
}

@@ -1901,20 +1419,13 @@ func (s *streamManager) persistLeafTree(
return nil
}

needsValidation := s.requiresTreeValidation()
_, canSaveRaw := s.hypergraphStore.(rawVertexSaver)

var tree *tries.VectorCommitmentTree
var err error
if needsValidation || !canSaveRaw {
tree, err = tries.DeserializeNonLazyTree(update.UnderlyingData)
if err != nil {
s.logger.Error("server returned invalid tree", zap.Error(err))
return err
}
tree, err := tries.DeserializeNonLazyTree(update.UnderlyingData)
if err != nil {
s.logger.Error("server returned invalid tree", zap.Error(err))
return err
}

if needsValidation {
if s.requiresTreeValidation() {
if err := s.localSet.ValidateTree(
update.Key,
update.Value,
@@ -1925,12 +1436,6 @@ func (s *streamManager) persistLeafTree(
}
}

if saver, ok := s.hypergraphStore.(rawVertexSaver); ok {
buf := make([]byte, len(update.UnderlyingData))
copy(buf, update.UnderlyingData)
return saver.SaveVertexTreeRaw(txn, update.Key, buf)
}

return s.hypergraphStore.SaveVertexTree(txn, update.Key, tree)
}

@@ -2115,21 +1620,23 @@ func (s *streamManager) walk(
return nil
}

// Check if we should use raw sync mode for this phase set
if init && shouldUseRawSync(phaseSet) {
s.logger.Info(
"walk: using raw sync mode",
zap.Bool("is_server", isServer),
zap.Int("phase_set", int(phaseSet)),
)
if isServer {
return s.rawShardSync(shardKey, phaseSet, incomingLeaves, path)
}
return s.receiveRawShardSync(incomingLeaves)
}

if isLeaf(lnode) && isLeaf(rnode) && !init {
return nil
// Both are leaves with differing commitments - need to sync
// Server sends its leaf, client prunes local and receives server's leaf
if isServer {
err := s.sendLeafData(
path,
incomingLeaves,
)
return errors.Wrap(err, "walk")
} else {
// Prune local leaf first since it differs from server
if _, err := s.pruneLocalSubtree(path); err != nil {
return errors.Wrap(err, "walk")
}
err := s.handleLeafData(incomingLeaves)
return errors.Wrap(err, "walk")
}
}

if isLeaf(rnode) || isLeaf(lnode) {
@@ -2141,6 +1648,11 @@ func (s *streamManager) walk(
)
return errors.Wrap(err, "walk")
} else {
// Prune local subtree first - either local is leaf with different data,
// or local is branch with children that server doesn't have
if _, err := s.pruneLocalSubtree(path); err != nil {
return errors.Wrap(err, "walk")
}
err := s.handleLeafData(incomingLeaves)
return errors.Wrap(err, "walk")
}
@@ -2157,13 +1669,15 @@ func (s *streamManager) walk(
// )
if len(lpref) > len(rpref) {
// s.logger.Debug("local prefix longer, traversing remote to path", pathString)
traverse := lpref[len(rpref)-1:]
traverse := lpref[len(rpref):]
rtrav := rnode
traversePath := append([]int32{}, rpref...)
for _, nibble := range traverse {
// s.logger.Debug("attempting remote traversal step")
foundMatch := false
for _, child := range rtrav.Children {
if child.Index == nibble {
foundMatch = true
// s.logger.Debug("sending query")
traversePath = append(traversePath, child.Index)
var err error
@@ -2180,7 +1694,9 @@ func (s *streamManager) walk(
}
}

if rtrav == nil {
// If no child matched or queryNext returned nil, remote doesn't
// have the path that local has
if !foundMatch || rtrav == nil {
// s.logger.Debug("traversal could not reach path")
if isServer {
err := s.sendLeafData(
@@ -2209,15 +1725,17 @@ func (s *streamManager) walk(
)
} else {
// s.logger.Debug("remote prefix longer, traversing local to path", pathString)
traverse := rpref[len(lpref)-1:]
traverse := rpref[len(lpref):]
ltrav := lnode
traversedPath := append([]int32{}, lnode.Path...)

for _, nibble := range traverse {
// s.logger.Debug("attempting local traversal step")
preTraversal := append([]int32{}, traversedPath...)
foundMatch := false
for _, child := range ltrav.Children {
if child.Index == nibble {
foundMatch = true
traversedPath = append(traversedPath, nibble)
var err error
// s.logger.Debug("expecting query")
@@ -2259,6 +1777,28 @@ func (s *streamManager) walk(
}
}
}
// If no child matched the nibble, the local tree doesn't extend
// to match the remote's deeper prefix. We still need to respond to
// the remote's query with an empty response, then handle the leaf data.
if !foundMatch {
// Respond to remote's pending query with our current path info
// (which will have empty commitment since we don't have the path)
traversedPath = append(traversedPath, nibble)
ltrav, _ = s.handleQueryNext(incomingQueries, traversedPath)

if isServer {
// Server sends its data since client's tree is shallower
if err := s.sendLeafData(preTraversal, incomingLeaves); err != nil {
return errors.Wrap(err, "walk")
}
} else {
// Client receives data from server
if err := s.handleLeafData(incomingLeaves); err != nil {
return errors.Wrap(err, "walk")
}
}
return nil
}
}
// s.logger.Debug("traversal completed, performing walk", pathString)
return s.walk(
@@ -2319,6 +1859,11 @@ func (s *streamManager) walk(
}
}
if rchild != nil {
// Remote has a child that local doesn't have
// - If SERVER: remote (client) has extra data, server has nothing to send
// Client will prune this on their side via lchild != nil case
// - If CLIENT: remote (server) has data we need to receive
// Server sends this via their lchild != nil case
if !isServer {
err := s.handleLeafData(incomingLeaves)
if err != nil {
@@ -2380,6 +1925,10 @@ func (s *streamManager) walk(
return errors.Wrap(err, "walk")
}
} else {
// Prune local data first since prefixes differ
if _, err := s.pruneLocalSubtree(path); err != nil {
return errors.Wrap(err, "walk")
}
err := s.handleLeafData(incomingLeaves)
if err != nil {
return errors.Wrap(err, "walk")

@@ -997,7 +997,8 @@ func (e *AppConsensusEngine) handleGlobalProverRoot(
)
e.globalProverRootSynced.Store(false)
e.globalProverRootVerifiedFrame.Store(0)
e.triggerGlobalHypersync(frame.Header.Prover, expectedProverRoot)
// Use blocking hypersync to ensure we're synced before continuing
e.performBlockingGlobalHypersync(frame.Header.Prover, expectedProverRoot)
return
}

@@ -1014,7 +1015,8 @@ func (e *AppConsensusEngine) handleGlobalProverRoot(
)
e.globalProverRootSynced.Store(false)
e.globalProverRootVerifiedFrame.Store(0)
e.triggerGlobalHypersync(frame.Header.Prover, expectedProverRoot)
// Use blocking hypersync to ensure we're synced before continuing
e.performBlockingGlobalHypersync(frame.Header.Prover, expectedProverRoot)
return
}

@@ -1095,6 +1097,75 @@ func (e *AppConsensusEngine) triggerGlobalHypersync(proposer []byte, expectedRoo
}()
}

// performBlockingGlobalHypersync performs a synchronous hypersync that blocks
// until completion. This is used before materializing frames to ensure we sync
// before applying any transactions when there's a prover root mismatch.
func (e *AppConsensusEngine) performBlockingGlobalHypersync(proposer []byte, expectedRoot []byte) {
if e.syncProvider == nil {
e.logger.Debug("blocking hypersync: no sync provider")
return
}
if bytes.Equal(proposer, e.proverAddress) {
e.logger.Debug("blocking hypersync: we are the proposer")
return
}

// Wait for any existing sync to complete first
for e.globalProverSyncInProgress.Load() {
e.logger.Debug("blocking hypersync: waiting for existing sync to complete")
time.Sleep(100 * time.Millisecond)
}

// Mark sync as in progress
if !e.globalProverSyncInProgress.CompareAndSwap(false, true) {
// Another sync started, wait for it
for e.globalProverSyncInProgress.Load() {
time.Sleep(100 * time.Millisecond)
}
return
}
defer e.globalProverSyncInProgress.Store(false)

e.logger.Info(
"performing blocking global hypersync before processing frame",
zap.String("proposer", hex.EncodeToString(proposer)),
zap.String("expected_root", hex.EncodeToString(expectedRoot)),
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Set up shutdown handler
done := make(chan struct{})
go func() {
select {
case <-e.ShutdownSignal():
cancel()
case <-done:
}
}()

selfPeerID := peer.ID(e.pubsub.GetPeerID())
shardKey := tries.ShardKey{
L1: [3]byte{0x00, 0x00, 0x00},
L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS,
}

// Perform sync synchronously (blocking)
e.syncProvider.HyperSyncSelf(ctx, selfPeerID, shardKey, nil, expectedRoot)
close(done)

if err := e.proverRegistry.Refresh(); err != nil {
e.logger.Warn(
"failed to refresh prover registry after blocking hypersync",
zap.Error(err),
)
}

e.globalProverRootSynced.Store(true)
e.logger.Info("blocking global hypersync completed")
}
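The guard at the top of performBlockingGlobalHypersync is a single-flight pattern: wait out any in-flight sync, claim the flag with a compare-and-swap, and release it when done. A minimal sketch of just that pattern, with atomic.Bool standing in for the engine's globalProverSyncInProgress flag:

package sketch

import (
	"sync/atomic"
	"time"
)

// runBlockingOnce waits for any in-flight run to finish, then either claims
// the flag and runs doSync itself, or (if it loses the race) waits for the
// winner to finish and returns.
func runBlockingOnce(inProgress *atomic.Bool, doSync func()) {
	for inProgress.Load() {
		time.Sleep(100 * time.Millisecond)
	}
	if !inProgress.CompareAndSwap(false, true) {
		for inProgress.Load() {
			time.Sleep(100 * time.Millisecond)
		}
		return
	}
	defer inProgress.Store(false)
	doSync()
}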

func (e *AppConsensusEngine) GetFrame() *protobufs.AppShardFrame {
frame, _, _ := e.clockStore.GetLatestShardClockFrame(e.appAddress)
return frame

@@ -118,6 +118,10 @@ func (e *GlobalConsensusEngine) eventDistributorLoop(
e.evaluateForProposals(ctx, data, needsProposals)
} else {
self, effectiveSeniority := e.allocationContext()
// Still reconcile allocations even when all workers appear
// allocated - this clears stale filters that no longer match
// prover allocations on-chain.
e.reconcileWorkerAllocations(data.Frame.Header.FrameNumber, self)
e.checkExcessPendingJoins(self, data.Frame.Header.FrameNumber)
e.logAllocationStatusOnly(ctx, data, self, effectiveSeniority)
}

@@ -1625,6 +1625,25 @@ func (e *GlobalConsensusEngine) materialize(
return errors.Wrap(err, "materialize")
}

// Check prover root BEFORE processing transactions. If there's a mismatch,
// we need to sync first, otherwise we'll apply transactions on top of
// divergent state and then sync will delete the newly added records.
if len(expectedProverRoot) > 0 {
localRoot, localErr := e.computeLocalProverRoot(frameNumber)
if localErr == nil && len(localRoot) > 0 {
if !bytes.Equal(localRoot, expectedProverRoot) {
e.logger.Info(
"prover root mismatch detected before processing frame, syncing first",
zap.Uint64("frame_number", frameNumber),
zap.String("expected_root", hex.EncodeToString(expectedProverRoot)),
zap.String("local_root", hex.EncodeToString(localRoot)),
)
// Perform blocking hypersync before continuing
e.performBlockingProverHypersync(proposer, expectedProverRoot)
}
}
}

var state state.State
state = hgstate.NewHypergraphState(e.hypergraph)
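The hunk above reorders materialization so divergence is reconciled before any transactions are applied. A condensed sketch of that ordering, with hypothetical helper closures in place of the engine's methods:

package sketch

import "bytes"

// materializeWithPreCheck compares the locally computed prover root against
// the expected root and runs a blocking sync before applying transactions,
// mirroring the check-then-sync-then-apply ordering introduced above.
func materializeWithPreCheck(
	expectedRoot []byte,
	computeLocalRoot func() ([]byte, error),
	blockingSync func(),
	applyTxns func() error,
) error {
	if len(expectedRoot) > 0 {
		if local, err := computeLocalRoot(); err == nil && len(local) > 0 &&
			!bytes.Equal(local, expectedRoot) {
			blockingSync()
		}
	}
	return applyTxns()
}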
@@ -1736,23 +1755,13 @@ func (e *GlobalConsensusEngine) materialize(
return errors.Wrap(err, "materialize")
}

shouldVerifyRoot := !e.config.Engine.ArchiveMode || e.config.P2P.Network == 99
localProverRoot, localRootErr := e.computeLocalProverRoot(frameNumber)
if localRootErr != nil {
logMsg := "failed to compute local prover root"
if shouldVerifyRoot {
e.logger.Warn(
logMsg,
zap.Uint64("frame_number", frameNumber),
zap.Error(localRootErr),
)
} else {
e.logger.Debug(
logMsg,
zap.Uint64("frame_number", frameNumber),
zap.Error(localRootErr),
)
}
e.logger.Warn(
"failed to compute local prover root",
zap.Uint64("frame_number", frameNumber),
zap.Error(localRootErr),
)
}

// Publish the snapshot generation with the new root so clients can sync
@@ -1763,7 +1772,7 @@ func (e *GlobalConsensusEngine) materialize(
}
}

if len(localProverRoot) > 0 && shouldVerifyRoot {
if len(localProverRoot) > 0 {
if e.verifyProverRoot(
frameNumber,
expectedProverRoot,
@@ -1985,7 +1994,77 @@ func (e *GlobalConsensusEngine) triggerProverHypersync(proposer []byte, expected
}()
}

// performBlockingProverHypersync performs a synchronous hypersync that blocks
// until completion. This is used at the start of materialize to ensure we sync
// before applying any transactions when there's a prover root mismatch.
func (e *GlobalConsensusEngine) performBlockingProverHypersync(proposer []byte, expectedRoot []byte) {
if e.syncProvider == nil || len(proposer) == 0 {
e.logger.Debug("blocking hypersync: no sync provider or proposer")
return
}
if bytes.Equal(proposer, e.getProverAddress()) {
e.logger.Debug("blocking hypersync: we are the proposer")
return
}

// Wait for any existing sync to complete first
for e.proverSyncInProgress.Load() {
e.logger.Debug("blocking hypersync: waiting for existing sync to complete")
time.Sleep(100 * time.Millisecond)
}

// Mark sync as in progress
if !e.proverSyncInProgress.CompareAndSwap(false, true) {
// Another sync started, wait for it
for e.proverSyncInProgress.Load() {
time.Sleep(100 * time.Millisecond)
}
return
}
defer e.proverSyncInProgress.Store(false)

e.logger.Info(
"performing blocking hypersync before processing frame",
zap.String("proposer", hex.EncodeToString(proposer)),
zap.String("expected_root", hex.EncodeToString(expectedRoot)),
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Set up shutdown handler
done := make(chan struct{})
go func() {
select {
case <-e.ShutdownSignal():
cancel()
case <-done:
}
}()

shardKey := tries.ShardKey{
L1: [3]byte{0x00, 0x00, 0x00},
L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS,
}

// Perform sync synchronously (blocking)
e.syncProvider.HyperSync(ctx, proposer, shardKey, nil, expectedRoot)
close(done)

if err := e.proverRegistry.Refresh(); err != nil {
e.logger.Warn(
"failed to refresh prover registry after blocking hypersync",
zap.Error(err),
)
}

e.logger.Info("blocking hypersync completed")
}

func (e *GlobalConsensusEngine) reconcileLocalWorkerAllocations() {
if e.config.Engine.ArchiveMode {
return
}
if e.workerManager == nil || e.proverRegistry == nil {
return
}

@@ -412,6 +412,19 @@ func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error {
return nil
}

// Reload prover state from hypergraph to ensure deterministic pruning
// across all nodes regardless of in-memory cache state
r.globalTrie = &tries.RollingFrecencyCritbitTrie{}
r.shardTries = make(map[string]*tries.RollingFrecencyCritbitTrie)
r.proverCache = make(map[string]*consensus.ProverInfo)
r.filterCache = make(map[string][]*consensus.ProverInfo)
r.addressToFilters = make(map[string][]string)

if err := r.extractGlobalState(); err != nil {
r.logger.Error("failed to reload global state before pruning", zap.Error(err))
return errors.Wrap(err, "prune orphan joins")
}

cutoff := frameNumber - 760
var prunedAllocations int
var prunedProvers int
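The PruneOrphanJoins change above resets every in-memory cache and reloads from the hypergraph before pruning, so that all nodes prune from the same authoritative view. A minimal sketch of that reload-before-prune shape, with a hypothetical registry type:

package sketch

// registrySketch is a hypothetical stand-in for the prover registry's caches.
type registrySketch struct {
	cache map[string]uint64 // address -> join frame
}

// pruneDeterministically drops the cache, reloads it from an authoritative
// source, and only then prunes entries older than the cutoff, so every node
// prunes against the same state regardless of what it had cached.
// Assumes frameNumber >= maxAge, as with the fixed cutoff above.
func (r *registrySketch) pruneDeterministically(
	reload func() (map[string]uint64, error),
	frameNumber uint64,
	maxAge uint64,
) error {
	fresh, err := reload()
	if err != nil {
		return err
	}
	r.cache = fresh
	cutoff := frameNumber - maxAge
	for addr, joinedAt := range r.cache {
		if joinedAt < cutoff {
			delete(r.cache, addr)
		}
	}
	return nil
}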
@@ -67,6 +67,7 @@ var pebbleMigrations = []func(*pebble.Batch) error{
migration_2_1_0_172,
migration_2_1_0_173,
migration_2_1_0_18,
migration_2_1_0_181,
}

func NewPebbleDB(
@@ -776,6 +777,10 @@ func migration_2_1_0_18(b *pebble.Batch) error {
return nil
}

func migration_2_1_0_181(b *pebble.Batch) error {
return migration_2_1_0_18(b)
}

type pebbleSnapshotDB struct {
snap *pebble.Snapshot
}

@@ -2038,8 +2038,13 @@ func (t *LazyVectorCommitmentTree) Delete(
}
retNode = nil
case 1:
// Identify the child's original path to prevent orphaned storage entries
originalChildPath := slices.Concat(n.FullPrefix, []int{lastChildIndex})

if childBranch, ok := lastChild.(*LazyVectorCommitmentBranchNode); ok {
// Merge this node's prefix with the child's prefix
// Note: We do NOT update FullPrefix because children are stored
// relative to the branch's FullPrefix, and they'd become unreachable
mergedPrefix := []int{}
mergedPrefix = append(mergedPrefix, n.Prefix...)
mergedPrefix = append(mergedPrefix, lastChildIndex)
@@ -2048,7 +2053,17 @@ func (t *LazyVectorCommitmentTree) Delete(
childBranch.Prefix = mergedPrefix
childBranch.Commitment = nil

// Delete this node from storage
// Delete the child from its original path to prevent orphan
_ = t.Store.DeleteNode(
txn,
t.SetType,
t.PhaseType,
t.ShardKey,
generateKeyFromPath(originalChildPath),
originalChildPath,
)

// Delete this node (parent) from storage
err := t.Store.DeleteNode(
txn,
t.SetType,
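The final hunk (shown truncated by the diff view) addresses orphaned node records: when a branch with a single remaining child is collapsed, the child's record must also be deleted from storage at its pre-merge path, because node records are keyed by path. A minimal sketch of the idea over a hypothetical path-keyed store:

package sketch

// pathKey is a hypothetical stand-in for generateKeyFromPath: node records are
// keyed by the nibble path at which they live.
func pathKey(path []int) string {
	b := make([]byte, len(path))
	for i, nib := range path {
		b[i] = byte(nib)
	}
	return string(b)
}

// collapseSingleChild merges a lone child into its parent and deletes the
// child's record at its original (pre-merge) path; without that delete, the
// old record would linger as an orphan once the merged node is rewritten
// under the parent's path.
func collapseSingleChild(store map[string][]byte, parentPath, childPath []int) {
	store[pathKey(parentPath)] = store[pathKey(childPath)]
	delete(store, pathKey(childPath))
}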