periodic saving, better logging, better peer choices, additional rebuild

Cassandra Heart 2025-03-07 18:54:51 -06:00
commit 03ecf4e417 (parent 12aafe55e8)
Signature: no known key found in database (GPG Key ID: 6352152859385958)
9 changed files with 384 additions and 153 deletions

View File

@@ -674,7 +674,7 @@ func TestHandlePreMidnightMint(t *testing.T) {
       err = app.CoinStore.PutCoin(txn, 1, a, e.Coin)
       assert.NoError(t, err)
     case *protobufs.TokenOutput_DeletedCoin:
-      c, err := app.CoinStore.GetCoinByAddress(nil, e.DeletedCoin.Address)
+      _, c, err := app.CoinStore.GetCoinByAddress(nil, e.DeletedCoin.Address)
       assert.NoError(t, err)
       err = app.CoinStore.DeleteCoin(txn, e.DeletedCoin.Address, c)
       assert.NoError(t, err)

View File

@@ -63,7 +63,7 @@ func (a *TokenApplication) handleMerge(
   owner := &protobufs.AccountRef{}
   deleted := []*protobufs.TokenOutput{}
   for _, c := range t.Coins {
-    coin, err := a.CoinStore.GetCoinByAddress(nil, c.Address)
+    _, coin, err := a.CoinStore.GetCoinByAddress(nil, c.Address)
     if err != nil {
       return nil, errors.Wrap(ErrInvalidStateTransition, "handle merge")
     }

View File

@@ -338,7 +338,7 @@ func TestHandleProverJoin(t *testing.T) {
       err = app.CoinStore.PutCoin(txn, 1, a, e.Coin)
       assert.NoError(t, err)
     case *protobufs.TokenOutput_DeletedCoin:
-      c, err := app.CoinStore.GetCoinByAddress(nil, e.DeletedCoin.Address)
+      _, c, err := app.CoinStore.GetCoinByAddress(nil, e.DeletedCoin.Address)
       assert.NoError(t, err)
       err = app.CoinStore.DeleteCoin(txn, e.DeletedCoin.Address, c)
       assert.NoError(t, err)
@@ -387,7 +387,7 @@ func TestHandleProverJoin(t *testing.T) {
       err = app.CoinStore.PutCoin(txn, 4, a, e.Coin)
       assert.NoError(t, err)
     case *protobufs.TokenOutput_DeletedCoin:
-      c, err := app.CoinStore.GetCoinByAddress(txn, e.DeletedCoin.Address)
+      _, c, err := app.CoinStore.GetCoinByAddress(txn, e.DeletedCoin.Address)
       assert.NoError(t, err)
       err = app.CoinStore.DeleteCoin(txn, e.DeletedCoin.Address, c)
       assert.NoError(t, err)
@@ -441,7 +441,7 @@ func TestHandleProverJoin(t *testing.T) {
       assert.NoError(t, err)
       coins = append(coins, a)
     case *protobufs.TokenOutput_DeletedCoin:
-      c, err := app.CoinStore.GetCoinByAddress(txn, e.DeletedCoin.Address)
+      _, c, err := app.CoinStore.GetCoinByAddress(txn, e.DeletedCoin.Address)
       assert.NoError(t, err)
       err = app.CoinStore.DeleteCoin(txn, e.DeletedCoin.Address, c)
       assert.NoError(t, err)
@@ -496,7 +496,7 @@ func TestHandleProverJoin(t *testing.T) {
       assert.NoError(t, err)
       coins = append(coins, a)
     case *protobufs.TokenOutput_DeletedCoin:
-      c, err := app.CoinStore.GetCoinByAddress(txn, e.DeletedCoin.Address)
+      _, c, err := app.CoinStore.GetCoinByAddress(txn, e.DeletedCoin.Address)
       assert.NoError(t, err)
       err = app.CoinStore.DeleteCoin(txn, e.DeletedCoin.Address, c)
       assert.NoError(t, err)

View File

@@ -23,7 +23,7 @@ func (a *TokenApplication) handleSplit(
   newCoins := []*protobufs.Coin{}
   newAmounts := []*big.Int{}
-  coin, err := a.CoinStore.GetCoinByAddress(nil, t.OfCoin.Address)
+  _, coin, err := a.CoinStore.GetCoinByAddress(nil, t.OfCoin.Address)
   if err != nil {
     return nil, errors.Wrap(ErrInvalidStateTransition, "handle split")
   }

View File

@@ -24,7 +24,7 @@ func (a *TokenApplication) handleTransfer(
     return nil, errors.Wrap(ErrInvalidStateTransition, "handle transfer")
   }
-  coin, err := a.CoinStore.GetCoinByAddress(nil, t.OfCoin.Address)
+  _, coin, err := a.CoinStore.GetCoinByAddress(nil, t.OfCoin.Address)
   if err != nil {
     return nil, errors.Wrap(ErrInvalidStateTransition, "handle transfer")
   }

View File

@@ -4,6 +4,7 @@ import (
   "bytes"
   "context"
   "crypto"
+  "encoding/binary"
   "encoding/hex"
   "fmt"
   "math/big"
@@ -268,7 +269,7 @@ func NewTokenExecutionEngine(
   }
   tries := []*tries.RollingFrecencyCritbitTrie{
-    &tries.RollingFrecencyCritbitTrie{},
+    {},
   }
   proverKeys = [][]byte{config.GetGenesis().Beacon}
   for _, key := range proverKeys {
@@ -395,8 +396,13 @@ func NewTokenExecutionEngine(
     panic(err)
   }
+  includeSet := [][]byte{}
   for iter.First(); iter.Valid(); iter.Next() {
     if bytes.Compare(iter.Key()[2:], start) >= 0 && bytes.Compare(iter.Key()[2:], end) < 0 {
+      key := make([]byte, len(iter.Key())-2)
+      copy(key, iter.Key()[2:])
+      includeSet = append(includeSet, key)
       specificRange++
     }
     totalCoins++
@@ -419,6 +425,28 @@ func NewTokenExecutionEngine(
   if e.hypergraph == nil || len(e.hypergraph.GetVertexAdds()) == 0 {
     e.rebuildHypergraph(specificRange)
   }
+
+  vertices, ok := e.hypergraph.GetVertexAdds()[hypergraph.ShardKey{
+    L1: [3]byte(p2p.GetBloomFilterIndices(intrinsicFilter[:], 256, 3)),
+    L2: [32]byte(slices.Clone(intrinsicFilter[:])),
+  }]
+  if !ok {
+    panic("hypergraph does not contain id set for application")
+  }
+
+  rebuildSet := [][]byte{}
+  for _, inc := range includeSet {
+    if !vertices.Has(
+      [64]byte(slices.Concat(intrinsicFilter, inc)),
+    ) {
+      rebuildSet = append(rebuildSet, inc)
+    }
+  }
+
+  if len(rebuildSet) != 0 {
+    e.rebuildMissingSetForHypergraph(rebuildSet)
+  }
 }

 syncServer := qgrpc.NewServer(
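
Note: the block above queues the "additional rebuild" input: every coin address in range whose combined (filter || address) vertex id is absent from the hypergraph's id set. A minimal standalone sketch of that membership check, where MembershipSet and detectMissing are hypothetical stand-ins for the id set API used in the diff:

package main

type MembershipSet interface {
  Has(id [64]byte) bool
}

// detectMissing returns the addresses whose combined
// (intrinsicFilter || address) vertex id is not present in the set.
func detectMissing(
  set MembershipSet,
  intrinsicFilter []byte, // 32-byte application filter, as above
  includeSet [][]byte, // coin addresses gathered by the range scan
) [][]byte {
  rebuild := [][]byte{}
  for _, inc := range includeSet {
    var id [64]byte
    copy(id[:32], intrinsicFilter)
    copy(id[32:], inc)
    if !set.Has(id) {
      rebuild = append(rebuild, inc)
    }
  }
  return rebuild
}
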
@@ -622,12 +650,24 @@ func (e *TokenExecutionEngine) hyperSync(totalCoins int) {
   }
   defer e.syncController.EndSyncSession()
-  peerId, err := e.pubSub.GetRandomPeer(
-    append([]byte{0x00}, e.intrinsicFilter...),
-  )
-  if err != nil {
-    e.logger.Error("error getting peer", zap.Error(err))
-    return
+  var peerId []byte = nil
+  for peerId == nil {
+    var err error
+    peerId, err = e.pubSub.GetRandomPeer(
+      append([]byte{0x00}, e.intrinsicFilter...),
+    )
+    if err != nil {
+      e.logger.Error("error getting peer", zap.Error(err))
+      return
+    }
+    info, ok := e.syncController.SyncStatus[peer.ID(peerId).String()]
+    if ok {
+      if info.Unreachable || gotime.Since(info.LastSynced) < 30*gotime.Minute {
+        peerId = nil
+        continue
+      }
+    }
   }
   e.logger.Info(
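
Note: the loop above is the "better peer choices" change: instead of giving up on the first random peer, the engine keeps drawing until it finds one that is neither flagged unreachable nor synced within the last 30 minutes. A standalone sketch of the same policy, assuming a hypothetical getRandomPeer callback (SyncInfo matches the type added in this commit):

package main

import (
  "errors"
  "time"
)

type SyncInfo struct {
  Unreachable bool
  LastSynced  time.Time
}

// pickPeer draws random peers until one passes the reachability and
// recency filters, mirroring the selection loop in hyperSync above.
func pickPeer(
  getRandomPeer func() (string, error),
  status map[string]*SyncInfo,
) (string, error) {
  for {
    id, err := getRandomPeer()
    if err != nil {
      return "", errors.New("no peer available")
    }
    if info, seen := status[id]; seen &&
      (info.Unreachable || time.Since(info.LastSynced) < 30*time.Minute) {
      continue // skip unreachable or recently synced peers
    }
    return id, nil
  }
}
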
@@ -643,6 +683,10 @@ func (e *TokenExecutionEngine) hyperSync(totalCoins int) {
       "could not establish direct channel",
       zap.Error(err),
     )
+    e.syncController.SyncStatus[peer.ID(peerId).String()] = &rpc.SyncInfo{
+      Unreachable: true,
+      LastSynced:  gotime.Now(),
+    }
     return
   }
   defer func() {
@@ -656,6 +700,10 @@ func (e *TokenExecutionEngine) hyperSync(totalCoins int) {
   stream, err := client.HyperStream(e.ctx)
   if err != nil {
     e.logger.Error("could not open stream", zap.Error(err))
+    e.syncController.SyncStatus[peer.ID(peerId).String()] = &rpc.SyncInfo{
+      Unreachable: false,
+      LastSynced:  gotime.Now(),
+    }
     return
   }
@@ -667,6 +715,7 @@ func (e *TokenExecutionEngine) hyperSync(totalCoins int) {
       append(append([]byte{}, key.L1[:]...), key.L2[:]...),
       protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
       e.hypergraphStore,
+      e.hypergraph,
       set,
       e.syncController,
       totalCoins,
@@ -674,7 +723,11 @@ func (e *TokenExecutionEngine) hyperSync(totalCoins int) {
     )
     if err != nil {
       e.logger.Error("error while synchronizing", zap.Error(err))
-      return
+      e.syncController.SyncStatus[peer.ID(peerId).String()] = &rpc.SyncInfo{
+        Unreachable: false,
+        LastSynced:  gotime.Now(),
+      }
+      continue
     }
   }
@@ -689,6 +742,73 @@ func (e *TokenExecutionEngine) hyperSync(totalCoins int) {
   }
 }

+func (e *TokenExecutionEngine) rebuildMissingSetForHypergraph(set [][]byte) {
+  e.logger.Info("rebuilding missing set entries")
+  var batchKey, batchValue [][]byte
+  processed := 0
+  totalRange := len(set)
+  for _, address := range set {
+    processed++
+    key := slices.Clone(address)
+    batchKey = append(batchKey, key)
+    frameNumber, coin, err := e.coinStore.GetCoinByAddress(nil, address)
+    if err != nil {
+      panic(err)
+    }
+    value := []byte{}
+    value = binary.BigEndian.AppendUint64(value, frameNumber)
+    value = append(value, coin.Amount...)
+    // implicit
+    value = append(value, 0x00)
+    value = append(value, coin.Owner.GetImplicitAccount().GetAddress()...)
+    // domain len
+    value = append(value, 0x00)
+    value = append(value, coin.Intersection...)
+    batchValue = append(batchValue, value)
+    if len(batchKey) == runtime.NumCPU() {
+      e.addBatchToHypergraph(batchKey, batchValue)
+      e.logger.Info(
+        "processed batch",
+        zap.Float32("percentage", float32(processed)/float32(totalRange)),
+      )
+      batchKey = [][]byte{}
+      batchValue = [][]byte{}
+    }
+  }
+  if len(batchKey) != 0 {
+    e.addBatchToHypergraph(batchKey, batchValue)
+  }
+  txn, err := e.clockStore.NewTransaction(false)
+  if err != nil {
+    panic(err)
+  }
+  e.logger.Info("committing hypergraph")
+  roots := e.hypergraph.Commit()
+  e.logger.Info(
+    "committed hypergraph state",
+    zap.String("root", fmt.Sprintf("%x", roots[0])),
+  )
+  err = e.hypergraphStore.SaveHypergraph(e.hypergraph)
+  if err != nil {
+    txn.Abort()
+    panic(err)
+  }
+  if err = txn.Commit(); err != nil {
+    txn.Abort()
+    panic(err)
+  }
+}
+
 func (e *TokenExecutionEngine) rebuildHypergraph(totalRange int) {
   e.logger.Info("rebuilding hypergraph")
   e.hypergraph = hypergraph.NewHypergraph()
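
Note: rebuildMissingSetForHypergraph serializes each coin into a vertex value with a fixed layout: big-endian frame number, amount, a 0x00 implicit-account marker, the owner address, a 0x00 domain length, then the intersection bytes. A sketch of that layout as a self-contained encoder; encodeCoinVertex is a hypothetical helper, but the field order mirrors the code above:

package main

import "encoding/binary"

func encodeCoinVertex(
  frameNumber uint64,
  amount, ownerAddress, intersection []byte,
) []byte {
  value := binary.BigEndian.AppendUint64([]byte{}, frameNumber)
  value = append(value, amount...)
  value = append(value, 0x00) // implicit account marker
  value = append(value, ownerAddress...)
  value = append(value, 0x00) // domain length (empty)
  value = append(value, intersection...)
  return value
}
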
@@ -1023,7 +1143,7 @@ func (e *TokenExecutionEngine) ProcessFrame(
         panic(err)
       }
     case *protobufs.TokenOutput_DeletedCoin:
-      coin, err := e.coinStore.GetCoinByAddress(nil, o.DeletedCoin.Address)
+      _, coin, err := e.coinStore.GetCoinByAddress(nil, o.DeletedCoin.Address)
       if err != nil {
         txn.Abort()
         return nil, errors.Wrap(err, "process frame")

View File

@@ -16,12 +16,14 @@ import (
   "go.uber.org/zap"
   "source.quilibrium.com/quilibrium/monorepo/node/crypto"
   hypergraph "source.quilibrium.com/quilibrium/monorepo/node/hypergraph/application"
+  "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc"
   "source.quilibrium.com/quilibrium/monorepo/node/protobufs"
   "source.quilibrium.com/quilibrium/monorepo/node/store"
 )

 type SyncController struct {
-  isSyncing atomic.Bool
+  isSyncing  atomic.Bool
+  SyncStatus map[string]*SyncInfo
 }

 func (s *SyncController) TryEstablishSyncSession() bool {
@@ -32,9 +34,15 @@ func (s *SyncController) EndSyncSession() {
   s.isSyncing.Store(false)
 }

+type SyncInfo struct {
+  Unreachable bool
+  LastSynced  time.Time
+}
+
 func NewSyncController() *SyncController {
   return &SyncController{
-    isSyncing: atomic.Bool{},
+    isSyncing:  atomic.Bool{},
+    SyncStatus: map[string]*SyncInfo{},
   }
 }
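
Note: a hypothetical usage of the controller in the same package, showing how the pieces added above fit together: one sync session at a time, with per-peer bookkeeping recorded afterwards (syncOnce and runSync are illustrative, not part of the commit; errors and time are assumed imported as in this file):

func syncOnce(
  controller *SyncController,
  peerKey string,
  runSync func() error,
) error {
  if !controller.TryEstablishSyncSession() {
    return errors.New("sync already in progress")
  }
  defer controller.EndSyncSession()

  err := runSync()
  controller.SyncStatus[peerKey] = &SyncInfo{
    Unreachable: false,
    LastSynced:  time.Now(),
  }
  return err
}
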
@@ -65,12 +73,18 @@ func NewHypergraphComparisonServer(
   }
 }

+type streamManager struct {
+  ctx             context.Context
+  logger          *zap.Logger
+  stream          HyperStream
+  hypergraphStore store.HypergraphStore
+  localTree       *crypto.VectorCommitmentTree
+  lastSent        time.Time
+}
+
 // sendLeafData builds a LeafData message (with the full leaf data) for the
 // node at the given path in the local tree and sends it over the stream.
-func sendLeafData(
-  stream HyperStream,
-  hypergraphStore store.HypergraphStore,
-  localTree *crypto.VectorCommitmentTree,
+func (s *streamManager) sendLeafData(
   path []int32,
   metadataOnly bool,
 ) error {
@@ -82,7 +96,7 @@ func sendLeafData(
       Size: leaf.Size.FillBytes(make([]byte, 32)),
     }
     if !metadataOnly {
-      tree, err := hypergraphStore.LoadVertexTree(leaf.Key)
+      tree, err := s.hypergraphStore.LoadVertexTree(leaf.Key)
       if err == nil {
         var buf bytes.Buffer
         enc := gob.NewEncoder(&buf)
@@ -97,10 +111,28 @@ func sendLeafData(
         LeafData: update,
       },
     }
-    return stream.Send(msg)
+    s.logger.Info(
+      "sending leaf data",
+      zap.String("key", hex.EncodeToString(leaf.Key)),
+    )
+    select {
+    case <-s.ctx.Done():
+      return s.ctx.Err()
+    default:
+    }
+    err := s.stream.Send(msg)
+    if err != nil {
+      return errors.Wrap(err, "send leaf data")
+    }
+    s.lastSent = time.Now()
+    return nil
   }

-  node := getNodeAtPath(localTree.Root, path, 0)
+  node := getNodeAtPath(s.localTree.Root, path, 0)
   leaf, ok := node.(*crypto.VectorCommitmentLeafNode)
   if !ok {
     children := crypto.GetAllLeaves(node)
@@ -251,7 +283,7 @@ func queryNext(
       )
     }
     return resp, nil
-  case <-time.After(5 * time.Second):
+  case <-time.After(30 * time.Second):
     return nil, errors.Wrap(
       errors.New("timed out"),
       "handle query",
@@ -306,7 +338,7 @@ func handleQueryNext(
     }
     return branchInfo, nil
-  case <-time.After(5 * time.Second):
+  case <-time.After(30 * time.Second):
     return nil, errors.Wrap(
       errors.New("timed out"),
       "handle query next",
@@ -366,7 +398,7 @@ func descendIndex(
     }
     return branchInfo, resp, nil
-  case <-time.After(5 * time.Second):
+  case <-time.After(30 * time.Second):
     return nil, nil, errors.Wrap(
       errors.New("timed out"),
       "descend index",
@@ -387,38 +419,30 @@ func packPath(path []int32) []byte {
   return b
 }

-func walk(
-  ctx context.Context,
-  logger *zap.Logger,
+func (s *streamManager) walk(
   path []int32,
   lnode, rnode *protobufs.HypergraphComparisonResponse,
   incomingQueries <-chan *protobufs.HypergraphComparisonQuery,
   incomingResponses <-chan *protobufs.HypergraphComparisonResponse,
-  stream HyperStream,
-  hypergraphStore store.HypergraphStore,
-  localTree *crypto.VectorCommitmentTree,
   metadataOnly bool,
 ) error {
   select {
-  case <-ctx.Done():
-    return errors.New("context canceled")
+  case <-s.ctx.Done():
+    return s.ctx.Err()
   default:
   }

   pathString := zap.String("path", hex.EncodeToString(packPath(path)))

   if bytes.Equal(lnode.Commitment, rnode.Commitment) {
-    logger.Info("commitments match", pathString)
+    s.logger.Info("commitments match", pathString)
     return nil
   }

   if isLeaf(lnode) && isLeaf(rnode) {
     if !bytes.Equal(lnode.Commitment, rnode.Commitment) {
-      logger.Info("leaves mismatch commitments, sending", pathString)
-      sendLeafData(
-        stream,
-        hypergraphStore,
-        localTree,
+      s.logger.Info("leaves mismatch commitments, sending", pathString)
+      s.sendLeafData(
         path,
         metadataOnly,
       )
@@ -427,46 +451,43 @@ func walk(
     }

   if isLeaf(rnode) || isLeaf(lnode) {
-    logger.Info("leaf/branch mismatch at path", pathString)
-    sendLeafData(
-      stream,
-      hypergraphStore,
-      localTree,
+    s.logger.Info("leaf/branch mismatch at path", pathString)
+    err := s.sendLeafData(
       path,
       metadataOnly,
     )
-    return nil
+    return errors.Wrap(err, "walk")
   }

   lpref := lnode.Path
   rpref := rnode.Path
   if len(lpref) != len(rpref) {
-    logger.Info(
+    s.logger.Info(
       "prefix length mismatch",
       zap.Int("local_prefix", len(lpref)),
       zap.Int("remote_prefix", len(rpref)),
       pathString,
     )
     if len(lpref) > len(rpref) {
-      logger.Info("local prefix longer, traversing remote to path", pathString)
+      s.logger.Info("local prefix longer, traversing remote to path", pathString)
       traverse := lpref[len(rpref)-1:]
       rtrav := rnode
       traversePath := append([]int32{}, rpref...)
       for _, nibble := range traverse {
-        logger.Info("attempting remote traversal step")
+        s.logger.Info("attempting remote traversal step")
         for _, child := range rtrav.Children {
           if child.Index == nibble {
-            logger.Info("sending query")
+            s.logger.Info("sending query")
             traversePath = append(traversePath, child.Index)
             var err error
             rtrav, err = queryNext(
-              ctx,
+              s.ctx,
               incomingResponses,
-              stream,
+              s.stream,
               traversePath,
             )
             if err != nil {
-              logger.Error("query failed", zap.Error(err))
+              s.logger.Error("query failed", zap.Error(err))
               return errors.Wrap(err, "walk")
             }
@@ -475,70 +496,61 @@ func walk(
         }
       if rtrav == nil {
-        logger.Info("traversal could not reach path, sending leaf data")
-        sendLeafData(
-          stream,
-          hypergraphStore,
-          localTree,
+        s.logger.Info("traversal could not reach path, sending leaf data")
+        err := s.sendLeafData(
           path,
           metadataOnly,
         )
-        return nil
+        return errors.Wrap(err, "walk")
       }
     }
-    logger.Info("traversal completed, performing walk", pathString)
-    return walk(
-      ctx,
-      logger,
+    s.logger.Info("traversal completed, performing walk", pathString)
+    return s.walk(
       path,
       lnode,
       rtrav,
       incomingQueries,
       incomingResponses,
-      stream,
-      hypergraphStore,
-      localTree,
       metadataOnly,
     )
   } else {
-    logger.Info("remote prefix longer, traversing local to path", pathString)
+    s.logger.Info("remote prefix longer, traversing local to path", pathString)
     traverse := rpref[len(lpref)-1:]
     ltrav := lnode
     traversedPath := append([]int32{}, lnode.Path...)
     for _, nibble := range traverse {
-      logger.Info("attempting local traversal step")
+      s.logger.Info("attempting local traversal step")
       preTraversal := append([]int32{}, traversedPath...)
       for _, child := range ltrav.Children {
         if child.Index == nibble {
           traversedPath = append(traversedPath, nibble)
           var err error
-          logger.Info("expecting query")
+          s.logger.Info("expecting query")
           ltrav, err = handleQueryNext(
-            ctx,
+            s.ctx,
             incomingQueries,
-            stream,
-            localTree,
+            s.stream,
+            s.localTree,
             traversedPath,
           )
           if err != nil {
-            logger.Error("expect failed", zap.Error(err))
+            s.logger.Error("expect failed", zap.Error(err))
             return errors.Wrap(err, "walk")
           }
           if ltrav == nil {
-            logger.Info("traversal could not reach path, sending leaf data")
-            sendLeafData(
-              stream,
-              hypergraphStore,
-              localTree,
+            s.logger.Info("traversal could not reach path, sending leaf data")
+            if err := s.sendLeafData(
               path,
               metadataOnly,
-            )
+            ); err != nil {
+              return errors.Wrap(err, "walk")
+            }
             return nil
           }
         } else {
-          logger.Info(
+          s.logger.Info(
             "sending leaves of known missing branch",
             zap.String(
               "path",
@@ -549,40 +561,34 @@ func walk(
             ),
           ),
         )
-        sendLeafData(
-          stream,
-          hypergraphStore,
-          localTree,
+        if err := s.sendLeafData(
           append(append([]int32{}, preTraversal...), child.Index),
           metadataOnly,
-        )
+        ); err != nil {
+          return errors.Wrap(err, "walk")
+        }
       }
     }
   }
-  logger.Info("traversal completed, performing walk", pathString)
-  return walk(
-    ctx,
-    logger,
+  s.logger.Info("traversal completed, performing walk", pathString)
+  return s.walk(
     path,
     ltrav,
     rnode,
     incomingQueries,
     incomingResponses,
-    stream,
-    hypergraphStore,
-    localTree,
     metadataOnly,
   )
 }
 } else {
   if slices.Compare(lpref, rpref) == 0 {
-    logger.Debug("prefixes match, diffing children")
+    s.logger.Debug("prefixes match, diffing children")
     for i := int32(0); i < 64; i++ {
-      logger.Debug("checking branch", zap.Int32("branch", i))
+      s.logger.Debug("checking branch", zap.Int32("branch", i))
       var lchild *protobufs.BranchChild = nil
       for _, lc := range lnode.Children {
         if lc.Index == i {
-          logger.Debug("local instance found", zap.Int32("branch", i))
+          s.logger.Debug("local instance found", zap.Int32("branch", i))
           lchild = lc
           break
@@ -591,7 +597,7 @@ func walk(
       var rchild *protobufs.BranchChild = nil
       for _, rc := range rnode.Children {
         if rc.Index == i {
-          logger.Debug("remote instance found", zap.Int32("branch", i))
+          s.logger.Debug("remote instance found", zap.Int32("branch", i))
           rchild = rc
           break
@@ -599,14 +605,13 @@ func walk(
       }
       if (lchild != nil && rchild == nil) ||
         (lchild == nil && rchild != nil) {
-        logger.Info("branch divergence", pathString)
-        sendLeafData(
-          stream,
-          hypergraphStore,
-          localTree,
+        s.logger.Info("branch divergence", pathString)
+        if err := s.sendLeafData(
           path,
           metadataOnly,
-        )
+        ); err != nil {
+          return errors.Wrap(err, "walk")
+        }
       } else {
         if lchild != nil {
           nextPath := append(
@@ -614,35 +619,29 @@ func walk(
             lchild.Index,
           )
           lc, rc, err := descendIndex(
-            ctx,
+            s.ctx,
             incomingResponses,
-            stream,
-            localTree,
+            s.stream,
+            s.localTree,
             nextPath,
           )
           if err != nil {
-            logger.Info("incomplete branch descension, sending leaves")
-            sendLeafData(
-              stream,
-              hypergraphStore,
-              localTree,
+            s.logger.Info("incomplete branch descension, sending leaves")
+            if err := s.sendLeafData(
               nextPath,
               metadataOnly,
-            )
+            ); err != nil {
+              return errors.Wrap(err, "walk")
+            }
            continue
           }
-          if err = walk(
-            ctx,
-            logger,
+          if err = s.walk(
             nextPath,
             lc,
             rc,
             incomingQueries,
             incomingResponses,
-            stream,
-            hypergraphStore,
-            localTree,
             metadataOnly,
           ); err != nil {
             return errors.Wrap(err, "walk")
@@ -651,14 +650,13 @@ func walk(
         }
       }
     } else {
-      logger.Info("prefix mismatch on both sides", pathString)
-      sendLeafData(
-        stream,
-        hypergraphStore,
-        localTree,
+      s.logger.Info("prefix mismatch on both sides", pathString)
+      if err := s.sendLeafData(
         path,
         metadataOnly,
-      )
+      ); err != nil {
+        return errors.Wrap(err, "walk")
+      }
     }
   }
@@ -781,19 +779,22 @@ func syncTreeBidirectionallyServer(
   wg := sync.WaitGroup{}
   wg.Add(1)

+  manager := &streamManager{
+    ctx:             stream.Context(),
+    logger:          logger,
+    stream:          stream,
+    hypergraphStore: localHypergraphStore,
+    localTree:       idSet.GetTree(),
+    lastSent:        time.Now(),
+  }
+
   go func() {
     defer wg.Done()
-    err := walk(
-      stream.Context(),
-      logger,
+    err := manager.walk(
       []int32{},
       branchInfo,
       response,
       incomingQueriesOut,
       incomingResponsesOut,
-      stream,
-      localHypergraphStore,
-      idSet.GetTree(),
       metadataOnly,
     )
     if err != nil {
@@ -802,6 +803,7 @@ func syncTreeBidirectionallyServer(
   }()

   lastReceived := time.Now()
+  leafUpdates := 0

 outer:
   for {
@@ -846,16 +848,43 @@ outer:
       idSet.Add(hypergraph.AtomFromBytes(remoteUpdate.Value))
+      leafUpdates++
       lastReceived = time.Now()
-    case <-time.After(5 * time.Second):
-      if time.Since(lastReceived) > 5*time.Second {
-        break outer
+
+      if leafUpdates > 10000 {
+        roots := localHypergraph.Commit()
+        logger.Info(
+          "hypergraph root commit",
+          zap.String("root", hex.EncodeToString(roots[0])),
+        )
+        if err = localHypergraphStore.SaveHypergraph(localHypergraph); err != nil {
+          logger.Error("error while saving", zap.Error(err))
+        }
+        leafUpdates = 0
+      }
+    case <-time.After(30 * time.Second):
+      if time.Since(lastReceived) > 30*time.Second {
+        if time.Since(manager.lastSent) > 30*time.Second {
+          break outer
+        }
       }
     }
   }

   wg.Wait()

+  roots := localHypergraph.Commit()
+  logger.Info(
+    "hypergraph root commit",
+    zap.String("root", hex.EncodeToString(roots[0])),
+  )
+  if err = localHypergraphStore.SaveHypergraph(localHypergraph); err != nil {
+    logger.Error("error while saving", zap.Error(err))
+  }
+
   total, _ := idSet.GetTree().GetMetadata()
   logger.Info(
     "current progress",
@@ -873,7 +902,17 @@ func (s *hypergraphComparisonServer) HyperStream(
   }
   defer s.syncController.EndSyncSession()

-  return syncTreeBidirectionallyServer(
+  peerId, ok := grpc.PeerIDFromContext(stream.Context())
+  if !ok {
+    return errors.New("could not identify peer")
+  }
+
+  status, ok := s.syncController.SyncStatus[peerId.String()]
+  if ok && time.Since(status.LastSynced) < 30*time.Minute {
+    return errors.New("peer too recently synced")
+  }
+
+  err := syncTreeBidirectionallyServer(
     stream,
     s.logger,
     s.localHypergraphStore,
@@ -881,6 +920,12 @@ func (s *hypergraphComparisonServer) HyperStream(
     false,
     s.debugTotalCoins,
   )
+
+  s.syncController.SyncStatus[peerId.String()] = &SyncInfo{
+    Unreachable: false,
+    LastSynced:  time.Now(),
+  }
+  return err
 }
// SyncTreeBidirectionally performs the tree diff and synchronization.
@@ -893,6 +938,7 @@ func SyncTreeBidirectionally(
   shardKey []byte,
   phaseSet protobufs.HypergraphPhaseSet,
   hypergraphStore store.HypergraphStore,
+  localHypergraph *hypergraph.Hypergraph,
   set *hypergraph.IdSet,
   syncController *SyncController,
   debugTotalCoins int,
@@ -983,19 +1029,24 @@ func SyncTreeBidirectionally(
   wg := sync.WaitGroup{}
   wg.Add(1)

+  manager := &streamManager{
+    ctx:             stream.Context(),
+    logger:          logger,
+    stream:          stream,
+    hypergraphStore: hypergraphStore,
+    localTree:       set.GetTree(),
+    lastSent:        time.Now(),
+  }
+
   go func() {
     defer wg.Done()
-    err := walk(
-      stream.Context(),
-      logger,
+    err := manager.walk(
       []int32{},
       branchInfo,
       response,
       incomingQueriesOut,
       incomingResponsesOut,
-      stream,
-      hypergraphStore,
-      set.GetTree(),
       metadataOnly,
     )
     if err != nil {
@@ -1003,6 +1054,7 @@ func SyncTreeBidirectionally(
     }
   }()

+  leafUpdates := 0
   lastReceived := time.Now()

 outer:
@@ -1048,10 +1100,27 @@ outer:
       set.Add(hypergraph.AtomFromBytes(remoteUpdate.Value))
+      leafUpdates++
       lastReceived = time.Now()
-    case <-time.After(5 * time.Second):
-      if time.Since(lastReceived) > 5*time.Second {
-        break outer
+
+      if leafUpdates > 10000 {
+        roots := localHypergraph.Commit()
+        logger.Info(
+          "hypergraph root commit",
+          zap.String("root", hex.EncodeToString(roots[0])),
+        )
+        if err = hypergraphStore.SaveHypergraph(localHypergraph); err != nil {
+          logger.Error("error while saving", zap.Error(err))
+        }
+        leafUpdates = 0
+      }
+    case <-time.After(30 * time.Second):
+      if time.Since(lastReceived) > 30*time.Second {
+        if time.Since(manager.lastSent) > 30*time.Second {
+          break outer
+        }
       }
     }
   }
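
Note: both receive loops now follow the same periodic-save pattern: commit and persist every 10,000 leaf updates instead of only at the end, and only give up once neither side has sent or received for 30 seconds. A minimal sketch of that control flow with hypothetical callbacks (recv, commitAndSave, and idle stand in for the stream read, Commit() followed by SaveHypergraph(), and the lastReceived/lastSent clocks):

package main

import "time"

func receiveLoop(
  recv func() bool, // one remote leaf update; false on timeout
  commitAndSave func() error, // Commit() followed by SaveHypergraph()
  idle func() time.Duration, // time since last receive and last send
) error {
  updates := 0
  for {
    if recv() {
      updates++
      if updates > 10000 {
        if err := commitAndSave(); err != nil {
          return err
        }
        updates = 0
      }
      continue
    }
    if idle() > 30*time.Second {
      break // both directions quiet: stop waiting
    }
  }
  return commitAndSave() // final commit, as after wg.Wait() above
}
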

View File

@@ -12,6 +12,8 @@ import (
   "testing"

   "github.com/cloudflare/circl/sign/ed448"
+  pcrypto "github.com/libp2p/go-libp2p/core/crypto"
+  "github.com/libp2p/go-libp2p/core/peer"
   "github.com/stretchr/testify/assert"
   "go.uber.org/zap"
   "google.golang.org/grpc"
@@ -19,11 +21,21 @@ import (
   "source.quilibrium.com/quilibrium/monorepo/node/config"
   "source.quilibrium.com/quilibrium/monorepo/node/crypto"
   "source.quilibrium.com/quilibrium/monorepo/node/hypergraph/application"
+  internal_grpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc"
   "source.quilibrium.com/quilibrium/monorepo/node/protobufs"
   "source.quilibrium.com/quilibrium/monorepo/node/rpc"
   "source.quilibrium.com/quilibrium/monorepo/node/store"
 )

+type serverStream struct {
+  grpc.ServerStream
+  ctx context.Context
+}
+
+func (s *serverStream) Context() context.Context {
+  return s.ctx
+}
+
 type Operation struct {
   Type   string // "AddVertex", "RemoveVertex", "AddHyperedge", "RemoveHyperedge"
   Vertex application.Vertex
@@ -216,7 +228,29 @@ func TestHypergraphSyncServer(t *testing.T) {
     log.Fatalf("Server: failed to listen: %v", err)
   }

-  grpcServer := grpc.NewServer()
+  grpcServer := grpc.NewServer(
+    grpc.ChainStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
+      _, priv, _ := ed448.GenerateKey(rand.Reader)
+      privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
+      if err != nil {
+        t.FailNow()
+      }
+
+      pub := privKey.GetPublic()
+      peerId, err := peer.IDFromPublicKey(pub)
+      if err != nil {
+        t.FailNow()
+      }
+
+      return handler(srv, &serverStream{
+        ServerStream: ss,
+        ctx: internal_grpc.NewContextWithPeerID(
+          ss.Context(),
+          peerId,
+        ),
+      })
+    }),
+  )
   protobufs.RegisterHypergraphComparisonServiceServer(
     grpcServer,
     rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0], rpc.NewSyncController(), numOperations),
@@ -227,6 +261,7 @@ func TestHypergraphSyncServer(t *testing.T) {
       log.Fatalf("Server: failed to serve: %v", err)
     }
   }()
+
   conn, err := grpc.DialContext(context.TODO(), "localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
   if err != nil {
     log.Fatalf("Client: failed to listen: %v", err)
@@ -239,7 +274,7 @@ func TestHypergraphSyncServer(t *testing.T) {
   syncController := rpc.NewSyncController()
-  err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, numOperations, false)
+  err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1], crdts[1].GetVertexAdds()[shardKey], syncController, numOperations, false)
   if err != nil {
     log.Fatalf("Client: failed to sync 1: %v", err)
   }
@@ -258,7 +293,7 @@ func TestHypergraphSyncServer(t *testing.T) {
     log.Fatalf("Client: failed to stream: %v", err)
   }

-  err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, numOperations, false)
+  err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1], crdts[1].GetVertexAdds()[shardKey], syncController, numOperations, false)
   if err != nil {
     log.Fatalf("Client: failed to sync 2: %v", err)
   }
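
Note: the interceptor above exists so the server can identify its caller: it generates a throwaway ed448 key, derives a libp2p peer ID, and stores it in the stream context with internal_grpc.NewContextWithPeerID; HyperStream later recovers it with PeerIDFromContext to enforce the "peer too recently synced" check. Assuming those two functions behave as a matched setter/getter pair, the round trip looks like:

ctx := internal_grpc.NewContextWithPeerID(ss.Context(), peerId)
if got, ok := internal_grpc.PeerIDFromContext(ctx); ok {
  _ = got // same peerId, now visible to per-peer rate limiting
}
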

View File

@@ -21,7 +21,11 @@ type CoinStore interface {
     []*protobufs.PreCoinProof,
     error,
   )
-  GetCoinByAddress(txn Transaction, address []byte) (*protobufs.Coin, error)
+  GetCoinByAddress(txn Transaction, address []byte) (
+    uint64,
+    *protobufs.Coin,
+    error,
+  )
   GetPreCoinProofByAddress(address []byte) (*protobufs.PreCoinProof, error)
   RangeCoins(start []byte, end []byte) (Iterator, error)
   RangePreCoinProofs() (Iterator, error)
@@ -197,6 +201,7 @@ func (p *PebbleCoinStore) GetPreCoinProofsForOwner(owner []byte) (
 }

 func (p *PebbleCoinStore) GetCoinByAddress(txn Transaction, address []byte) (
+  uint64,
   *protobufs.Coin,
   error,
 ) {
@@ -211,20 +216,22 @@ func (p *PebbleCoinStore) GetCoinByAddress(txn Transaction, address []byte) (
   if err != nil {
     if errors.Is(err, pebble.ErrNotFound) {
       err = ErrNotFound
-      return nil, err
+      return 0, nil, err
     }
     err = errors.Wrap(err, "get coin by address")
-    return nil, err
+    return 0, nil, err
   }
   defer closer.Close()

   coin := &protobufs.Coin{}
   err = proto.Unmarshal(coinBytes[8:], coin)
   if err != nil {
-    return nil, errors.Wrap(err, "get coin by address")
+    return 0, nil, errors.Wrap(err, "get coin by address")
   }

-  return coin, nil
+  frameNumber := binary.BigEndian.Uint64(coinBytes[:8])
+
+  return frameNumber, coin, nil
 }

 func (p *PebbleCoinStore) GetPreCoinProofByAddress(address []byte) (
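
Note: with the widened signature above, callers get back the frame number that was always stored in the first 8 bytes of the coin record but previously discarded. A hypothetical caller, using only names from this diff:

frameNumber, coin, err := coinStore.GetCoinByAddress(nil, address)
if err != nil {
  return err
}
// frameNumber: big-endian uint64 prefix of the stored record
// coin: the protobuf decoded from the remaining bytes
_ = frameNumber
_ = coin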