From 03ecf4e417f95250d5ddadfa03a30e0ddbf98f04 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Fri, 7 Mar 2025 18:54:51 -0600 Subject: [PATCH] periodic saving, better logging, better peer choices, additional rebuild --- node/consensus/data/token_handle_mint_test.go | 2 +- .../token/application/token_handle_merge.go | 2 +- .../token_handle_prover_join_test.go | 8 +- .../token/application/token_handle_split.go | 2 +- .../application/token_handle_transfer.go | 2 +- .../token/token_execution_engine.go | 138 +++++++- node/rpc/hypergraph_sync_rpc_server.go | 325 +++++++++++------- node/rpc/hypergraph_sync_rpc_server_test.go | 41 ++- node/store/coin.go | 17 +- 9 files changed, 384 insertions(+), 153 deletions(-) diff --git a/node/consensus/data/token_handle_mint_test.go b/node/consensus/data/token_handle_mint_test.go index 00926f2..05ad6c8 100644 --- a/node/consensus/data/token_handle_mint_test.go +++ b/node/consensus/data/token_handle_mint_test.go @@ -674,7 +674,7 @@ func TestHandlePreMidnightMint(t *testing.T) { err = app.CoinStore.PutCoin(txn, 1, a, e.Coin) assert.NoError(t, err) case *protobufs.TokenOutput_DeletedCoin: - c, err := app.CoinStore.GetCoinByAddress(nil, e.DeletedCoin.Address) + _, c, err := app.CoinStore.GetCoinByAddress(nil, e.DeletedCoin.Address) assert.NoError(t, err) err = app.CoinStore.DeleteCoin(txn, e.DeletedCoin.Address, c) assert.NoError(t, err) diff --git a/node/execution/intrinsics/token/application/token_handle_merge.go b/node/execution/intrinsics/token/application/token_handle_merge.go index 6595661..1e8dfa0 100644 --- a/node/execution/intrinsics/token/application/token_handle_merge.go +++ b/node/execution/intrinsics/token/application/token_handle_merge.go @@ -63,7 +63,7 @@ func (a *TokenApplication) handleMerge( owner := &protobufs.AccountRef{} deleted := []*protobufs.TokenOutput{} for _, c := range t.Coins { - coin, err := a.CoinStore.GetCoinByAddress(nil, c.Address) + _, coin, err := a.CoinStore.GetCoinByAddress(nil, c.Address) if err != nil { return nil, errors.Wrap(ErrInvalidStateTransition, "handle merge") } diff --git a/node/execution/intrinsics/token/application/token_handle_prover_join_test.go b/node/execution/intrinsics/token/application/token_handle_prover_join_test.go index 85d6134..c2a70ee 100644 --- a/node/execution/intrinsics/token/application/token_handle_prover_join_test.go +++ b/node/execution/intrinsics/token/application/token_handle_prover_join_test.go @@ -338,7 +338,7 @@ func TestHandleProverJoin(t *testing.T) { err = app.CoinStore.PutCoin(txn, 1, a, e.Coin) assert.NoError(t, err) case *protobufs.TokenOutput_DeletedCoin: - c, err := app.CoinStore.GetCoinByAddress(nil, e.DeletedCoin.Address) + _, c, err := app.CoinStore.GetCoinByAddress(nil, e.DeletedCoin.Address) assert.NoError(t, err) err = app.CoinStore.DeleteCoin(txn, e.DeletedCoin.Address, c) assert.NoError(t, err) @@ -387,7 +387,7 @@ func TestHandleProverJoin(t *testing.T) { err = app.CoinStore.PutCoin(txn, 4, a, e.Coin) assert.NoError(t, err) case *protobufs.TokenOutput_DeletedCoin: - c, err := app.CoinStore.GetCoinByAddress(txn, e.DeletedCoin.Address) + _, c, err := app.CoinStore.GetCoinByAddress(txn, e.DeletedCoin.Address) assert.NoError(t, err) err = app.CoinStore.DeleteCoin(txn, e.DeletedCoin.Address, c) assert.NoError(t, err) @@ -441,7 +441,7 @@ func TestHandleProverJoin(t *testing.T) { assert.NoError(t, err) coins = append(coins, a) case *protobufs.TokenOutput_DeletedCoin: - c, err := app.CoinStore.GetCoinByAddress(txn, e.DeletedCoin.Address) + _, c, err := app.CoinStore.GetCoinByAddress(txn, e.DeletedCoin.Address) assert.NoError(t, err) err = app.CoinStore.DeleteCoin(txn, e.DeletedCoin.Address, c) assert.NoError(t, err) @@ -496,7 +496,7 @@ func TestHandleProverJoin(t *testing.T) { assert.NoError(t, err) coins = append(coins, a) case *protobufs.TokenOutput_DeletedCoin: - c, err := app.CoinStore.GetCoinByAddress(txn, e.DeletedCoin.Address) + _, c, err := app.CoinStore.GetCoinByAddress(txn, e.DeletedCoin.Address) assert.NoError(t, err) err = app.CoinStore.DeleteCoin(txn, e.DeletedCoin.Address, c) assert.NoError(t, err) diff --git a/node/execution/intrinsics/token/application/token_handle_split.go b/node/execution/intrinsics/token/application/token_handle_split.go index e4a35b5..8699499 100644 --- a/node/execution/intrinsics/token/application/token_handle_split.go +++ b/node/execution/intrinsics/token/application/token_handle_split.go @@ -23,7 +23,7 @@ func (a *TokenApplication) handleSplit( newCoins := []*protobufs.Coin{} newAmounts := []*big.Int{} - coin, err := a.CoinStore.GetCoinByAddress(nil, t.OfCoin.Address) + _, coin, err := a.CoinStore.GetCoinByAddress(nil, t.OfCoin.Address) if err != nil { return nil, errors.Wrap(ErrInvalidStateTransition, "handle split") } diff --git a/node/execution/intrinsics/token/application/token_handle_transfer.go b/node/execution/intrinsics/token/application/token_handle_transfer.go index 150566b..27ab21f 100644 --- a/node/execution/intrinsics/token/application/token_handle_transfer.go +++ b/node/execution/intrinsics/token/application/token_handle_transfer.go @@ -24,7 +24,7 @@ func (a *TokenApplication) handleTransfer( return nil, errors.Wrap(ErrInvalidStateTransition, "handle transfer") } - coin, err := a.CoinStore.GetCoinByAddress(nil, t.OfCoin.Address) + _, coin, err := a.CoinStore.GetCoinByAddress(nil, t.OfCoin.Address) if err != nil { return nil, errors.Wrap(ErrInvalidStateTransition, "handle transfer") } diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go index a731dda..528b821 100644 --- a/node/execution/intrinsics/token/token_execution_engine.go +++ b/node/execution/intrinsics/token/token_execution_engine.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto" + "encoding/binary" "encoding/hex" "fmt" "math/big" @@ -268,7 +269,7 @@ func NewTokenExecutionEngine( } tries := []*tries.RollingFrecencyCritbitTrie{ - &tries.RollingFrecencyCritbitTrie{}, + {}, } proverKeys = [][]byte{config.GetGenesis().Beacon} for _, key := range proverKeys { @@ -395,8 +396,13 @@ func NewTokenExecutionEngine( panic(err) } + includeSet := [][]byte{} + for iter.First(); iter.Valid(); iter.Next() { if bytes.Compare(iter.Key()[2:], start) >= 0 && bytes.Compare(iter.Key()[2:], end) < 0 { + key := make([]byte, len(iter.Key())-2) + copy(key, iter.Key()[2:]) + includeSet = append(includeSet, key) specificRange++ } totalCoins++ @@ -419,6 +425,28 @@ func NewTokenExecutionEngine( if e.hypergraph == nil || len(e.hypergraph.GetVertexAdds()) == 0 { e.rebuildHypergraph(specificRange) } + + vertices, ok := e.hypergraph.GetVertexAdds()[hypergraph.ShardKey{ + L1: [3]byte(p2p.GetBloomFilterIndices(intrinsicFilter[:], 256, 3)), + L2: [32]byte(slices.Clone(intrinsicFilter[:])), + }] + + if !ok { + panic("hypergraph does not contain id set for application") + } + + rebuildSet := [][]byte{} + for _, inc := range includeSet { + if !vertices.Has( + [64]byte(slices.Concat(intrinsicFilter, inc)), + ) { + rebuildSet = append(rebuildSet, inc) + } + } + + if len(rebuildSet) != 0 { + e.rebuildMissingSetForHypergraph(rebuildSet) + } } syncServer := qgrpc.NewServer( @@ -622,12 +650,24 @@ func (e *TokenExecutionEngine) hyperSync(totalCoins int) { } defer e.syncController.EndSyncSession() - peerId, err := e.pubSub.GetRandomPeer( - append([]byte{0x00}, e.intrinsicFilter...), - ) - if err != nil { - e.logger.Error("error getting peer", zap.Error(err)) - return + var peerId []byte = nil + for peerId == nil { + var err error + peerId, err = e.pubSub.GetRandomPeer( + append([]byte{0x00}, e.intrinsicFilter...), + ) + if err != nil { + e.logger.Error("error getting peer", zap.Error(err)) + return + } + + info, ok := e.syncController.SyncStatus[peer.ID(peerId).String()] + if ok { + if info.Unreachable || gotime.Since(info.LastSynced) < 30*gotime.Minute { + peerId = nil + continue + } + } } e.logger.Info( @@ -643,6 +683,10 @@ func (e *TokenExecutionEngine) hyperSync(totalCoins int) { "could not establish direct channel", zap.Error(err), ) + e.syncController.SyncStatus[peer.ID(peerId).String()] = &rpc.SyncInfo{ + Unreachable: true, + LastSynced: gotime.Now(), + } return } defer func() { @@ -656,6 +700,10 @@ func (e *TokenExecutionEngine) hyperSync(totalCoins int) { stream, err := client.HyperStream(e.ctx) if err != nil { e.logger.Error("could not open stream", zap.Error(err)) + e.syncController.SyncStatus[peer.ID(peerId).String()] = &rpc.SyncInfo{ + Unreachable: false, + LastSynced: gotime.Now(), + } return } @@ -667,6 +715,7 @@ func (e *TokenExecutionEngine) hyperSync(totalCoins int) { append(append([]byte{}, key.L1[:]...), key.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, e.hypergraphStore, + e.hypergraph, set, e.syncController, totalCoins, @@ -674,7 +723,11 @@ func (e *TokenExecutionEngine) hyperSync(totalCoins int) { ) if err != nil { e.logger.Error("error while synchronizing", zap.Error(err)) - return + e.syncController.SyncStatus[peer.ID(peerId).String()] = &rpc.SyncInfo{ + Unreachable: false, + LastSynced: gotime.Now(), + } + continue } } @@ -689,6 +742,73 @@ func (e *TokenExecutionEngine) hyperSync(totalCoins int) { } } +func (e *TokenExecutionEngine) rebuildMissingSetForHypergraph(set [][]byte) { + e.logger.Info("rebuilding missing set entries") + var batchKey, batchValue [][]byte + processed := 0 + totalRange := len(set) + for _, address := range set { + processed++ + key := slices.Clone(address) + batchKey = append(batchKey, key) + + frameNumber, coin, err := e.coinStore.GetCoinByAddress(nil, address) + if err != nil { + panic(err) + } + + value := []byte{} + value = binary.BigEndian.AppendUint64(value, frameNumber) + value = append(value, coin.Amount...) + // implicit + value = append(value, 0x00) + value = append(value, coin.Owner.GetImplicitAccount().GetAddress()...) + // domain len + value = append(value, 0x00) + value = append(value, coin.Intersection...) + batchValue = append(batchValue, value) + + if len(batchKey) == runtime.NumCPU() { + e.addBatchToHypergraph(batchKey, batchValue) + e.logger.Info( + "processed batch", + zap.Float32("percentage", float32(processed)/float32(totalRange)), + ) + batchKey = [][]byte{} + batchValue = [][]byte{} + } + } + + if len(batchKey) != 0 { + e.addBatchToHypergraph(batchKey, batchValue) + } + + txn, err := e.clockStore.NewTransaction(false) + if err != nil { + panic(err) + } + + e.logger.Info("committing hypergraph") + + roots := e.hypergraph.Commit() + + e.logger.Info( + "committed hypergraph state", + zap.String("root", fmt.Sprintf("%x", roots[0])), + ) + + err = e.hypergraphStore.SaveHypergraph(e.hypergraph) + if err != nil { + txn.Abort() + panic(err) + } + + if err = txn.Commit(); err != nil { + txn.Abort() + panic(err) + } +} + func (e *TokenExecutionEngine) rebuildHypergraph(totalRange int) { e.logger.Info("rebuilding hypergraph") e.hypergraph = hypergraph.NewHypergraph() @@ -1023,7 +1143,7 @@ func (e *TokenExecutionEngine) ProcessFrame( panic(err) } case *protobufs.TokenOutput_DeletedCoin: - coin, err := e.coinStore.GetCoinByAddress(nil, o.DeletedCoin.Address) + _, coin, err := e.coinStore.GetCoinByAddress(nil, o.DeletedCoin.Address) if err != nil { txn.Abort() return nil, errors.Wrap(err, "process frame") diff --git a/node/rpc/hypergraph_sync_rpc_server.go b/node/rpc/hypergraph_sync_rpc_server.go index 2a50300..e6d84f1 100644 --- a/node/rpc/hypergraph_sync_rpc_server.go +++ b/node/rpc/hypergraph_sync_rpc_server.go @@ -16,12 +16,14 @@ import ( "go.uber.org/zap" "source.quilibrium.com/quilibrium/monorepo/node/crypto" hypergraph "source.quilibrium.com/quilibrium/monorepo/node/hypergraph/application" + "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc" "source.quilibrium.com/quilibrium/monorepo/node/protobufs" "source.quilibrium.com/quilibrium/monorepo/node/store" ) type SyncController struct { - isSyncing atomic.Bool + isSyncing atomic.Bool + SyncStatus map[string]*SyncInfo } func (s *SyncController) TryEstablishSyncSession() bool { @@ -32,9 +34,15 @@ func (s *SyncController) EndSyncSession() { s.isSyncing.Store(false) } +type SyncInfo struct { + Unreachable bool + LastSynced time.Time +} + func NewSyncController() *SyncController { return &SyncController{ - isSyncing: atomic.Bool{}, + isSyncing: atomic.Bool{}, + SyncStatus: map[string]*SyncInfo{}, } } @@ -65,12 +73,18 @@ func NewHypergraphComparisonServer( } } +type streamManager struct { + ctx context.Context + logger *zap.Logger + stream HyperStream + hypergraphStore store.HypergraphStore + localTree *crypto.VectorCommitmentTree + lastSent time.Time +} + // sendLeafData builds a LeafData message (with the full leaf data) for the // node at the given path in the local tree and sends it over the stream. -func sendLeafData( - stream HyperStream, - hypergraphStore store.HypergraphStore, - localTree *crypto.VectorCommitmentTree, +func (s *streamManager) sendLeafData( path []int32, metadataOnly bool, ) error { @@ -82,7 +96,7 @@ func sendLeafData( Size: leaf.Size.FillBytes(make([]byte, 32)), } if !metadataOnly { - tree, err := hypergraphStore.LoadVertexTree(leaf.Key) + tree, err := s.hypergraphStore.LoadVertexTree(leaf.Key) if err == nil { var buf bytes.Buffer enc := gob.NewEncoder(&buf) @@ -97,10 +111,28 @@ func sendLeafData( LeafData: update, }, } - return stream.Send(msg) + + s.logger.Info( + "sending leaf data", + zap.String("key", hex.EncodeToString(leaf.Key)), + ) + + select { + case <-s.ctx.Done(): + return s.ctx.Err() + default: + } + + err := s.stream.Send(msg) + if err != nil { + return errors.Wrap(err, "send leaf data") + } + + s.lastSent = time.Now() + return nil } - node := getNodeAtPath(localTree.Root, path, 0) + node := getNodeAtPath(s.localTree.Root, path, 0) leaf, ok := node.(*crypto.VectorCommitmentLeafNode) if !ok { children := crypto.GetAllLeaves(node) @@ -251,7 +283,7 @@ func queryNext( ) } return resp, nil - case <-time.After(5 * time.Second): + case <-time.After(30 * time.Second): return nil, errors.Wrap( errors.New("timed out"), "handle query", @@ -306,7 +338,7 @@ func handleQueryNext( } return branchInfo, nil - case <-time.After(5 * time.Second): + case <-time.After(30 * time.Second): return nil, errors.Wrap( errors.New("timed out"), "handle query next", @@ -366,7 +398,7 @@ func descendIndex( } return branchInfo, resp, nil - case <-time.After(5 * time.Second): + case <-time.After(30 * time.Second): return nil, nil, errors.Wrap( errors.New("timed out"), "descend index", @@ -387,38 +419,30 @@ func packPath(path []int32) []byte { return b } -func walk( - ctx context.Context, - logger *zap.Logger, +func (s *streamManager) walk( path []int32, lnode, rnode *protobufs.HypergraphComparisonResponse, incomingQueries <-chan *protobufs.HypergraphComparisonQuery, incomingResponses <-chan *protobufs.HypergraphComparisonResponse, - stream HyperStream, - hypergraphStore store.HypergraphStore, - localTree *crypto.VectorCommitmentTree, metadataOnly bool, ) error { select { - case <-ctx.Done(): - return errors.New("context canceled") + case <-s.ctx.Done(): + return s.ctx.Err() default: } pathString := zap.String("path", hex.EncodeToString(packPath(path))) if bytes.Equal(lnode.Commitment, rnode.Commitment) { - logger.Info("commitments match", pathString) + s.logger.Info("commitments match", pathString) return nil } if isLeaf(lnode) && isLeaf(rnode) { if !bytes.Equal(lnode.Commitment, rnode.Commitment) { - logger.Info("leaves mismatch commitments, sending", pathString) - sendLeafData( - stream, - hypergraphStore, - localTree, + s.logger.Info("leaves mismatch commitments, sending", pathString) + s.sendLeafData( path, metadataOnly, ) @@ -427,46 +451,43 @@ func walk( } if isLeaf(rnode) || isLeaf(lnode) { - logger.Info("leaf/branch mismatch at path", pathString) - sendLeafData( - stream, - hypergraphStore, - localTree, + s.logger.Info("leaf/branch mismatch at path", pathString) + err := s.sendLeafData( path, metadataOnly, ) - return nil + return errors.Wrap(err, "walk") } lpref := lnode.Path rpref := rnode.Path if len(lpref) != len(rpref) { - logger.Info( + s.logger.Info( "prefix length mismatch", zap.Int("local_prefix", len(lpref)), zap.Int("remote_prefix", len(rpref)), pathString, ) if len(lpref) > len(rpref) { - logger.Info("local prefix longer, traversing remote to path", pathString) + s.logger.Info("local prefix longer, traversing remote to path", pathString) traverse := lpref[len(rpref)-1:] rtrav := rnode traversePath := append([]int32{}, rpref...) for _, nibble := range traverse { - logger.Info("attempting remote traversal step") + s.logger.Info("attempting remote traversal step") for _, child := range rtrav.Children { if child.Index == nibble { - logger.Info("sending query") + s.logger.Info("sending query") traversePath = append(traversePath, child.Index) var err error rtrav, err = queryNext( - ctx, + s.ctx, incomingResponses, - stream, + s.stream, traversePath, ) if err != nil { - logger.Error("query failed", zap.Error(err)) + s.logger.Error("query failed", zap.Error(err)) return errors.Wrap(err, "walk") } @@ -475,70 +496,61 @@ func walk( } if rtrav == nil { - logger.Info("traversal could not reach path, sending leaf data") - sendLeafData( - stream, - hypergraphStore, - localTree, + s.logger.Info("traversal could not reach path, sending leaf data") + err := s.sendLeafData( path, metadataOnly, ) - return nil + return errors.Wrap(err, "walk") } } - logger.Info("traversal completed, performing walk", pathString) - return walk( - ctx, - logger, + s.logger.Info("traversal completed, performing walk", pathString) + return s.walk( path, lnode, rtrav, incomingQueries, incomingResponses, - stream, - hypergraphStore, - localTree, metadataOnly, ) } else { - logger.Info("remote prefix longer, traversing local to path", pathString) + s.logger.Info("remote prefix longer, traversing local to path", pathString) traverse := rpref[len(lpref)-1:] ltrav := lnode traversedPath := append([]int32{}, lnode.Path...) for _, nibble := range traverse { - logger.Info("attempting local traversal step") + s.logger.Info("attempting local traversal step") preTraversal := append([]int32{}, traversedPath...) for _, child := range ltrav.Children { if child.Index == nibble { traversedPath = append(traversedPath, nibble) var err error - logger.Info("expecting query") + s.logger.Info("expecting query") ltrav, err = handleQueryNext( - ctx, + s.ctx, incomingQueries, - stream, - localTree, + s.stream, + s.localTree, traversedPath, ) if err != nil { - logger.Error("expect failed", zap.Error(err)) + s.logger.Error("expect failed", zap.Error(err)) return errors.Wrap(err, "walk") } if ltrav == nil { - logger.Info("traversal could not reach path, sending leaf data") - sendLeafData( - stream, - hypergraphStore, - localTree, + s.logger.Info("traversal could not reach path, sending leaf data") + if err := s.sendLeafData( path, metadataOnly, - ) + ); err != nil { + return errors.Wrap(err, "walk") + } return nil } } else { - logger.Info( + s.logger.Info( "sending leaves of known missing branch", zap.String( "path", @@ -549,40 +561,34 @@ func walk( ), ), ) - sendLeafData( - stream, - hypergraphStore, - localTree, + if err := s.sendLeafData( append(append([]int32{}, preTraversal...), child.Index), metadataOnly, - ) + ); err != nil { + return errors.Wrap(err, "walk") + } } } } - logger.Info("traversal completed, performing walk", pathString) - return walk( - ctx, - logger, + s.logger.Info("traversal completed, performing walk", pathString) + return s.walk( path, ltrav, rnode, incomingQueries, incomingResponses, - stream, - hypergraphStore, - localTree, metadataOnly, ) } } else { if slices.Compare(lpref, rpref) == 0 { - logger.Debug("prefixes match, diffing children") + s.logger.Debug("prefixes match, diffing children") for i := int32(0); i < 64; i++ { - logger.Debug("checking branch", zap.Int32("branch", i)) + s.logger.Debug("checking branch", zap.Int32("branch", i)) var lchild *protobufs.BranchChild = nil for _, lc := range lnode.Children { if lc.Index == i { - logger.Debug("local instance found", zap.Int32("branch", i)) + s.logger.Debug("local instance found", zap.Int32("branch", i)) lchild = lc break @@ -591,7 +597,7 @@ func walk( var rchild *protobufs.BranchChild = nil for _, rc := range rnode.Children { if rc.Index == i { - logger.Debug("remote instance found", zap.Int32("branch", i)) + s.logger.Debug("remote instance found", zap.Int32("branch", i)) rchild = rc break @@ -599,14 +605,13 @@ func walk( } if (lchild != nil && rchild == nil) || (lchild == nil && rchild != nil) { - logger.Info("branch divergence", pathString) - sendLeafData( - stream, - hypergraphStore, - localTree, + s.logger.Info("branch divergence", pathString) + if err := s.sendLeafData( path, metadataOnly, - ) + ); err != nil { + return errors.Wrap(err, "walk") + } } else { if lchild != nil { nextPath := append( @@ -614,35 +619,29 @@ func walk( lchild.Index, ) lc, rc, err := descendIndex( - ctx, + s.ctx, incomingResponses, - stream, - localTree, + s.stream, + s.localTree, nextPath, ) if err != nil { - logger.Info("incomplete branch descension, sending leaves") - sendLeafData( - stream, - hypergraphStore, - localTree, + s.logger.Info("incomplete branch descension, sending leaves") + if err := s.sendLeafData( nextPath, metadataOnly, - ) + ); err != nil { + return errors.Wrap(err, "walk") + } continue } - if err = walk( - ctx, - logger, + if err = s.walk( nextPath, lc, rc, incomingQueries, incomingResponses, - stream, - hypergraphStore, - localTree, metadataOnly, ); err != nil { return errors.Wrap(err, "walk") @@ -651,14 +650,13 @@ func walk( } } } else { - logger.Info("prefix mismatch on both sides", pathString) - sendLeafData( - stream, - hypergraphStore, - localTree, + s.logger.Info("prefix mismatch on both sides", pathString) + if err := s.sendLeafData( path, metadataOnly, - ) + ); err != nil { + return errors.Wrap(err, "walk") + } } } @@ -781,19 +779,22 @@ func syncTreeBidirectionallyServer( wg := sync.WaitGroup{} wg.Add(1) + manager := &streamManager{ + ctx: stream.Context(), + logger: logger, + stream: stream, + hypergraphStore: localHypergraphStore, + localTree: idSet.GetTree(), + lastSent: time.Now(), + } go func() { defer wg.Done() - err := walk( - stream.Context(), - logger, + err := manager.walk( []int32{}, branchInfo, response, incomingQueriesOut, incomingResponsesOut, - stream, - localHypergraphStore, - idSet.GetTree(), metadataOnly, ) if err != nil { @@ -802,6 +803,7 @@ func syncTreeBidirectionallyServer( }() lastReceived := time.Now() + leafUpdates := 0 outer: for { @@ -846,16 +848,43 @@ outer: idSet.Add(hypergraph.AtomFromBytes(remoteUpdate.Value)) + leafUpdates++ lastReceived = time.Now() - case <-time.After(5 * time.Second): - if time.Since(lastReceived) > 5*time.Second { - break outer + + if leafUpdates > 10000 { + roots := localHypergraph.Commit() + logger.Info( + "hypergraph root commit", + zap.String("root", hex.EncodeToString(roots[0])), + ) + + if err = localHypergraphStore.SaveHypergraph(localHypergraph); err != nil { + logger.Error("error while saving", zap.Error(err)) + } + + leafUpdates = 0 + } + case <-time.After(30 * time.Second): + if time.Since(lastReceived) > 30*time.Second { + if time.Since(manager.lastSent) > 30*time.Second { + break outer + } } } } wg.Wait() + roots := localHypergraph.Commit() + logger.Info( + "hypergraph root commit", + zap.String("root", hex.EncodeToString(roots[0])), + ) + + if err = localHypergraphStore.SaveHypergraph(localHypergraph); err != nil { + logger.Error("error while saving", zap.Error(err)) + } + total, _ := idSet.GetTree().GetMetadata() logger.Info( "current progress", @@ -873,7 +902,17 @@ func (s *hypergraphComparisonServer) HyperStream( } defer s.syncController.EndSyncSession() - return syncTreeBidirectionallyServer( + peerId, ok := grpc.PeerIDFromContext(stream.Context()) + if !ok { + return errors.New("could not identify peer") + } + + status, ok := s.syncController.SyncStatus[peerId.String()] + if ok && time.Since(status.LastSynced) < 30*time.Minute { + return errors.New("peer too recently synced") + } + + err := syncTreeBidirectionallyServer( stream, s.logger, s.localHypergraphStore, @@ -881,6 +920,12 @@ func (s *hypergraphComparisonServer) HyperStream( false, s.debugTotalCoins, ) + s.syncController.SyncStatus[peerId.String()] = &SyncInfo{ + Unreachable: false, + LastSynced: time.Now(), + } + + return err } // SyncTreeBidirectionally performs the tree diff and synchronization. @@ -893,6 +938,7 @@ func SyncTreeBidirectionally( shardKey []byte, phaseSet protobufs.HypergraphPhaseSet, hypergraphStore store.HypergraphStore, + localHypergraph *hypergraph.Hypergraph, set *hypergraph.IdSet, syncController *SyncController, debugTotalCoins int, @@ -983,19 +1029,24 @@ func SyncTreeBidirectionally( wg := sync.WaitGroup{} wg.Add(1) + + manager := &streamManager{ + ctx: stream.Context(), + logger: logger, + stream: stream, + hypergraphStore: hypergraphStore, + localTree: set.GetTree(), + lastSent: time.Now(), + } + go func() { defer wg.Done() - err := walk( - stream.Context(), - logger, + err := manager.walk( []int32{}, branchInfo, response, incomingQueriesOut, incomingResponsesOut, - stream, - hypergraphStore, - set.GetTree(), metadataOnly, ) if err != nil { @@ -1003,6 +1054,7 @@ func SyncTreeBidirectionally( } }() + leafUpdates := 0 lastReceived := time.Now() outer: @@ -1048,10 +1100,27 @@ outer: set.Add(hypergraph.AtomFromBytes(remoteUpdate.Value)) + leafUpdates++ lastReceived = time.Now() - case <-time.After(5 * time.Second): - if time.Since(lastReceived) > 5*time.Second { - break outer + + if leafUpdates > 10000 { + roots := localHypergraph.Commit() + logger.Info( + "hypergraph root commit", + zap.String("root", hex.EncodeToString(roots[0])), + ) + + if err = hypergraphStore.SaveHypergraph(localHypergraph); err != nil { + logger.Error("error while saving", zap.Error(err)) + } + + leafUpdates = 0 + } + case <-time.After(30 * time.Second): + if time.Since(lastReceived) > 30*time.Second { + if time.Since(manager.lastSent) > 30*time.Second { + break outer + } } } } diff --git a/node/rpc/hypergraph_sync_rpc_server_test.go b/node/rpc/hypergraph_sync_rpc_server_test.go index 7cab45d..6c403c6 100644 --- a/node/rpc/hypergraph_sync_rpc_server_test.go +++ b/node/rpc/hypergraph_sync_rpc_server_test.go @@ -12,6 +12,8 @@ import ( "testing" "github.com/cloudflare/circl/sign/ed448" + pcrypto "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/assert" "go.uber.org/zap" "google.golang.org/grpc" @@ -19,11 +21,21 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/config" "source.quilibrium.com/quilibrium/monorepo/node/crypto" "source.quilibrium.com/quilibrium/monorepo/node/hypergraph/application" + internal_grpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc" "source.quilibrium.com/quilibrium/monorepo/node/protobufs" "source.quilibrium.com/quilibrium/monorepo/node/rpc" "source.quilibrium.com/quilibrium/monorepo/node/store" ) +type serverStream struct { + grpc.ServerStream + ctx context.Context +} + +func (s *serverStream) Context() context.Context { + return s.ctx +} + type Operation struct { Type string // "AddVertex", "RemoveVertex", "AddHyperedge", "RemoveHyperedge" Vertex application.Vertex @@ -216,7 +228,29 @@ func TestHypergraphSyncServer(t *testing.T) { log.Fatalf("Server: failed to listen: %v", err) } - grpcServer := grpc.NewServer() + grpcServer := grpc.NewServer( + grpc.ChainStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { + _, priv, _ := ed448.GenerateKey(rand.Reader) + privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv) + if err != nil { + t.FailNow() + } + + pub := privKey.GetPublic() + peerId, err := peer.IDFromPublicKey(pub) + if err != nil { + t.FailNow() + } + + return handler(srv, &serverStream{ + ServerStream: ss, + ctx: internal_grpc.NewContextWithPeerID( + ss.Context(), + peerId, + ), + }) + }), + ) protobufs.RegisterHypergraphComparisonServiceServer( grpcServer, rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0], rpc.NewSyncController(), numOperations), @@ -227,6 +261,7 @@ func TestHypergraphSyncServer(t *testing.T) { log.Fatalf("Server: failed to serve: %v", err) } }() + conn, err := grpc.DialContext(context.TODO(), "localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf("Client: failed to listen: %v", err) @@ -239,7 +274,7 @@ func TestHypergraphSyncServer(t *testing.T) { syncController := rpc.NewSyncController() - err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, numOperations, false) + err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1], crdts[1].GetVertexAdds()[shardKey], syncController, numOperations, false) if err != nil { log.Fatalf("Client: failed to sync 1: %v", err) } @@ -258,7 +293,7 @@ func TestHypergraphSyncServer(t *testing.T) { log.Fatalf("Client: failed to stream: %v", err) } - err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, numOperations, false) + err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1], crdts[1].GetVertexAdds()[shardKey], syncController, numOperations, false) if err != nil { log.Fatalf("Client: failed to sync 2: %v", err) } diff --git a/node/store/coin.go b/node/store/coin.go index 1c55c07..b836267 100644 --- a/node/store/coin.go +++ b/node/store/coin.go @@ -21,7 +21,11 @@ type CoinStore interface { []*protobufs.PreCoinProof, error, ) - GetCoinByAddress(txn Transaction, address []byte) (*protobufs.Coin, error) + GetCoinByAddress(txn Transaction, address []byte) ( + uint64, + *protobufs.Coin, + error, + ) GetPreCoinProofByAddress(address []byte) (*protobufs.PreCoinProof, error) RangeCoins(start []byte, end []byte) (Iterator, error) RangePreCoinProofs() (Iterator, error) @@ -197,6 +201,7 @@ func (p *PebbleCoinStore) GetPreCoinProofsForOwner(owner []byte) ( } func (p *PebbleCoinStore) GetCoinByAddress(txn Transaction, address []byte) ( + uint64, *protobufs.Coin, error, ) { @@ -211,20 +216,22 @@ func (p *PebbleCoinStore) GetCoinByAddress(txn Transaction, address []byte) ( if err != nil { if errors.Is(err, pebble.ErrNotFound) { err = ErrNotFound - return nil, err + return 0, nil, err } err = errors.Wrap(err, "get coin by address") - return nil, err + return 0, nil, err } defer closer.Close() coin := &protobufs.Coin{} err = proto.Unmarshal(coinBytes[8:], coin) if err != nil { - return nil, errors.Wrap(err, "get coin by address") + return 0, nil, errors.Wrap(err, "get coin by address") } - return coin, nil + frameNumber := binary.BigEndian.Uint64(coinBytes[:8]) + + return frameNumber, coin, nil } func (p *PebbleCoinStore) GetPreCoinProofByAddress(address []byte) (