diff --git a/RELEASE-NOTES b/RELEASE-NOTES index b29e293..f9ff1ca 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,3 +1,10 @@ +# 2.1.0.14 +- Resolves race condition around QC processing +- Remove noisy sync logs +- Skip unnecessary prover check for global prover info +- Fix issue with 100+ rejections/confirmations +- Resolve sync panic + # 2.1.0.13 - Extends ProverConfirm and ProverReject to have multiple filters per message - Adds snapshot integration to allow hypersync to occur concurrently with writes diff --git a/config/version.go b/config/version.go index 02dcba1..02aaac7 100644 --- a/config/version.go +++ b/config/version.go @@ -43,7 +43,7 @@ func FormatVersion(version []byte) string { } func GetPatchNumber() byte { - return 0x0d + return 0x0e } func GetRCNumber() byte { diff --git a/consensus/participant/participant.go b/consensus/participant/participant.go index ee1dc2a..b16d974 100644 --- a/consensus/participant/participant.go +++ b/consensus/participant/participant.go @@ -44,7 +44,7 @@ func NewParticipant[ pending []*models.SignedProposal[StateT, VoteT], ) (*eventloop.EventLoop[StateT, VoteT], error) { cfg, err := timeout.NewConfig( - 24*time.Second, + 36*time.Second, 3*time.Minute, 1.2, 6, diff --git a/hypergraph/hypergraph.go b/hypergraph/hypergraph.go index b711003..1facfb3 100644 --- a/hypergraph/hypergraph.go +++ b/hypergraph/hypergraph.go @@ -1,6 +1,7 @@ package hypergraph import ( + "context" "math/big" "sync" @@ -53,6 +54,8 @@ type HypergraphCRDT struct { // provides context-driven info for client identification authenticationProvider channel.AuthenticationProvider + + shutdownCtx context.Context } var _ hypergraph.Hypergraph = (*HypergraphCRDT)(nil) @@ -67,6 +70,7 @@ func NewHypergraph( prover crypto.InclusionProver, coveredPrefix []int, authenticationProvider channel.AuthenticationProvider, + maxSyncSessions int, ) *HypergraphCRDT { hg := &HypergraphCRDT{ logger: logger, @@ -79,7 +83,7 @@ func NewHypergraph( prover: prover, coveredPrefix: coveredPrefix, authenticationProvider: authenticationProvider, - syncController: hypergraph.NewSyncController(), + syncController: hypergraph.NewSyncController(maxSyncSessions), snapshotMgr: newSnapshotManager(logger), } @@ -116,6 +120,29 @@ func (hg *HypergraphCRDT) cloneSetWithStore( return set } +func (hg *HypergraphCRDT) SetShutdownContext(ctx context.Context) { + hg.shutdownCtx = ctx +} + +func (hg *HypergraphCRDT) contextWithShutdown( + parent context.Context, +) (context.Context, context.CancelFunc) { + if hg.shutdownCtx == nil { + return parent, func() {} + } + + ctx, cancel := context.WithCancel(parent) + go func() { + select { + case <-ctx.Done(): + case <-hg.shutdownCtx.Done(): + cancel() + } + }() + + return ctx, cancel +} + func (hg *HypergraphCRDT) snapshotSet( shardKey tries.ShardKey, targetStore tries.TreeBackingStore, diff --git a/hypergraph/sync.go b/hypergraph/sync.go index e73c8e1..52ae949 100644 --- a/hypergraph/sync.go +++ b/hypergraph/sync.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "fmt" "io" + "net" "slices" "strings" "sync" @@ -13,6 +14,7 @@ import ( "github.com/pkg/errors" "go.uber.org/zap" + "google.golang.org/grpc/peer" "source.quilibrium.com/quilibrium/monorepo/protobufs" "source.quilibrium.com/quilibrium/monorepo/types/hypergraph" "source.quilibrium.com/quilibrium/monorepo/types/tries" @@ -22,12 +24,20 @@ import ( func (hg *HypergraphCRDT) HyperStream( stream protobufs.HypergraphComparisonService_HyperStreamServer, ) error { - peerId, err := hg.authenticationProvider.Identify(stream.Context()) + 
requestCtx := stream.Context() + ctx, shutdownCancel := hg.contextWithShutdown(requestCtx) + defer shutdownCancel() + + peerId, err := hg.authenticationProvider.Identify(requestCtx) if err != nil { return errors.Wrap(err, "hyper stream") } peerKey := peerId.String() + sessionLogger := hg.logger.With(zap.String("peer_id", peerId.String())) + if addr := peerIPFromContext(requestCtx); addr != "" { + sessionLogger = sessionLogger.With(zap.String("peer_ip", addr)) + } if !hg.syncController.TryEstablishSyncSession(peerKey) { return errors.New("peer already syncing") } @@ -43,17 +53,17 @@ func (hg *HypergraphCRDT) HyperStream( root := handle.Root() if len(root) != 0 { - hg.logger.Debug( + sessionLogger.Debug( "acquired snapshot", zap.String("root", hex.EncodeToString(root)), ) } else { - hg.logger.Debug("acquired snapshot", zap.String("root", "")) + sessionLogger.Debug("acquired snapshot", zap.String("root", "")) } snapshotStore := handle.Store() - err = hg.syncTreeServer(stream, snapshotStore, root) + err = hg.syncTreeServer(ctx, stream, snapshotStore, root, sessionLogger) hg.syncController.SetStatus(peerKey, &hypergraph.SyncInfo{ Unreachable: false, @@ -208,9 +218,6 @@ func (hg *HypergraphCRDT) Sync( } }() - wg := sync.WaitGroup{} - wg.Add(1) - manager := &streamManager{ ctx: ctx, cancel: cancel, @@ -222,6 +229,8 @@ func (hg *HypergraphCRDT) Sync( lastSent: time.Now(), } + var wg sync.WaitGroup + wg.Add(1) go func() { defer wg.Done() err := manager.walk( @@ -481,11 +490,6 @@ func (s *streamManager) sendLeafData( }, } - // s.logger.Info( - // "sending leaf data", - // zap.String("key", hex.EncodeToString(leaf.Key)), - // ) - select { case <-s.ctx.Done(): return s.ctx.Err() @@ -526,7 +530,6 @@ func (s *streamManager) sendLeafData( } if node == nil { - // s.logger.Info("no node, sending 0 leaves") if err := s.stream.Send(&protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Metadata{ Metadata: &protobufs.HypersyncMetadata{Leaves: 0}, @@ -552,7 +555,6 @@ func (s *streamManager) sendLeafData( } count++ } - // s.logger.Info("sending set of leaves", zap.Uint64("leaf_count", count)) if err := s.stream.Send(&protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Metadata{ Metadata: &protobufs.HypersyncMetadata{Leaves: count}, @@ -571,7 +573,6 @@ func (s *streamManager) sendLeafData( } } else { count = 1 - // s.logger.Info("sending one leaf", zap.Uint64("leaf_count", count)) if err := s.stream.Send(&protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Metadata{ Metadata: &protobufs.HypersyncMetadata{Leaves: count}, @@ -738,7 +739,7 @@ func getBranchInfoFromTree( if branch, ok := node.(*tries.LazyVectorCommitmentBranchNode); ok { branchInfo.Commitment = branch.Commitment if len(branch.Commitment) == 0 { - panic("branch cannot have no commitment") + return nil, errors.New("invalid commitment") } for _, p := range branch.Prefix { @@ -771,7 +772,7 @@ func getBranchInfoFromTree( } if len(childCommit) == 0 { - panic("cannot have non-committed child") + return nil, errors.New("invalid commitment") } branchInfo.Children = append( branchInfo.Children, @@ -785,7 +786,7 @@ func getBranchInfoFromTree( } else if leaf, ok := node.(*tries.LazyVectorCommitmentLeafNode); ok { branchInfo.Commitment = leaf.Commitment if len(branchInfo.Commitment) == 0 { - panic("leaf cannot have no commitment") + return nil, errors.New("invalid commitment") } } return branchInfo, nil @@ -840,16 +841,14 @@ func isLeaf(info *protobufs.HypergraphComparisonResponse) bool { return 
len(info.Children) == 0 } -func queryNext( - ctx context.Context, +func (s *streamManager) queryNext( incomingResponses <-chan *protobufs.HypergraphComparisonResponse, - stream hypergraph.HyperStream, path []int32, ) ( *protobufs.HypergraphComparisonResponse, error, ) { - if err := stream.Send(&protobufs.HypergraphComparison{ + if err := s.stream.Send(&protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Query{ Query: &protobufs.HypergraphComparisonQuery{ Path: path, @@ -861,7 +860,7 @@ func queryNext( } select { - case <-ctx.Done(): + case <-s.ctx.Done(): return nil, errors.Wrap( errors.New("context canceled"), "handle query", @@ -883,12 +882,11 @@ func queryNext( } func (s *streamManager) handleLeafData( - ctx context.Context, incomingLeaves <-chan *protobufs.HypergraphComparison, ) error { expectedLeaves := uint64(0) select { - case <-ctx.Done(): + case <-s.ctx.Done(): return errors.Wrap( errors.New("context canceled"), "handle leaf data", @@ -940,7 +938,7 @@ func (s *streamManager) handleLeafData( } } select { - case <-ctx.Done(): + case <-s.ctx.Done(): return errors.Wrap( errors.New("context canceled"), "handle leaf data", @@ -1032,19 +1030,15 @@ func (s *streamManager) handleLeafData( return nil } -func handleQueryNext( - logger *zap.Logger, - ctx context.Context, +func (s *streamManager) handleQueryNext( incomingQueries <-chan *protobufs.HypergraphComparisonQuery, - stream hypergraph.HyperStream, - localTree *tries.LazyVectorCommitmentTree, path []int32, ) ( *protobufs.HypergraphComparisonResponse, error, ) { select { - case <-ctx.Done(): + case <-s.ctx.Done(): return nil, errors.Wrap( errors.New("context canceled"), "handle query next", @@ -1064,7 +1058,7 @@ func handleQueryNext( ) } - branchInfo, err := getBranchInfoFromTree(logger, localTree, path) + branchInfo, err := getBranchInfoFromTree(s.logger, s.localTree, path) if err != nil { return nil, errors.Wrap(err, "handle query next") } @@ -1075,7 +1069,7 @@ func handleQueryNext( }, } - if err := stream.Send(resp); err != nil { + if err := s.stream.Send(resp); err != nil { return nil, errors.Wrap(err, "handle query next") } @@ -1088,19 +1082,15 @@ func handleQueryNext( } } -func descendIndex( - logger *zap.Logger, - ctx context.Context, +func (s *streamManager) descendIndex( incomingResponses <-chan *protobufs.HypergraphComparisonResponse, - stream hypergraph.HyperStream, - localTree *tries.LazyVectorCommitmentTree, path []int32, ) ( *protobufs.HypergraphComparisonResponse, *protobufs.HypergraphComparisonResponse, error, ) { - branchInfo, err := getBranchInfoFromTree(logger, localTree, path) + branchInfo, err := getBranchInfoFromTree(s.logger, s.localTree, path) if err != nil { return nil, nil, errors.Wrap(err, "descend index") } @@ -1111,12 +1101,12 @@ func descendIndex( }, } - if err := stream.Send(resp); err != nil { + if err := s.stream.Send(resp); err != nil { return nil, nil, errors.Wrap(err, "descend index") } select { - case <-ctx.Done(): + case <-s.ctx.Done(): return nil, nil, errors.Wrap( errors.New("context canceled"), "handle query next", @@ -1172,7 +1162,7 @@ func (s *streamManager) walk( default: } - pathString := zap.String("path", hex.EncodeToString(packPath(path))) + // pathString := zap.String("path", hex.EncodeToString(packPath(path))) if bytes.Equal(lnode.Commitment, rnode.Commitment) { // s.logger.Debug( @@ -1196,7 +1186,7 @@ func (s *streamManager) walk( ) return errors.Wrap(err, "walk") } else { - err := s.handleLeafData(s.ctx, incomingLeaves) + err := s.handleLeafData(incomingLeaves) 
return errors.Wrap(err, "walk") } } @@ -1222,10 +1212,8 @@ func (s *streamManager) walk( // s.logger.Debug("sending query") traversePath = append(traversePath, child.Index) var err error - rtrav, err = queryNext( - s.ctx, + rtrav, err = s.queryNext( incomingResponses, - s.stream, traversePath, ) if err != nil { @@ -1246,7 +1234,7 @@ func (s *streamManager) walk( ) return errors.Wrap(err, "walk") } else { - err := s.handleLeafData(s.ctx, incomingLeaves) + err := s.handleLeafData(incomingLeaves) return errors.Wrap(err, "walk") } } @@ -1276,12 +1264,8 @@ func (s *streamManager) walk( traversedPath = append(traversedPath, nibble) var err error // s.logger.Debug("expecting query") - ltrav, err = handleQueryNext( - s.logger, - s.ctx, + ltrav, err = s.handleQueryNext( incomingQueries, - s.stream, - s.localTree, traversedPath, ) if err != nil { @@ -1298,7 +1282,7 @@ func (s *streamManager) walk( ) return errors.Wrap(err, "walk") } else { - err := s.handleLeafData(s.ctx, incomingLeaves) + err := s.handleLeafData(incomingLeaves) return errors.Wrap(err, "walk") } } @@ -1322,7 +1306,7 @@ func (s *streamManager) walk( return errors.Wrap(err, "walk") } } else { - err := s.handleLeafData(s.ctx, incomingLeaves) + err := s.handleLeafData(incomingLeaves) if err != nil { return errors.Wrap(err, "walk") } @@ -1367,7 +1351,7 @@ func (s *streamManager) walk( } if (lchild != nil && rchild == nil) || (lchild == nil && rchild != nil) { - s.logger.Info("branch divergence", pathString) + // s.logger.Info("branch divergence", pathString) if lchild != nil { nextPath := append( append([]int32{}, lpref...), @@ -1384,7 +1368,7 @@ func (s *streamManager) walk( } if rchild != nil { if !isServer { - err := s.handleLeafData(s.ctx, incomingLeaves) + err := s.handleLeafData(incomingLeaves) if err != nil { return errors.Wrap(err, "walk") } @@ -1396,12 +1380,8 @@ func (s *streamManager) walk( append([]int32{}, lpref...), lchild.Index, ) - lc, rc, err := descendIndex( - s.logger, - s.ctx, + lc, rc, err := s.descendIndex( incomingResponses, - s.stream, - s.localTree, nextPath, ) if err != nil { @@ -1414,7 +1394,7 @@ func (s *streamManager) walk( return errors.Wrap(err, "walk") } } else { - err := s.handleLeafData(s.ctx, incomingLeaves) + err := s.handleLeafData(incomingLeaves) if err != nil { return errors.Wrap(err, "walk") } @@ -1447,7 +1427,7 @@ func (s *streamManager) walk( return errors.Wrap(err, "walk") } } else { - err := s.handleLeafData(s.ctx, incomingLeaves) + err := s.handleLeafData(incomingLeaves) if err != nil { return errors.Wrap(err, "walk") } @@ -1462,17 +1442,23 @@ func (s *streamManager) walk( // server side. It sends the local root info, then processes incoming messages, // and queues further queries as differences are detected. 
func (hg *HypergraphCRDT) syncTreeServer( + ctx context.Context, stream protobufs.HypergraphComparisonService_HyperStreamServer, snapshotStore tries.TreeBackingStore, snapshotRoot []byte, + sessionLogger *zap.Logger, ) error { + logger := sessionLogger + if logger == nil { + logger = hg.logger + } if len(snapshotRoot) != 0 { - hg.logger.Info( + logger.Info( "syncing with snapshot", zap.String("root", hex.EncodeToString(snapshotRoot)), ) } else { - hg.logger.Info("syncing with snapshot", zap.String("root", "")) + logger.Info("syncing with snapshot", zap.String("root", "")) } msg, err := stream.Recv() @@ -1484,7 +1470,7 @@ func (hg *HypergraphCRDT) syncTreeServer( return errors.New("client did not send valid initialization message") } - hg.logger.Info("received initialization message") + logger.Info("received initialization message") if len(query.ShardKey) != 35 { return errors.New("invalid shard key") @@ -1501,7 +1487,7 @@ func (hg *HypergraphCRDT) syncTreeServer( } branchInfo, err := getBranchInfoFromTree( - hg.logger, + logger, idSet.GetTree(), query.Path, ) @@ -1537,7 +1523,7 @@ func (hg *HypergraphCRDT) syncTreeServer( ) } - ctx, cancel := context.WithCancel(stream.Context()) + ctx, cancel := context.WithCancel(ctx) incomingQueriesIn, incomingQueriesOut := UnboundedChan[*protobufs.HypergraphComparisonQuery]( @@ -1559,14 +1545,14 @@ func (hg *HypergraphCRDT) syncTreeServer( for { msg, err := stream.Recv() if err == io.EOF { - hg.logger.Info("server stream recv eof") + logger.Info("server stream recv eof") cancel() close(incomingQueriesIn) close(incomingResponsesIn) return } if err != nil { - hg.logger.Info("server stream recv error", zap.Error(err)) + logger.Info("server stream recv error", zap.Error(err)) cancel() close(incomingQueriesIn) close(incomingResponsesIn) @@ -1577,7 +1563,7 @@ func (hg *HypergraphCRDT) syncTreeServer( } switch m := msg.Payload.(type) { case *protobufs.HypergraphComparison_LeafData: - hg.logger.Warn("received leaf from client, terminating") + logger.Warn("received leaf from client, terminating") cancel() close(incomingQueriesIn) close(incomingResponsesIn) @@ -1592,18 +1578,17 @@ func (hg *HypergraphCRDT) syncTreeServer( } }() - wg := sync.WaitGroup{} - wg.Add(1) - manager := &streamManager{ ctx: ctx, cancel: cancel, - logger: hg.logger, + logger: logger, stream: stream, hypergraphStore: snapshotStore, localTree: idSet.GetTree(), lastSent: time.Now(), } + var wg sync.WaitGroup + wg.Add(1) go func() { defer wg.Done() err := manager.walk( @@ -1617,7 +1602,7 @@ func (hg *HypergraphCRDT) syncTreeServer( true, ) if err != nil { - hg.logger.Error("error while syncing", zap.Error(err)) + logger.Error("error while syncing", zap.Error(err)) } }() @@ -1657,3 +1642,18 @@ func UnboundedChan[T any]( }() return in, out } + +func peerIPFromContext(ctx context.Context) string { + if ctx == nil { + return "" + } + p, ok := peer.FromContext(ctx) + if !ok || p.Addr == nil { + return "" + } + host, _, err := net.SplitHostPort(p.Addr.String()) + if err != nil { + return p.Addr.String() + } + return host +} diff --git a/node/app/wire.go b/node/app/wire.go index 3d6a84d..6278076 100644 --- a/node/app/wire.go +++ b/node/app/wire.go @@ -149,8 +149,13 @@ var engineSet = wire.NewSet( func provideHypergraph( store *store.PebbleHypergraphStore, + config *config.Config, ) (thypergraph.Hypergraph, error) { - return store.LoadHypergraph(&tests.Nopthenticator{}) + workers := 1 + if config.Engine.ArchiveMode { + workers = 100 + } + return store.LoadHypergraph(&tests.Nopthenticator{}, workers) } 
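The hunk above raises the hypersync worker budget from 1 to 100 when the node runs in archive mode, and the same value is passed to LoadHypergraph, presumably ending up as the maxSyncSessions argument that NewHypergraph now forwards to hypergraph.NewSyncController. The controller itself is not part of this patch, so the following is only a minimal sketch, assuming maxSyncSessions simply caps how many peers may hold an open sync session at once while keeping the per-peer exclusivity implied by the "peer already syncing" error in HyperStream; all names here are illustrative, not the real API.

package sketch

import "sync"

// sessionLimiter is a stand-in for a sync controller configured with
// maxSyncSessions; it tracks which peers currently hold a sync session.
type sessionLimiter struct {
	mu     sync.Mutex
	active map[string]struct{}
	max    int // 1 for regular nodes, 100 in archive mode per provideHypergraph
}

func newSessionLimiter(maxSessions int) *sessionLimiter {
	return &sessionLimiter{active: make(map[string]struct{}), max: maxSessions}
}

// tryEstablish mirrors the TryEstablishSyncSession call in HyperStream: it
// refuses a second session for a peer that is already syncing and refuses
// new peers once the configured cap is reached.
func (l *sessionLimiter) tryEstablish(peerKey string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if _, busy := l.active[peerKey]; busy || len(l.active) >= l.max {
		return false
	}
	l.active[peerKey] = struct{}{}
	return true
}

// end releases the peer's slot once its stream finishes.
func (l *sessionLimiter) end(peerKey string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.active, peerKey)
}

Under this reading, a cap of 1 presumably keeps ordinary nodes close to the previous single-session behavior, while archive nodes can serve many hypersync clients concurrently.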
var hypergraphSet = wire.NewSet( diff --git a/node/app/wire_gen.go b/node/app/wire_gen.go index 2c3bca6..4f89c2a 100644 --- a/node/app/wire_gen.go +++ b/node/app/wire_gen.go @@ -91,7 +91,7 @@ func NewDataWorkerNodeWithProxyPubsub(logger *zap.Logger, config2 *config.Config mpCitHVerifiableEncryptor := newVerifiableEncryptor() kzgInclusionProver := bls48581.NewKZGInclusionProver(logger) pebbleHypergraphStore := store2.NewPebbleHypergraphStore(dbConfig, pebbleDB, logger, mpCitHVerifiableEncryptor, kzgInclusionProver) - hypergraph, err := provideHypergraph(pebbleHypergraphStore) + hypergraph, err := provideHypergraph(pebbleHypergraphStore, config2) if err != nil { return nil, err } @@ -151,7 +151,7 @@ func NewDataWorkerNodeWithoutProxyPubsub(logger *zap.Logger, config2 *config.Con mpCitHVerifiableEncryptor := newVerifiableEncryptor() kzgInclusionProver := bls48581.NewKZGInclusionProver(logger) pebbleHypergraphStore := store2.NewPebbleHypergraphStore(dbConfig, pebbleDB, logger, mpCitHVerifiableEncryptor, kzgInclusionProver) - hypergraph, err := provideHypergraph(pebbleHypergraphStore) + hypergraph, err := provideHypergraph(pebbleHypergraphStore, config2) if err != nil { return nil, err } @@ -205,7 +205,7 @@ func NewMasterNode(logger *zap.Logger, config2 *config.Config, coreId uint) (*Ma mpCitHVerifiableEncryptor := newVerifiableEncryptor() kzgInclusionProver := bls48581.NewKZGInclusionProver(logger) pebbleHypergraphStore := store2.NewPebbleHypergraphStore(dbConfig, pebbleDB, logger, mpCitHVerifiableEncryptor, kzgInclusionProver) - hypergraph, err := provideHypergraph(pebbleHypergraphStore) + hypergraph, err := provideHypergraph(pebbleHypergraphStore, config2) if err != nil { return nil, err } @@ -301,9 +301,13 @@ var engineSet = wire.NewSet(vdf.NewCachedWesolowskiFrameProver, bls48581.NewKZGI ), ) -func provideHypergraph(store3 *store2.PebbleHypergraphStore, +func provideHypergraph(store3 *store2.PebbleHypergraphStore, config *config.Config, ) (hypergraph.Hypergraph, error) { - return store3.LoadHypergraph(&tests.Nopthenticator{}) + workers := 1 + if config.Engine.ArchiveMode { + workers = 100 + } + return store3.LoadHypergraph(&tests.Nopthenticator{}, workers) } var hypergraphSet = wire.NewSet( diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index d6756ce..f45950e 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -132,6 +132,7 @@ type AppConsensusEngine struct { ctx lifecycle.SignalerContext cancel context.CancelFunc quit chan struct{} + frameChainChecker *AppFrameChainChecker canRunStandalone bool blacklistMap map[string]bool currentRank uint64 @@ -264,7 +265,7 @@ func NewAppConsensusEngine( proofCache: make(map[uint64][516]byte), collectedMessages: []*protobufs.Message{}, provingMessages: []*protobufs.Message{}, - appMessageSpillover: make(map[uint64][]*protobufs.Message), + appMessageSpillover: make(map[uint64][]*protobufs.Message), consensusMessageQueue: make(chan *pb.Message, 1000), proverMessageQueue: make(chan *pb.Message, 1000), frameMessageQueue: make(chan *pb.Message, 100), @@ -279,6 +280,8 @@ func NewAppConsensusEngine( peerAuthCache: make(map[string]time.Time), } + engine.frameChainChecker = NewAppFrameChainChecker(clockStore, logger, appAddress) + keyId := "q-prover-key" key, err := keyManager.GetSigningKey(keyId) @@ -743,6 +746,13 @@ func NewAppConsensusEngine( }) engine.ComponentManager = componentBuilder.Build() + if hgWithShutdown, ok := engine.hyperSync.(interface { 
+ SetShutdownContext(context.Context) + }); ok { + hgWithShutdown.SetShutdownContext( + contextFromShutdownSignal(engine.ShutdownSignal()), + ) + } return engine, nil } @@ -998,6 +1008,18 @@ func (e *AppConsensusEngine) materialize( return nil } +func contextFromShutdownSignal(sig <-chan struct{}) context.Context { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + if sig == nil { + return + } + <-sig + cancel() + }() + return ctx +} + func (e *AppConsensusEngine) getPeerID() PeerID { return PeerID{ID: e.getProverAddress()} } @@ -2699,7 +2721,7 @@ func (e *AppConsensusEngine) getPendingProposals( nextQC, err := e.clockStore.GetQuorumCertificate(e.appAddress, rank) if err != nil { e.logger.Debug("no qc for rank", zap.Error(err)) - continue + break } value, err := e.clockStore.GetStagedShardClockFrame( @@ -2710,8 +2732,7 @@ func (e *AppConsensusEngine) getPendingProposals( ) if err != nil { e.logger.Debug("no frame for qc", zap.Error(err)) - parent = nextQC - continue + break } var priorTCModel models.TimeoutCertificate = nil diff --git a/node/consensus/app/frame_chain_checker.go b/node/consensus/app/frame_chain_checker.go new file mode 100644 index 0000000..65d916f --- /dev/null +++ b/node/consensus/app/frame_chain_checker.go @@ -0,0 +1,144 @@ +package app + +import ( + "bytes" + "encoding/hex" + + "go.uber.org/zap" + "source.quilibrium.com/quilibrium/monorepo/consensus/models" + "source.quilibrium.com/quilibrium/monorepo/protobufs" + "source.quilibrium.com/quilibrium/monorepo/types/store" +) + +type AppFrameChainReader interface { + GetShardClockFrame( + filter []byte, + frameNumber uint64, + truncate bool, + ) (*protobufs.AppShardFrame, error) + GetStagedShardClockFrame( + filter []byte, + frameNumber uint64, + parentSelector []byte, + truncate bool, + ) (*protobufs.AppShardFrame, error) +} + +type AppFrameChainChecker struct { + store AppFrameChainReader + filter []byte + logger *zap.Logger +} + +func NewAppFrameChainChecker( + store store.ClockStore, + logger *zap.Logger, + filter []byte, +) *AppFrameChainChecker { + if store == nil { + return nil + } + if logger == nil { + logger = zap.NewNop() + } + return &AppFrameChainChecker{ + store: appFrameChainStoreAdapter{store: store}, + filter: filter, + logger: logger, + } +} + +func (c *AppFrameChainChecker) CanProcessSequentialChain( + finalized *models.State[*protobufs.AppShardFrame], + proposal *protobufs.AppShardProposal, +) bool { + if c == nil || c.store == nil || proposal == nil || + proposal.State == nil || proposal.State.Header == nil { + return false + } + + parentSelector := proposal.State.Header.ParentSelector + if len(parentSelector) == 0 { + return false + } + + frameNumber := proposal.State.Header.FrameNumber + if frameNumber == 0 { + return false + } + + for frameNumber > 0 && len(parentSelector) > 0 { + frameNumber-- + sealed, err := c.store.GetShardClockFrame(c.filter, frameNumber, false) + if err == nil && sealed != nil && + bytes.Equal([]byte(sealed.Identity()), parentSelector) { + c.logger.Debug( + "app frame chain linked to sealed frame", + zap.Uint64("sealed_frame_number", frameNumber), + ) + return true + } + + candidate, err := c.store.GetStagedShardClockFrame( + c.filter, + frameNumber, + parentSelector, + false, + ) + if err == nil && candidate != nil { + parentSelector = candidate.Header.ParentSelector + // keep walking + continue + } + + if finalized != nil && finalized.State != nil && + (*finalized.State).Header != nil && + frameNumber == (*finalized.State).Header.FrameNumber && + 
bytes.Equal([]byte(finalized.Identifier), parentSelector) { + c.logger.Debug( + "app frame chain linked to finalized frame", + zap.Uint64("finalized_frame_number", frameNumber), + ) + return true + } + + c.logger.Debug( + "missing app ancestor frame while validating chain", + zap.Uint64("missing_frame_number", frameNumber), + zap.String( + "expected_parent_selector", + hex.EncodeToString(parentSelector), + ), + ) + return false + } + + return false +} + +type appFrameChainStoreAdapter struct { + store store.ClockStore +} + +func (a appFrameChainStoreAdapter) GetShardClockFrame( + filter []byte, + frameNumber uint64, + truncate bool, +) (*protobufs.AppShardFrame, error) { + frame, _, err := a.store.GetShardClockFrame(filter, frameNumber, truncate) + return frame, err +} + +func (a appFrameChainStoreAdapter) GetStagedShardClockFrame( + filter []byte, + frameNumber uint64, + parentSelector []byte, + truncate bool, +) (*protobufs.AppShardFrame, error) { + return a.store.GetStagedShardClockFrame( + filter, + frameNumber, + parentSelector, + truncate, + ) +} diff --git a/node/consensus/app/message_processors.go b/node/consensus/app/message_processors.go index 67069a6..42a20ba 100644 --- a/node/consensus/app/message_processors.go +++ b/node/consensus/app/message_processors.go @@ -258,6 +258,19 @@ func (e *AppConsensusEngine) handleAppShardProposal( return } } + if e.frameChainChecker != nil && + e.frameChainChecker.CanProcessSequentialChain(finalized, proposal) { + e.deleteCachedProposal(frameNumber) + if e.processProposal(proposal) { + e.drainProposalCache(frameNumber + 1) + return + } + + e.logger.Debug("failed to process sequential proposal, caching") + e.cacheProposal(proposal) + return + } + expectedFrame, _, err := e.clockStore.GetLatestShardClockFrame(e.appAddress) if err != nil { e.logger.Error("could not obtain app time reel head", zap.Error(err)) diff --git a/node/consensus/app/services.go b/node/consensus/app/services.go index b46a342..1423061 100644 --- a/node/consensus/app/services.go +++ b/node/consensus/app/services.go @@ -14,6 +14,7 @@ import ( "google.golang.org/grpc/status" qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc" "source.quilibrium.com/quilibrium/monorepo/protobufs" + "source.quilibrium.com/quilibrium/monorepo/types/store" ) func (e *AppConsensusEngine) GetAppShardFrame( @@ -99,10 +100,10 @@ func (e *AppConsensusEngine) GetAppShardProposal( zap.Uint64("frame_number", request.FrameNumber), zap.String("peer_id", peerID.String()), ) - frame, _, err := e.clockStore.GetShardClockFrame( + frame, err := e.loadAppFrameMatchingSelector( request.Filter, request.FrameNumber, - false, + nil, ) if err != nil { return &protobufs.AppShardProposalResponse{}, nil @@ -117,10 +118,10 @@ func (e *AppConsensusEngine) GetAppShardProposal( return &protobufs.AppShardProposalResponse{}, nil } - parent, _, err := e.clockStore.GetShardClockFrame( + parent, err := e.loadAppFrameMatchingSelector( request.Filter, request.FrameNumber-1, - false, + frame.Header.ParentSelector, ) if err != nil { e.logger.Debug( @@ -168,6 +169,53 @@ func (e *AppConsensusEngine) RegisterServices(server *grpc.Server) { protobufs.RegisterOnionServiceServer(server, e.onionService) } +func (e *AppConsensusEngine) loadAppFrameMatchingSelector( + filter []byte, + frameNumber uint64, + expectedSelector []byte, +) (*protobufs.AppShardFrame, error) { + matchesSelector := func(frame *protobufs.AppShardFrame) bool { + if frame == nil || frame.Header == nil || len(expectedSelector) == 0 { + return true + } + 
return bytes.Equal(frame.Header.ParentSelector, expectedSelector) + } + + frame, _, err := e.clockStore.GetShardClockFrame(filter, frameNumber, false) + if err == nil && matchesSelector(frame) { + return frame, nil + } + + iter, iterErr := e.clockStore.RangeStagedShardClockFrames( + filter, + frameNumber, + frameNumber, + ) + if iterErr != nil { + if err != nil { + return nil, err + } + return nil, iterErr + } + defer iter.Close() + + for ok := iter.First(); ok && iter.Valid(); ok = iter.Next() { + candidate, valErr := iter.Value() + if valErr != nil { + return nil, valErr + } + if matchesSelector(candidate) { + return candidate, nil + } + } + + if err == nil && matchesSelector(frame) { + return frame, nil + } + + return nil, store.ErrNotFound +} + func (e *AppConsensusEngine) authenticateProverFromContext( ctx context.Context, ) (peer.ID, error) { diff --git a/node/consensus/global/consensus_dynamic_committee.go b/node/consensus/global/consensus_dynamic_committee.go index 8ed1337..19da9bd 100644 --- a/node/consensus/global/consensus_dynamic_committee.go +++ b/node/consensus/global/consensus_dynamic_committee.go @@ -141,7 +141,7 @@ func (e *GlobalConsensusEngine) QuorumThresholdForRank( total += p.Seniority } - return (total * 2) / 3, nil + return (total * 4) / 6, nil } // Self implements consensus.DynamicCommittee. @@ -163,7 +163,7 @@ func (e *GlobalConsensusEngine) TimeoutThresholdForRank( total += p.Seniority } - return (total * 2) / 3, nil + return (total * 4) / 6, nil } func internalProversToWeightedIdentity( diff --git a/node/consensus/global/consensus_leader_provider.go b/node/consensus/global/consensus_leader_provider.go index 259547b..67e7b69 100644 --- a/node/consensus/global/consensus_leader_provider.go +++ b/node/consensus/global/consensus_leader_provider.go @@ -71,7 +71,18 @@ func (p *GlobalLeaderProvider) ProveNextState( filter []byte, priorState models.Identity, ) (**protobufs.GlobalFrame, error) { - prior, err := p.engine.clockStore.GetLatestGlobalClockFrame() + latestQC, qcErr := p.engine.clockStore.GetLatestQuorumCertificate(nil) + if qcErr != nil { + p.engine.logger.Debug( + "could not fetch latest quorum certificate", + zap.Error(qcErr), + ) + } + + prior, err := p.engine.clockStore.GetGlobalClockFrameCandidate( + latestQC.FrameNumber, + []byte(priorState), + ) if err != nil { frameProvingTotal.WithLabelValues("error").Inc() return nil, models.NewNoVoteErrorf("could not collect: %+w", err) @@ -82,14 +93,6 @@ func (p *GlobalLeaderProvider) ProveNextState( return nil, models.NewNoVoteErrorf("missing prior frame") } - latestQC, qcErr := p.engine.clockStore.GetLatestQuorumCertificate(nil) - if qcErr != nil { - p.engine.logger.Debug( - "could not fetch latest quorum certificate", - zap.Error(qcErr), - ) - } - if prior.Identity() != priorState { frameProvingTotal.WithLabelValues("error").Inc() diff --git a/node/consensus/global/consensus_voting_provider.go b/node/consensus/global/consensus_voting_provider.go index fe8e161..644bc37 100644 --- a/node/consensus/global/consensus_voting_provider.go +++ b/node/consensus/global/consensus_voting_provider.go @@ -25,22 +25,6 @@ func (p *GlobalVotingProvider) FinalizeQuorumCertificate( state *models.State[*protobufs.GlobalFrame], aggregatedSignature models.AggregatedSignature, ) (models.QuorumCertificate, error) { - cloned := (*state.State).Clone().(*protobufs.GlobalFrame) - cloned.Header.PublicKeySignatureBls48581 = - &protobufs.BLS48581AggregateSignature{ - Signature: aggregatedSignature.GetSignature(), - PublicKey: 
&protobufs.BLS48581G2PublicKey{ - KeyValue: aggregatedSignature.GetPubKey(), - }, - Bitmask: aggregatedSignature.GetBitmask(), - } - frameBytes, err := cloned.ToCanonicalBytes() - if err != nil { - return nil, errors.Wrap(err, "finalize quorum certificate") - } - - p.engine.pubsub.PublishToBitmask(GLOBAL_FRAME_BITMASK, frameBytes) - return &protobufs.QuorumCertificate{ Rank: (*state.State).GetRank(), FrameNumber: (*state.State).Header.FrameNumber, diff --git a/node/consensus/global/frame_chain_checker.go b/node/consensus/global/frame_chain_checker.go new file mode 100644 index 0000000..df2879a --- /dev/null +++ b/node/consensus/global/frame_chain_checker.go @@ -0,0 +1,135 @@ +package global + +import ( + "bytes" + "encoding/hex" + "errors" + + "go.uber.org/zap" + "source.quilibrium.com/quilibrium/monorepo/consensus/models" + "source.quilibrium.com/quilibrium/monorepo/protobufs" + "source.quilibrium.com/quilibrium/monorepo/types/store" +) + +// FrameChainReader captures the minimal subset of clock store functionality +// required for sequential frame verification. +type FrameChainReader interface { + GetGlobalClockFrame(uint64) (*protobufs.GlobalFrame, error) + GetGlobalClockFrameCandidate(uint64, []byte) (*protobufs.GlobalFrame, error) +} + +// FrameChainChecker verifies whether a proposal's parent chain can be linked +// through stored frame candidates or sealed frames. +type FrameChainChecker struct { + store FrameChainReader + logger *zap.Logger +} + +// NewFrameChainChecker creates a new FrameChainChecker. +func NewFrameChainChecker( + store FrameChainReader, + logger *zap.Logger, +) *FrameChainChecker { + if store == nil { + return nil + } + if logger == nil { + logger = zap.NewNop() + } + return &FrameChainChecker{store: store, logger: logger} +} + +// CanProcessSequentialChain returns true if the proposal's ancestors can be +// chained back to an existing sealed frame or the finalized state. 
+func (c *FrameChainChecker) CanProcessSequentialChain( + finalized *models.State[*protobufs.GlobalFrame], + proposal *protobufs.GlobalProposal, +) bool { + if c == nil || c.store == nil || proposal == nil || + proposal.State == nil || proposal.State.Header == nil { + return false + } + + parentSelector := proposal.State.Header.ParentSelector + if len(parentSelector) == 0 { + return false + } + + frameNumber := proposal.State.Header.FrameNumber + if frameNumber == 0 { + return false + } + + for frameNumber > 0 && len(parentSelector) > 0 { + frameNumber-- + + if sealed, err := c.store.GetGlobalClockFrame(frameNumber); err == nil && + sealed != nil && + bytes.Equal([]byte(sealed.Identity()), parentSelector) { + c.logger.Debug( + "frame chain linked to sealed frame", + zap.Uint64("sealed_frame_number", frameNumber), + ) + return true + } else if err != nil && !errors.Is(err, store.ErrNotFound) { + c.logger.Warn( + "failed to read sealed frame during chain validation", + zap.Uint64("frame_number", frameNumber), + zap.Error(err), + ) + return false + } + + candidate, err := c.store.GetGlobalClockFrameCandidate( + frameNumber, + parentSelector, + ) + if err == nil && candidate != nil { + if candidate.Header == nil || + candidate.Header.FrameNumber != frameNumber { + c.logger.Debug( + "candidate frame had mismatched header", + zap.Uint64("frame_number", frameNumber), + ) + return false + } + c.logger.Debug( + "frame chain matched candidate", + zap.Uint64("candidate_frame_number", frameNumber), + ) + parentSelector = candidate.Header.ParentSelector + continue + } + if err != nil && !errors.Is(err, store.ErrNotFound) { + c.logger.Warn( + "failed to read candidate frame during chain validation", + zap.Uint64("frame_number", frameNumber), + zap.Error(err), + ) + return false + } + + if finalized != nil && finalized.State != nil && + (*finalized.State).Header != nil && + frameNumber == (*finalized.State).Header.FrameNumber && + bytes.Equal([]byte(finalized.Identifier), parentSelector) { + c.logger.Debug( + "frame chain linked to finalized frame", + zap.Uint64("finalized_frame_number", frameNumber), + ) + return true + } + + c.logger.Debug( + "missing ancestor frame while validating chain", + zap.Uint64("missing_frame_number", frameNumber), + zap.String( + "expected_parent_selector", + hex.EncodeToString(parentSelector), + ), + ) + return false + } + + return false +} diff --git a/node/consensus/global/frame_chain_checker_test.go b/node/consensus/global/frame_chain_checker_test.go new file mode 100644 index 0000000..f97bf58 --- /dev/null +++ b/node/consensus/global/frame_chain_checker_test.go @@ -0,0 +1,180 @@ +package global + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "source.quilibrium.com/quilibrium/monorepo/consensus/models" + "source.quilibrium.com/quilibrium/monorepo/protobufs" + "source.quilibrium.com/quilibrium/monorepo/types/store" +) + +func TestFrameChainChecker_CanProcessSequentialChain(t *testing.T) { + store := newMockFrameChainStore() + checker := NewFrameChainChecker(store, zap.NewNop()) + + finalized := newTestFrame(10, nil) + store.addSealed(finalized) + finalizedState := &models.State[*protobufs.GlobalFrame]{ + Rank: finalized.Header.Rank, + Identifier: finalized.Identity(), + State: &finalized, + } + + candidate := newTestFrame( + 11, + []byte(finalized.Identity()), + ) + store.addCandidate(candidate) + + proposalFrame := newTestFrame( + 12, + []byte(candidate.Identity()), + ) + proposal := &protobufs.GlobalProposal{ + State: 
proposalFrame, + } + + require.True( + t, + checker.CanProcessSequentialChain(finalizedState, proposal), + ) +} + +func TestFrameChainChecker_CanProcessSequentialChainMultipleCandidates( + t *testing.T, +) { + store := newMockFrameChainStore() + checker := NewFrameChainChecker(store, zap.NewNop()) + + finalized := newTestFrame(20, nil) + store.addSealed(finalized) + finalizedState := &models.State[*protobufs.GlobalFrame]{ + Rank: finalized.Header.Rank, + Identifier: finalized.Identity(), + State: &finalized, + } + + candidate21 := newTestFrame( + 21, + []byte(finalized.Identity()), + ) + store.addCandidate(candidate21) + + candidate22 := newTestFrame( + 22, + []byte(candidate21.Identity()), + ) + store.addCandidate(candidate22) + + proposal := &protobufs.GlobalProposal{ + State: newTestFrame( + 23, + []byte(candidate22.Identity()), + ), + } + + require.True( + t, + checker.CanProcessSequentialChain(finalizedState, proposal), + ) +} + +func TestFrameChainChecker_CanProcessSequentialChainMissingParent( + t *testing.T, +) { + store := newMockFrameChainStore() + checker := NewFrameChainChecker(store, zap.NewNop()) + + finalized := newTestFrame(5, nil) + store.addSealed(finalized) + finalizedState := &models.State[*protobufs.GlobalFrame]{ + Rank: finalized.Header.Rank, + Identifier: finalized.Identity(), + State: &finalized, + } + + // Proposal references a parent that does not exist + proposal := &protobufs.GlobalProposal{ + State: newTestFrame( + 6, + []byte("missing-parent"), + ), + } + + require.False( + t, + checker.CanProcessSequentialChain(finalizedState, proposal), + ) +} + +type mockFrameChainStore struct { + sealed map[uint64]*protobufs.GlobalFrame + candidates map[uint64]map[string]*protobufs.GlobalFrame +} + +func newMockFrameChainStore() *mockFrameChainStore { + return &mockFrameChainStore{ + sealed: make(map[uint64]*protobufs.GlobalFrame), + candidates: make(map[uint64]map[string]*protobufs.GlobalFrame), + } +} + +func (m *mockFrameChainStore) addSealed(frame *protobufs.GlobalFrame) { + if frame == nil || frame.Header == nil { + return + } + m.sealed[frame.Header.FrameNumber] = frame +} + +func (m *mockFrameChainStore) addCandidate(frame *protobufs.GlobalFrame) { + if frame == nil || frame.Header == nil { + return + } + key := frame.Header.FrameNumber + if _, ok := m.candidates[key]; !ok { + m.candidates[key] = make(map[string]*protobufs.GlobalFrame) + } + m.candidates[key][string(frame.Identity())] = frame +} + +func (m *mockFrameChainStore) GetGlobalClockFrame( + frameNumber uint64, +) (*protobufs.GlobalFrame, error) { + frame, ok := m.sealed[frameNumber] + if !ok { + return nil, store.ErrNotFound + } + return frame, nil +} + +func (m *mockFrameChainStore) GetGlobalClockFrameCandidate( + frameNumber uint64, + selector []byte, +) (*protobufs.GlobalFrame, error) { + candidates := m.candidates[frameNumber] + if candidates == nil { + return nil, store.ErrNotFound + } + frame, ok := candidates[string(selector)] + if !ok { + return nil, store.ErrNotFound + } + return frame, nil +} + +func newTestFrame( + frameNumber uint64, + parentSelector []byte, +) *protobufs.GlobalFrame { + header := &protobufs.GlobalFrameHeader{ + FrameNumber: frameNumber, + ParentSelector: parentSelector, + Output: []byte{byte(frameNumber)}, + Rank: frameNumber, + } + return &protobufs.GlobalFrame{ + Header: header, + } +} diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index 3d0e8ca..bf80100 100644 --- 
a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -96,6 +96,18 @@ type LockedTransaction struct { Filled bool } +func contextFromShutdownSignal(sig <-chan struct{}) context.Context { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + if sig == nil { + return + } + <-sig + cancel() + }() + return ctx +} + // GlobalConsensusEngine uses the generic state machine for consensus type GlobalConsensusEngine struct { *lifecycle.ComponentManager @@ -133,6 +145,7 @@ type GlobalConsensusEngine struct { peerInfoManager tp2p.PeerInfoManager workerManager worker.WorkerManager proposer *provers.Manager + frameChainChecker *FrameChainChecker currentRank uint64 alertPublicKey []byte hasSentKeyBundle bool @@ -319,6 +332,7 @@ func NewGlobalConsensusEngine( appShardCache: make(map[string]*appShardCacheEntry), globalMessageSpillover: make(map[uint64][][]byte), } + engine.frameChainChecker = NewFrameChainChecker(clockStore, logger) if err := engine.initGlobalMessageAggregator(); err != nil { return nil, err @@ -934,6 +948,13 @@ func NewGlobalConsensusEngine( } engine.ComponentManager = componentBuilder.Build() + if hgWithShutdown, ok := engine.hyperSync.(interface { + SetShutdownContext(context.Context) + }); ok { + hgWithShutdown.SetShutdownContext( + contextFromShutdownSignal(engine.ShutdownSignal()), + ) + } return engine, nil } @@ -1595,8 +1616,8 @@ func (e *GlobalConsensusEngine) materialize( return errors.Wrap(err, "materialize") } - if e.verifyProverRoot(frameNumber, expectedProverRoot, proposer) { - if !e.config.Engine.ArchiveMode || e.config.P2P.Network == 99 { + if !e.config.Engine.ArchiveMode || e.config.P2P.Network == 99 { + if e.verifyProverRoot(frameNumber, expectedProverRoot, proposer) { e.reconcileLocalWorkerAllocations() } } @@ -3217,7 +3238,7 @@ func (e *GlobalConsensusEngine) DecideWorkerJoins( msg, ) if err != nil { - e.logger.Error("could not construct join", zap.Error(err)) + e.logger.Error("could not construct join decisions", zap.Error(err)) return errors.Wrap(err, "decide worker joins") } @@ -3538,7 +3559,7 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange( newRank uint64, qc models.QuorumCertificate, ) { - e.logger.Debug("adding certified state", zap.Uint64("rank", newRank-1)) + e.logger.Debug("processing certified state", zap.Uint64("rank", newRank-1)) parentQC, err := e.clockStore.GetLatestQuorumCertificate(nil) if err != nil { @@ -3613,6 +3634,20 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange( return } + cloned := frame.Clone().(*protobufs.GlobalFrame) + cloned.Header.PublicKeySignatureBls48581 = + &protobufs.BLS48581AggregateSignature{ + Signature: qc.GetAggregatedSignature().GetSignature(), + PublicKey: &protobufs.BLS48581G2PublicKey{ + KeyValue: qc.GetAggregatedSignature().GetPubKey(), + }, + Bitmask: qc.GetAggregatedSignature().GetBitmask(), + } + frameBytes, err := cloned.ToCanonicalBytes() + if err == nil { + e.pubsub.PublishToBitmask(GLOBAL_FRAME_BITMASK, frameBytes) + } + if !bytes.Equal(frame.Header.ParentSelector, parentQC.Selector) { e.logger.Error( "quorum certificate does not match frame parent", @@ -3650,73 +3685,14 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange( frame.Header.PublicKeySignatureBls48581 = aggregateSig - latest, err := e.clockStore.GetLatestGlobalClockFrame() - if err != nil { - e.logger.Error("could not obtain latest frame", zap.Error(err)) - return + proposal := &protobufs.GlobalProposal{ + 
State: frame, + ParentQuorumCertificate: parentQC, + PriorRankTimeoutCertificate: priorRankTC, + Vote: vote, } - if latest.Header.FrameNumber+1 != frame.Header.FrameNumber || - !bytes.Equal([]byte(latest.Identity()), frame.Header.ParentSelector) { - e.logger.Debug( - "not next frame, cannot advance", - zap.Uint64("latest_frame_number", latest.Header.FrameNumber), - zap.Uint64("new_frame_number", frame.Header.FrameNumber), - zap.String( - "latest_frame_selector", - hex.EncodeToString([]byte(latest.Identity())), - ), - zap.String( - "new_frame_number", - hex.EncodeToString(frame.Header.ParentSelector), - ), - ) - return - } - - txn, err = e.clockStore.NewTransaction(false) - if err != nil { - e.logger.Error("could not create transaction", zap.Error(err)) - return - } - - if err := e.materialize( - txn, - frame, - ); err != nil { - _ = txn.Abort() - e.logger.Error("could not materialize frame requests", zap.Error(err)) - return - } - if err := e.clockStore.PutGlobalClockFrame(frame, txn); err != nil { - _ = txn.Abort() - e.logger.Error("could not put global frame", zap.Error(err)) - return - } - - if err := e.clockStore.PutCertifiedGlobalState( - &protobufs.GlobalProposal{ - State: frame, - ParentQuorumCertificate: parentQC, - PriorRankTimeoutCertificate: priorRankTC, - Vote: vote, - }, - txn, - ); err != nil { - e.logger.Error("could not insert certified state", zap.Error(err)) - txn.Abort() - return - } - - if err := txn.Commit(); err != nil { - e.logger.Error("could not commit transaction", zap.Error(err)) - txn.Abort() - } - - if err := e.checkShardCoverage(frame.GetFrameNumber()); err != nil { - e.logger.Error("could not check shard coverage", zap.Error(err)) - return - } + e.globalProposalQueue <- proposal } // OnRankChange implements consensus.Consumer. 
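Both getPendingProposals implementations touched by this patch (the app engine earlier, and the global engine in the hunk that follows) replace continue with break when a rank is missing its quorum certificate or its frame. A minimal sketch of the apparent reasoning, with illustrative names only: pending proposals are collected as a parent-linked chain by rank, so once one rank cannot be resolved, later ranks can no longer attach to the collected parent, and collection should stop at the first gap instead of skipping past it.

package sketch

// collectContiguous gathers per-rank entries for a pending-proposal list.
// A lookup miss ends collection: entries past the gap could not reference the
// parent assembled so far, so skipping the gap (continue) would produce a
// broken chain, while stopping (break) returns the longest usable prefix.
func collectContiguous(
	lookup func(rank uint64) (string, bool),
	from, to uint64,
) []string {
	var chain []string
	for rank := from; rank <= to; rank++ {
		entry, ok := lookup(rank)
		if !ok {
			break // first gap: stop rather than continue past it
		}
		chain = append(chain, entry)
	}
	return chain
}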
@@ -4050,7 +4026,7 @@ func (e *GlobalConsensusEngine) getPendingProposals( nextQC, err := e.clockStore.GetQuorumCertificate(nil, rank) if err != nil { e.logger.Debug("no qc for rank", zap.Error(err)) - continue + break } value, err := e.clockStore.GetGlobalClockFrameCandidate( @@ -4059,8 +4035,7 @@ func (e *GlobalConsensusEngine) getPendingProposals( ) if err != nil { e.logger.Debug("no frame for qc", zap.Error(err)) - parent = nextQC - continue + break } var priorTCModel models.TimeoutCertificate = nil diff --git a/node/consensus/global/message_collector.go b/node/consensus/global/message_collector.go index 2c7f3c4..8fa3539 100644 --- a/node/consensus/global/message_collector.go +++ b/node/consensus/global/message_collector.go @@ -245,6 +245,8 @@ func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) { bundle.Requests = bundle.Requests[:maxGlobalMessagesPerFrame] } + e.logBundleRequestTypes(bundle) + encoded, err := bundle.ToCanonicalBytes() if err != nil { if e.logger != nil { @@ -263,6 +265,132 @@ func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) { e.messageAggregator.Add(record) } +func (e *GlobalConsensusEngine) logBundleRequestTypes( + bundle *protobufs.MessageBundle, +) { + requestTypes := make([]string, 0, len(bundle.Requests)) + detailFields := make([]zap.Field, 0) + for idx, request := range bundle.Requests { + typeName, detailField, hasDetail := requestTypeNameAndDetail(idx, request) + requestTypes = append(requestTypes, typeName) + if hasDetail { + detailFields = append(detailFields, detailField) + } + } + + fields := []zap.Field{ + zap.Int("request_count", len(bundle.Requests)), + zap.Strings("request_types", requestTypes), + zap.Int64("bundle_timestamp", bundle.Timestamp), + } + fields = append(fields, detailFields...) + + e.logger.Debug("collected global request bundle", fields...) 
+} + +func requestTypeNameAndDetail( + idx int, + req *protobufs.MessageRequest, +) (string, zap.Field, bool) { + if req == nil || req.GetRequest() == nil { + return "nil_request", zap.Field{}, false + } + + switch actual := req.GetRequest().(type) { + case *protobufs.MessageRequest_Join: + return "ProverJoin", zap.Field{}, false + case *protobufs.MessageRequest_Leave: + return "ProverLeave", zap.Field{}, false + case *protobufs.MessageRequest_Pause: + return "ProverPause", zap.Field{}, false + case *protobufs.MessageRequest_Resume: + return "ProverResume", zap.Field{}, false + case *protobufs.MessageRequest_Confirm: + return "ProverConfirm", zap.Field{}, false + case *protobufs.MessageRequest_Reject: + return "ProverReject", zap.Field{}, false + case *protobufs.MessageRequest_Kick: + return "ProverKick", zap.Field{}, false + case *protobufs.MessageRequest_Update: + return "ProverUpdate", + zap.Any(fmt.Sprintf("request_%d_prover_update", idx), actual.Update), + true + case *protobufs.MessageRequest_TokenDeploy: + return "TokenDeploy", + zap.Any(fmt.Sprintf("request_%d_token_deploy", idx), actual.TokenDeploy), + true + case *protobufs.MessageRequest_TokenUpdate: + return "TokenUpdate", + zap.Any(fmt.Sprintf("request_%d_token_update", idx), actual.TokenUpdate), + true + case *protobufs.MessageRequest_Transaction: + return "Transaction", + zap.Any(fmt.Sprintf("request_%d_transaction", idx), actual.Transaction), + true + case *protobufs.MessageRequest_PendingTransaction: + return "PendingTransaction", + zap.Any( + fmt.Sprintf("request_%d_pending_transaction", idx), + actual.PendingTransaction, + ), + true + case *protobufs.MessageRequest_MintTransaction: + return "MintTransaction", + zap.Any(fmt.Sprintf("request_%d_mint_transaction", idx), actual.MintTransaction), + true + case *protobufs.MessageRequest_HypergraphDeploy: + return "HypergraphDeploy", + zap.Any(fmt.Sprintf("request_%d_hypergraph_deploy", idx), actual.HypergraphDeploy), + true + case *protobufs.MessageRequest_HypergraphUpdate: + return "HypergraphUpdate", + zap.Any(fmt.Sprintf("request_%d_hypergraph_update", idx), actual.HypergraphUpdate), + true + case *protobufs.MessageRequest_VertexAdd: + return "VertexAdd", + zap.Any(fmt.Sprintf("request_%d_vertex_add", idx), actual.VertexAdd), + true + case *protobufs.MessageRequest_VertexRemove: + return "VertexRemove", + zap.Any(fmt.Sprintf("request_%d_vertex_remove", idx), actual.VertexRemove), + true + case *protobufs.MessageRequest_HyperedgeAdd: + return "HyperedgeAdd", + zap.Any(fmt.Sprintf("request_%d_hyperedge_add", idx), actual.HyperedgeAdd), + true + case *protobufs.MessageRequest_HyperedgeRemove: + return "HyperedgeRemove", + zap.Any(fmt.Sprintf("request_%d_hyperedge_remove", idx), actual.HyperedgeRemove), + true + case *protobufs.MessageRequest_ComputeDeploy: + return "ComputeDeploy", + zap.Any(fmt.Sprintf("request_%d_compute_deploy", idx), actual.ComputeDeploy), + true + case *protobufs.MessageRequest_ComputeUpdate: + return "ComputeUpdate", + zap.Any(fmt.Sprintf("request_%d_compute_update", idx), actual.ComputeUpdate), + true + case *protobufs.MessageRequest_CodeDeploy: + return "CodeDeploy", + zap.Any(fmt.Sprintf("request_%d_code_deploy", idx), actual.CodeDeploy), + true + case *protobufs.MessageRequest_CodeExecute: + return "CodeExecute", + zap.Any(fmt.Sprintf("request_%d_code_execute", idx), actual.CodeExecute), + true + case *protobufs.MessageRequest_CodeFinalize: + return "CodeFinalize", + zap.Any(fmt.Sprintf("request_%d_code_finalize", idx), actual.CodeFinalize), + true + 
case *protobufs.MessageRequest_Shard: + return "ShardFrame", + zap.Any(fmt.Sprintf("request_%d_shard_frame", idx), actual.Shard), + true + default: + return "unknown_request", zap.Field{}, false + } +} + func (e *GlobalConsensusEngine) getMessageCollector( rank uint64, ) (keyedaggregator.Collector[sequencedGlobalMessage], bool, error) { diff --git a/node/consensus/global/message_processors.go b/node/consensus/global/message_processors.go index f9d3015..5e0b38d 100644 --- a/node/consensus/global/message_processors.go +++ b/node/consensus/global/message_processors.go @@ -954,9 +954,14 @@ func (e *GlobalConsensusEngine) handleGlobalProposal( finalizedFrameNumber := (*finalized.State).Header.FrameNumber frameNumber := proposal.State.Header.FrameNumber - // drop proposals if we already processed them + // drop proposals if we already processed them, unless we need to + // rehydrate the finalized frame in persistence if frameNumber <= finalizedFrameNumber || proposal.State.Header.Rank <= finalizedRank { + if e.tryRecoverFinalizedFrame(proposal, finalized) { + return + } + e.logger.Debug( "dropping stale (lower than finalized) proposal", zap.Uint64("finalized_rank", finalizedRank), @@ -988,13 +993,11 @@ func (e *GlobalConsensusEngine) handleGlobalProposal( // if we have a parent, cache and move on if proposal.State.Header.FrameNumber != 0 { // also check with persistence layer - parent, err := e.clockStore.GetGlobalClockFrame( - proposal.State.Header.FrameNumber - 1, - ) - if err != nil || !bytes.Equal( - []byte(parent.Identity()), + _, err := e.clockStore.GetGlobalClockFrameCandidate( + proposal.State.Header.FrameNumber-1, proposal.State.Header.ParentSelector, - ) { + ) + if err != nil { e.logger.Debug( "parent frame not stored, requesting sync", zap.Uint64("rank", proposal.GetRank()), @@ -1022,37 +1025,68 @@ func (e *GlobalConsensusEngine) handleGlobalProposal( } } - expectedFrame, err := e.clockStore.GetLatestGlobalClockFrame() - if err != nil { - e.logger.Error("could not obtain latest global frame", zap.Error(err)) - return - } - - expectedFrameNumber := expectedFrame.Header.FrameNumber + 1 - - if frameNumber < expectedFrameNumber { - e.logger.Debug( - "dropping proposal behind expected frame", - zap.Uint64("frame_number", frameNumber), - zap.Uint64("expected_frame_number", expectedFrameNumber), - ) - return - } - - if frameNumber == expectedFrameNumber { + if e.frameChainChecker != nil && + e.frameChainChecker.CanProcessSequentialChain(finalized, proposal) { e.deleteCachedProposal(frameNumber) if e.processProposal(proposal) { e.drainProposalCache(frameNumber + 1) return } - e.logger.Debug("failed to process expected proposal, caching") + e.logger.Debug("failed to process sequential proposal, caching") e.cacheProposal(proposal) return } e.cacheProposal(proposal) - e.drainProposalCache(expectedFrameNumber) +} + +func (e *GlobalConsensusEngine) tryRecoverFinalizedFrame( + proposal *protobufs.GlobalProposal, + finalized *models.State[*protobufs.GlobalFrame], +) bool { + if proposal == nil || + proposal.State == nil || + proposal.State.Header == nil || + finalized == nil || + finalized.State == nil || + (*finalized.State).Header == nil { + return false + } + + frameNumber := proposal.State.Header.FrameNumber + finalizedFrameNumber := (*finalized.State).Header.FrameNumber + if frameNumber != finalizedFrameNumber { + return false + } + + if !bytes.Equal( + []byte(finalized.Identifier), + []byte(proposal.State.Identity()), + ) { + e.logger.Warn( + "received conflicting finalized frame during sync", 
+ zap.Uint64("finalized_frame_number", finalizedFrameNumber), + zap.String( + "expected", + hex.EncodeToString([]byte(finalized.Identifier)), + ), + zap.String( + "received", + hex.EncodeToString([]byte(proposal.State.Identity())), + ), + ) + return true + } + + e.registerPendingCertifiedParent(proposal) + + e.logger.Debug( + "cached finalized frame for descendant processing", + zap.Uint64("frame_number", frameNumber), + ) + + return true } func (e *GlobalConsensusEngine) processProposal( @@ -1272,6 +1306,19 @@ func (e *GlobalConsensusEngine) trySealParentWithChild( return } + finalized := e.forks.FinalizedState() + if finalized != nil && finalized.State != nil && + parentFrame <= (*finalized.State).Header.FrameNumber { + e.logger.Debug( + "skipping sealing for already finalized parent", + zap.Uint64("parent_frame", parentFrame), + ) + e.pendingCertifiedParentsMu.Lock() + delete(e.pendingCertifiedParents, parentFrame) + e.pendingCertifiedParentsMu.Unlock() + return + } + e.logger.Debug( "sealing parent with descendant proposal", zap.Uint64("parent_frame", parent.State.Header.FrameNumber), diff --git a/node/consensus/global/services.go b/node/consensus/global/services.go index 7a1c11b..e431572 100644 --- a/node/consensus/global/services.go +++ b/node/consensus/global/services.go @@ -96,7 +96,10 @@ func (e *GlobalConsensusEngine) GetGlobalProposal( zap.Uint64("frame_number", request.FrameNumber), zap.String("peer_id", peerID.String()), ) - frame, err := e.clockStore.GetGlobalClockFrame(request.FrameNumber) + frame, err := e.loadFrameMatchingSelector( + request.FrameNumber, + nil, + ) if err != nil { return &protobufs.GlobalProposalResponse{}, nil } @@ -110,7 +113,10 @@ func (e *GlobalConsensusEngine) GetGlobalProposal( return &protobufs.GlobalProposalResponse{}, nil } - parent, err := e.clockStore.GetGlobalClockFrame(request.FrameNumber - 1) + parent, err := e.loadFrameMatchingSelector( + request.FrameNumber-1, + frame.Header.ParentSelector, + ) if err != nil { e.logger.Debug( "received error while fetching global frame parent", @@ -147,6 +153,51 @@ func (e *GlobalConsensusEngine) GetGlobalProposal( }, nil } +func (e *GlobalConsensusEngine) loadFrameMatchingSelector( + frameNumber uint64, + expectedSelector []byte, +) (*protobufs.GlobalFrame, error) { + matchesSelector := func(frame *protobufs.GlobalFrame) bool { + if frame == nil || frame.Header == nil || len(expectedSelector) == 0 { + return true + } + return bytes.Equal([]byte(frame.Identity()), expectedSelector) + } + + frame, err := e.clockStore.GetGlobalClockFrame(frameNumber) + if err == nil && matchesSelector(frame) { + return frame, nil + } + + iter, iterErr := e.clockStore.RangeGlobalClockFrameCandidates( + frameNumber, + frameNumber, + ) + if iterErr != nil { + if err != nil { + return nil, err + } + return nil, iterErr + } + defer iter.Close() + + for ok := iter.First(); ok && iter.Valid(); ok = iter.Next() { + candidate, valErr := iter.Value() + if valErr != nil { + return nil, valErr + } + if matchesSelector(candidate) { + return candidate, nil + } + } + + if err == nil && matchesSelector(frame) { + return frame, nil + } + + return nil, store.ErrNotFound +} + func (e *GlobalConsensusEngine) GetAppShards( ctx context.Context, req *protobufs.GetAppShardsRequest, diff --git a/node/consensus/provers/proposer.go b/node/consensus/provers/proposer.go index 061e914..2212002 100644 --- a/node/consensus/provers/proposer.go +++ b/node/consensus/provers/proposer.go @@ -417,6 +417,9 @@ func (m *Manager) DecideJoins( if len(p) == 0 { 
continue } + if len(reject) > 99 { + break + } pc := make([]byte, len(p)) copy(pc, p) reject = append(reject, pc) @@ -431,6 +434,12 @@ func (m *Manager) DecideJoins( if len(p) == 0 { continue } + if len(reject) > 99 { + break + } + if len(confirm) > 99 { + break + } key := hex.EncodeToString(p) rec, ok := byHex[key] @@ -456,22 +465,25 @@ func (m *Manager) DecideJoins( } } - if availableWorkers == 0 && len(confirm) > 0 { - m.logger.Info( - "skipping confirmations due to lack of available workers", - zap.Int("pending_confirmations", len(confirm)), - ) - confirm = nil - } else if availableWorkers > 0 && len(confirm) > availableWorkers { - m.logger.Warn( - "limiting confirmations due to worker capacity", - zap.Int("pending_confirmations", len(confirm)), - zap.Int("available_workers", availableWorkers), - ) - confirm = confirm[:availableWorkers] + if len(reject) > 0 { + return m.workerMgr.DecideAllocations(reject, nil) + } else { + if availableWorkers == 0 && len(confirm) > 0 { + m.logger.Info( + "skipping confirmations due to lack of available workers", + zap.Int("pending_confirmations", len(confirm)), + ) + confirm = nil + } else if availableWorkers > 0 && len(confirm) > availableWorkers { + m.logger.Warn( + "limiting confirmations due to worker capacity", + zap.Int("pending_confirmations", len(confirm)), + zap.Int("available_workers", availableWorkers), + ) + confirm = confirm[:availableWorkers] + } + return m.workerMgr.DecideAllocations(nil, confirm) } - - return m.workerMgr.DecideAllocations(reject, confirm) } func (m *Manager) unallocatedWorkerCount() (int, error) { diff --git a/node/execution/state/hypergraph/hypergraph_state_test.go b/node/execution/state/hypergraph/hypergraph_state_test.go index 6b4372f..8b45a10 100644 --- a/node/execution/state/hypergraph/hypergraph_state_test.go +++ b/node/execution/state/hypergraph/hypergraph_state_test.go @@ -47,6 +47,7 @@ func setupTest(t *testing.T) (*hypergraph.HypergraphState, thypergraph.Hypergrap incProver, []int{}, &tests.Nopthenticator{}, + 200, ) st := hypergraph.NewHypergraphState(hg) @@ -96,6 +97,7 @@ func TestHypergraphState(t *testing.T) { incProver, []int{}, &tests.Nopthenticator{}, + 200, ) data := enc.Encrypt(make([]byte, 20), pub) verenc := data[0].Compress() diff --git a/node/rpc/hypergraph_sync_rpc_server_test.go b/node/rpc/hypergraph_sync_rpc_server_test.go index 2dc8382..1fe35f6 100644 --- a/node/rpc/hypergraph_sync_rpc_server_test.go +++ b/node/rpc/hypergraph_sync_rpc_server_test.go @@ -140,9 +140,9 @@ func TestHypergraphSyncServer(t *testing.T) { inclusionProver, ) crdts := make([]application.Hypergraph, numParties) - crdts[0] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "server")), serverHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}) - crdts[1] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "client")), clientHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}) - crdts[2] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "control")), controlHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}) + crdts[0] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "server")), serverHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200) + crdts[1] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "client")), clientHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200) + crdts[2] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "control")), controlHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200) 
servertxn, _ := serverHypergraphStore.NewTransaction(false) clienttxn, _ := clientHypergraphStore.NewTransaction(false) @@ -417,8 +417,8 @@ func TestHypergraphPartialSync(t *testing.T) { inclusionProver, ) crdts := make([]application.Hypergraph, numParties) - crdts[0] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "server")), serverHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}) - crdts[2] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "control")), controlHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}) + crdts[0] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "server")), serverHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200) + crdts[2] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "control")), controlHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200) servertxn, _ := serverHypergraphStore.NewTransaction(false) controltxn, _ := controlHypergraphStore.NewTransaction(false) @@ -452,7 +452,7 @@ func TestHypergraphPartialSync(t *testing.T) { servertxn.Commit() - crdts[1] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "client")), clientHypergraphStore, inclusionProver, toIntSlice(toUint32Slice(branchfork)), &tests.Nopthenticator{}) + crdts[1] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "client")), clientHypergraphStore, inclusionProver, toIntSlice(toUint32Slice(branchfork)), &tests.Nopthenticator{}, 200) logger.Info("saved") for _, op := range operations1 { @@ -807,6 +807,7 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) { inclusionProver, []int{}, &tests.Nopthenticator{}, + 200, ) for i := 0; i < clientCount; i++ { clientPath := filepath.Join(clientBase, fmt.Sprintf("client-%d", i)) @@ -824,6 +825,7 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) { inclusionProver, []int{}, &tests.Nopthenticator{}, + 200, ) } defer func() { diff --git a/node/store/clock.go b/node/store/clock.go index 3626210..94ac976 100644 --- a/node/store/clock.go +++ b/node/store/clock.go @@ -28,6 +28,11 @@ type PebbleGlobalClockIterator struct { db *PebbleClockStore } +type PebbleGlobalClockCandidateIterator struct { + i store.Iterator + db *PebbleClockStore +} + type PebbleClockIterator struct { filter []byte start uint64 @@ -36,6 +41,14 @@ type PebbleClockIterator struct { db *PebbleClockStore } +type PebbleStagedShardFrameIterator struct { + filter []byte + start uint64 + end uint64 + cur uint64 + db *PebbleClockStore +} + type PebbleGlobalStateIterator struct { i store.Iterator db *PebbleClockStore @@ -66,7 +79,9 @@ type PebbleTimeoutCertificateIterator struct { } var _ store.TypedIterator[*protobufs.GlobalFrame] = (*PebbleGlobalClockIterator)(nil) +var _ store.TypedIterator[*protobufs.GlobalFrame] = (*PebbleGlobalClockCandidateIterator)(nil) var _ store.TypedIterator[*protobufs.AppShardFrame] = (*PebbleClockIterator)(nil) +var _ store.TypedIterator[*protobufs.AppShardFrame] = (*PebbleStagedShardFrameIterator)(nil) var _ store.TypedIterator[*protobufs.GlobalProposal] = (*PebbleGlobalStateIterator)(nil) var _ store.TypedIterator[*protobufs.AppShardProposal] = (*PebbleAppShardStateIterator)(nil) var _ store.TypedIterator[*protobufs.QuorumCertificate] = (*PebbleQuorumCertificateIterator)(nil) @@ -165,6 +180,95 @@ func (p *PebbleGlobalClockIterator) Close() error { return errors.Wrap(p.i.Close(), "closing global clock iterator") } +func (p *PebbleGlobalClockCandidateIterator) First() bool { + return p.i.First() +} + +func (p *PebbleGlobalClockCandidateIterator) Next() 
bool { + return p.i.Next() +} + +func (p *PebbleGlobalClockCandidateIterator) Valid() bool { + return p.i.Valid() +} + +func (p *PebbleGlobalClockCandidateIterator) Value() ( + *protobufs.GlobalFrame, + error, +) { + if !p.i.Valid() { + return nil, store.ErrNotFound + } + + key := p.i.Key() + value := p.i.Value() + + frameNumber, selector, err := extractFrameNumberAndSelectorFromCandidateKey(key) + if err != nil { + return nil, errors.Wrap(err, "get candidate clock frame iterator value") + } + + header := &protobufs.GlobalFrameHeader{} + if err := proto.Unmarshal(value, header); err != nil { + return nil, errors.Wrap(err, "get candidate clock frame iterator value") + } + + frame := &protobufs.GlobalFrame{ + Header: header, + } + + var requests []*protobufs.MessageBundle + requestIndex := uint16(0) + for { + requestKey := clockGlobalFrameRequestCandidateKey( + selector, + frameNumber, + requestIndex, + ) + requestData, closer, err := p.db.db.Get(requestKey) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + break + } + return nil, errors.Wrap(err, "get candidate clock frame iterator value") + } + defer closer.Close() + + request := &protobufs.MessageBundle{} + if err := proto.Unmarshal(requestData, request); err != nil { + return nil, errors.Wrap(err, "get candidate clock frame iterator value") + } + + requests = append(requests, request) + requestIndex++ + } + + frame.Requests = requests + + return frame, nil +} + +func (p *PebbleGlobalClockCandidateIterator) TruncatedValue() ( + *protobufs.GlobalFrame, + error, +) { + if !p.i.Valid() { + return nil, store.ErrNotFound + } + + value := p.i.Value() + header := &protobufs.GlobalFrameHeader{} + if err := proto.Unmarshal(value, header); err != nil { + return nil, errors.Wrap(err, "get candidate clock frame iterator value") + } + + return &protobufs.GlobalFrame{Header: header}, nil +} + +func (p *PebbleGlobalClockCandidateIterator) Close() error { + return errors.Wrap(p.i.Close(), "closing global clock candidate iterator") +} + func (p *PebbleClockIterator) First() bool { p.cur = p.start return true @@ -208,6 +312,46 @@ func (p *PebbleClockIterator) Value() (*protobufs.AppShardFrame, error) { return frame, nil } +func (p *PebbleStagedShardFrameIterator) First() bool { + p.cur = p.start + return true +} + +func (p *PebbleStagedShardFrameIterator) Next() bool { + p.cur++ + return p.cur <= p.end +} + +func (p *PebbleStagedShardFrameIterator) Valid() bool { + return p.cur >= p.start && p.cur <= p.end +} + +func (p *PebbleStagedShardFrameIterator) Value() (*protobufs.AppShardFrame, error) { + if !p.Valid() { + return nil, store.ErrNotFound + } + + frames, err := p.db.GetStagedShardClockFramesForFrameNumber(p.filter, p.cur) + if err != nil { + return nil, errors.Wrap(err, "get staged shard clocks") + } + if len(frames) == 0 { + return nil, store.ErrNotFound + } + return frames[len(frames)-1], nil +} + +func (p *PebbleStagedShardFrameIterator) TruncatedValue() ( + *protobufs.AppShardFrame, + error, +) { + return p.Value() +} + +func (p *PebbleStagedShardFrameIterator) Close() error { + return nil +} + func (p *PebbleClockIterator) Close() error { return nil } @@ -554,6 +698,23 @@ func extractFrameNumberFromGlobalFrameKey( return binary.BigEndian.Uint64(copied[2:10]), nil } +func extractFrameNumberAndSelectorFromCandidateKey( + key []byte, +) (uint64, []byte, error) { + frameNumber, err := extractFrameNumberFromGlobalFrameKey(key) + if err != nil { + return 0, nil, err + } + if len(key) < 10 { + return 0, nil, errors.Wrap( + 
store.ErrInvalidData, + "extract selector from global frame candidate key", + ) + } + selector := slices.Clone(key[10:]) + return frameNumber, selector, nil +} + func clockShardFrameKey( filter []byte, frameNumber uint64, @@ -896,6 +1057,28 @@ func (p *PebbleClockStore) RangeGlobalClockFrames( return &PebbleGlobalClockIterator{i: iter, db: p}, nil } +// RangeGlobalClockFrameCandidates implements ClockStore. +func (p *PebbleClockStore) RangeGlobalClockFrameCandidates( + startFrameNumber uint64, + endFrameNumber uint64, +) (store.TypedIterator[*protobufs.GlobalFrame], error) { + if startFrameNumber > endFrameNumber { + temp := endFrameNumber + endFrameNumber = startFrameNumber + startFrameNumber = temp + } + + iter, err := p.db.NewIter( + clockGlobalFrameCandidateKey(startFrameNumber, nil), + clockGlobalFrameCandidateKey(endFrameNumber+1, nil), + ) + if err != nil { + return nil, errors.Wrap(err, "range global clock frame candidates") + } + + return &PebbleGlobalClockCandidateIterator{i: iter, db: p}, nil +} + // PutGlobalClockFrame implements ClockStore. func (p *PebbleClockStore) PutGlobalClockFrame( frame *protobufs.GlobalFrame, @@ -1370,6 +1553,24 @@ func (p *PebbleClockStore) RangeShardClockFrames( }, nil } +func (p *PebbleClockStore) RangeStagedShardClockFrames( + filter []byte, + startFrameNumber uint64, + endFrameNumber uint64, +) (store.TypedIterator[*protobufs.AppShardFrame], error) { + if startFrameNumber > endFrameNumber { + startFrameNumber, endFrameNumber = endFrameNumber, startFrameNumber + } + + return &PebbleStagedShardFrameIterator{ + filter: filter, + start: startFrameNumber, + end: endFrameNumber, + cur: startFrameNumber, + db: p, + }, nil +} + func (p *PebbleClockStore) SetLatestShardClockFrameNumber( filter []byte, frameNumber uint64, diff --git a/node/store/consensus.go b/node/store/consensus.go index 507c20d..98030c3 100644 --- a/node/store/consensus.go +++ b/node/store/consensus.go @@ -41,7 +41,7 @@ func (p *PebbleConsensusStore) GetConsensusState(filter []byte) ( ) if err != nil { if errors.Is(err, pebble.ErrNotFound) { - return nil, ErrNotFound + return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get consensus state") @@ -119,7 +119,7 @@ func (p *PebbleConsensusStore) GetLivenessState(filter []byte) ( ) if err != nil { if errors.Is(err, pebble.ErrNotFound) { - return nil, ErrNotFound + return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get liveness state") diff --git a/node/store/hypergraph.go b/node/store/hypergraph.go index d03f0c7..8a0fa0a 100644 --- a/node/store/hypergraph.go +++ b/node/store/hypergraph.go @@ -465,6 +465,7 @@ func (p *PebbleHypergraphStore) SetCoveredPrefix(coveredPrefix []int) error { func (p *PebbleHypergraphStore) LoadHypergraph( authenticationProvider channel.AuthenticationProvider, + maxSyncSessions int, ) ( hypergraph.Hypergraph, error, @@ -491,6 +492,7 @@ func (p *PebbleHypergraphStore) LoadHypergraph( p.prover, coveredPrefix, authenticationProvider, + maxSyncSessions, ) vertexAddsIter, err := p.db.NewIter( diff --git a/node/store/pebble.go b/node/store/pebble.go index beee923..27adc75 100644 --- a/node/store/pebble.go +++ b/node/store/pebble.go @@ -33,6 +33,15 @@ var pebbleMigrations = []func(*pebble.Batch) error{ migration_2_1_0_10, migration_2_1_0_10, migration_2_1_0_11, + migration_2_1_0_14, + migration_2_1_0_141, + migration_2_1_0_142, + migration_2_1_0_143, + migration_2_1_0_144, + migration_2_1_0_145, + migration_2_1_0_146, + migration_2_1_0_147, + migration_2_1_0_148, } func NewPebbleDB( @@ -167,8 +176,8 @@ 
func (p *PebbleDB) migrate(logger *zap.Logger) error { for i := int(storedVersion); i < len(pebbleMigrations); i++ { logger.Warn( "performing pebble store migration", - zap.Int("from_version", int(i)), - zap.Int("to_version", int(i+1)), + zap.Int("from_version", int(storedVersion)), + zap.Int("to_version", int(storedVersion+1)), ) if err := pebbleMigrations[i](batch); err != nil { batch.Close() @@ -177,8 +186,8 @@ func (p *PebbleDB) migrate(logger *zap.Logger) error { } logger.Info( "migration step completed", - zap.Int("from_version", int(i)), - zap.Int("to_version", int(i+1)), + zap.Int("from_version", int(storedVersion)), + zap.Int("to_version", int(storedVersion+1)), ) } @@ -329,92 +338,6 @@ func (t *PebbleTransaction) DeleteRange( var _ store.Transaction = (*PebbleTransaction)(nil) -type pebbleSnapshotDB struct { - snap *pebble.Snapshot -} - -func (p *pebbleSnapshotDB) Get(key []byte) ([]byte, io.Closer, error) { - return p.snap.Get(key) -} - -func (p *pebbleSnapshotDB) Set(key, value []byte) error { - return errors.New("pebble snapshot is read-only") -} - -func (p *pebbleSnapshotDB) Delete(key []byte) error { - return errors.New("pebble snapshot is read-only") -} - -func (p *pebbleSnapshotDB) NewBatch(indexed bool) store.Transaction { - return &snapshotTransaction{} -} - -func (p *pebbleSnapshotDB) NewIter(lowerBound []byte, upperBound []byte) ( - store.Iterator, - error, -) { - return p.snap.NewIter(&pebble.IterOptions{ - LowerBound: lowerBound, - UpperBound: upperBound, - }) -} - -func (p *pebbleSnapshotDB) Compact(start, end []byte, parallelize bool) error { - return errors.New("pebble snapshot is read-only") -} - -func (p *pebbleSnapshotDB) Close() error { - return p.snap.Close() -} - -func (p *pebbleSnapshotDB) DeleteRange(start, end []byte) error { - return errors.New("pebble snapshot is read-only") -} - -func (p *pebbleSnapshotDB) CompactAll() error { - return errors.New("pebble snapshot is read-only") -} - -var _ store.KVDB = (*pebbleSnapshotDB)(nil) - -type snapshotTransaction struct{} - -func (s *snapshotTransaction) Get(key []byte) ([]byte, io.Closer, error) { - return nil, nil, errors.New("pebble snapshot transaction is read-only") -} - -func (s *snapshotTransaction) Set(key []byte, value []byte) error { - return errors.New("pebble snapshot transaction is read-only") -} - -func (s *snapshotTransaction) Commit() error { - return errors.New("pebble snapshot transaction is read-only") -} - -func (s *snapshotTransaction) Delete(key []byte) error { - return errors.New("pebble snapshot transaction is read-only") -} - -func (s *snapshotTransaction) Abort() error { - return nil -} - -func (s *snapshotTransaction) NewIter( - lowerBound []byte, - upperBound []byte, -) (store.Iterator, error) { - return nil, errors.New("pebble snapshot transaction is read-only") -} - -func (s *snapshotTransaction) DeleteRange( - lowerBound []byte, - upperBound []byte, -) error { - return errors.New("pebble snapshot transaction is read-only") -} - -var _ store.Transaction = (*snapshotTransaction)(nil) - func rightAlign(data []byte, size int) []byte { l := len(data) @@ -560,3 +483,125 @@ func migration_2_1_0_10(b *pebble.Batch) error { func migration_2_1_0_11(b *pebble.Batch) error { return nil } + +func migration_2_1_0_14(b *pebble.Batch) error { + return nil +} + +func migration_2_1_0_141(b *pebble.Batch) error { + return migration_2_1_0_14(b) +} + +func migration_2_1_0_142(b *pebble.Batch) error { + return migration_2_1_0_14(b) +} + +func migration_2_1_0_143(b *pebble.Batch) error { + return 
migration_2_1_0_14(b) +} + +func migration_2_1_0_144(b *pebble.Batch) error { + return migration_2_1_0_14(b) +} + +func migration_2_1_0_145(b *pebble.Batch) error { + return migration_2_1_0_14(b) +} + +func migration_2_1_0_146(b *pebble.Batch) error { + return migration_2_1_0_14(b) +} + +func migration_2_1_0_147(b *pebble.Batch) error { + return migration_2_1_0_14(b) +} + +func migration_2_1_0_148(b *pebble.Batch) error { + return migration_2_1_0_14(b) +} + +type pebbleSnapshotDB struct { + snap *pebble.Snapshot +} + +func (p *pebbleSnapshotDB) Get(key []byte) ([]byte, io.Closer, error) { + return p.snap.Get(key) +} + +func (p *pebbleSnapshotDB) Set(key, value []byte) error { + return errors.New("pebble snapshot is read-only") +} + +func (p *pebbleSnapshotDB) Delete(key []byte) error { + return errors.New("pebble snapshot is read-only") +} + +func (p *pebbleSnapshotDB) NewBatch(indexed bool) store.Transaction { + return &snapshotTransaction{} +} + +func (p *pebbleSnapshotDB) NewIter(lowerBound []byte, upperBound []byte) ( + store.Iterator, + error, +) { + return p.snap.NewIter(&pebble.IterOptions{ + LowerBound: lowerBound, + UpperBound: upperBound, + }) +} + +func (p *pebbleSnapshotDB) Compact(start, end []byte, parallelize bool) error { + return errors.New("pebble snapshot is read-only") +} + +func (p *pebbleSnapshotDB) Close() error { + return p.snap.Close() +} + +func (p *pebbleSnapshotDB) DeleteRange(start, end []byte) error { + return errors.New("pebble snapshot is read-only") +} + +func (p *pebbleSnapshotDB) CompactAll() error { + return errors.New("pebble snapshot is read-only") +} + +var _ store.KVDB = (*pebbleSnapshotDB)(nil) + +type snapshotTransaction struct{} + +func (s *snapshotTransaction) Get(key []byte) ([]byte, io.Closer, error) { + return nil, nil, errors.New("pebble snapshot transaction is read-only") +} + +func (s *snapshotTransaction) Set(key []byte, value []byte) error { + return errors.New("pebble snapshot transaction is read-only") +} + +func (s *snapshotTransaction) Commit() error { + return errors.New("pebble snapshot transaction is read-only") +} + +func (s *snapshotTransaction) Delete(key []byte) error { + return errors.New("pebble snapshot transaction is read-only") +} + +func (s *snapshotTransaction) Abort() error { + return nil +} + +func (s *snapshotTransaction) NewIter( + lowerBound []byte, + upperBound []byte, +) (store.Iterator, error) { + return nil, errors.New("pebble snapshot transaction is read-only") +} + +func (s *snapshotTransaction) DeleteRange( + lowerBound []byte, + upperBound []byte, +) error { + return errors.New("pebble snapshot transaction is read-only") +} + +var _ store.Transaction = (*snapshotTransaction)(nil) diff --git a/node/store/token.go b/node/store/token.go index 08dd3f9..26045c1 100644 --- a/node/store/token.go +++ b/node/store/token.go @@ -117,7 +117,7 @@ func (p *PebbleTokenStore) GetCoinsForOwner( ) if err != nil { if errors.Is(err, pebble.ErrNotFound) { - err = ErrNotFound + err = store.ErrNotFound return nil, nil, nil, err } err = errors.Wrap(err, "get coins for owner") @@ -156,7 +156,7 @@ func (p *PebbleTokenStore) GetCoinByAddress( coinBytes, closer, err := p.db.Get(coinKey(address)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { - err = ErrNotFound + err = store.ErrNotFound return 0, nil, err } err = errors.Wrap(err, "get coin by address") @@ -297,7 +297,7 @@ func (p *PebbleTokenStore) GetPendingTransactionByAddress( ) if err != nil { if errors.Is(err, pebble.ErrNotFound) { - err = ErrNotFound + err = 
store.ErrNotFound return nil, err } err = errors.Wrap(err, "get pending transaction by address") @@ -325,7 +325,7 @@ func (p *PebbleTokenStore) GetPendingTransactionsForOwner( ) if err != nil { if errors.Is(err, pebble.ErrNotFound) { - err = ErrNotFound + err = store.ErrNotFound return nil, err } err = errors.Wrap(err, "get pending transactions for owner") @@ -355,7 +355,7 @@ func (p *PebbleTokenStore) GetTransactionByAddress( txnBytes, closer, err := p.db.Get(transactionKey(domain, address)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { - err = ErrNotFound + err = store.ErrNotFound return nil, err } err = errors.Wrap(err, "get transaction by address") @@ -383,7 +383,7 @@ func (p *PebbleTokenStore) GetTransactionsForOwner( ) if err != nil { if errors.Is(err, pebble.ErrNotFound) { - err = ErrNotFound + err = store.ErrNotFound return nil, err } err = errors.Wrap(err, "get transactions for owner") diff --git a/node/store/worker.go b/node/store/worker.go index 699132e..fe4dfec 100644 --- a/node/store/worker.go +++ b/node/store/worker.go @@ -11,8 +11,6 @@ import ( "source.quilibrium.com/quilibrium/monorepo/types/store" ) -var ErrNotFound = errors.New("worker not found") - var _ store.WorkerStore = (*PebbleWorkerStore)(nil) type PebbleWorkerStore struct { @@ -55,7 +53,7 @@ func (p *PebbleWorkerStore) GetWorker(coreId uint) (*store.WorkerInfo, error) { data, closer, err := p.db.Get(workerKey(coreId)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { - return nil, ErrNotFound + return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get worker") } @@ -80,7 +78,7 @@ func (p *PebbleWorkerStore) GetWorkerByFilter(filter []byte) ( data, closer, err := p.db.Get(workerByFilterKey(filter)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { - return nil, ErrNotFound + return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get worker by filter") } diff --git a/node/tests/hypergraph_convergence_test.go b/node/tests/hypergraph_convergence_test.go index 2f1a0bb..ab737e3 100644 --- a/node/tests/hypergraph_convergence_test.go +++ b/node/tests/hypergraph_convergence_test.go @@ -109,7 +109,7 @@ func TestConvergence(t *testing.T) { store0 = s } hgs := pebblestore.NewPebbleHypergraphStore(&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, s, logger, enc, incProver) - crdts[i] = hg.NewHypergraph(logger, hgs, incProver, []int{}, &Nopthenticator{}) + crdts[i] = hg.NewHypergraph(logger, hgs, incProver, []int{}, &Nopthenticator{}, 200) hgs.MarkHypergraphAsComplete() } @@ -191,7 +191,7 @@ func TestConvergence(t *testing.T) { logger, _ := zap.NewDevelopment() hgs := pebblestore.NewPebbleHypergraphStore(&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, store0, logger, enc, incProver) - compload, err := hgs.LoadHypergraph(&Nopthenticator{}) + compload, err := hgs.LoadHypergraph(&Nopthenticator{}, 200) if err != nil { t.Errorf("Could not load hg, %v", err) } diff --git a/node/tests/hypergraph_test.go b/node/tests/hypergraph_test.go index 3b8681a..6139a6b 100644 --- a/node/tests/hypergraph_test.go +++ b/node/tests/hypergraph_test.go @@ -76,6 +76,7 @@ func TestHypergraph(t *testing.T) { prover, []int{}, &Nopthenticator{}, + 200, ) data := enc.Encrypt(make([]byte, 20), pub) verenc := data[0].Compress() @@ -145,6 +146,7 @@ func TestHypergraph(t *testing.T) { prover, []int{}, &Nopthenticator{}, + 200, ) data := enc.Encrypt(make([]byte, 20), pub) verenc := data[0].Compress() @@ -210,6 +212,7 @@ func TestHypergraph(t *testing.T) { prover, []int{}, 
&Nopthenticator{}, + 200, ) data := enc.Encrypt(make([]byte, 20), pub) verenc := data[0].Compress() @@ -269,6 +272,7 @@ func TestHypergraph(t *testing.T) { prover, []int{}, &Nopthenticator{}, + 200, ) data := enc.Encrypt(make([]byte, 20), pub) verenc := data[0].Compress() @@ -325,6 +329,7 @@ func TestHypergraph(t *testing.T) { prover, []int{}, &Nopthenticator{}, + 200, ) data := enc.Encrypt(make([]byte, 20), pub) verenc := data[0].Compress() @@ -370,6 +375,7 @@ func TestHypergraph(t *testing.T) { prover, []int{}, &Nopthenticator{}, + 200, ) data := enc.Encrypt(make([]byte, 20), pub) verenc := data[0].Compress() diff --git a/node/worker/manager.go b/node/worker/manager.go index 28fb641..5151594 100644 --- a/node/worker/manager.go +++ b/node/worker/manager.go @@ -21,7 +21,6 @@ import ( "google.golang.org/grpc" "source.quilibrium.com/quilibrium/monorepo/config" "source.quilibrium.com/quilibrium/monorepo/node/p2p" - "source.quilibrium.com/quilibrium/monorepo/node/store" "source.quilibrium.com/quilibrium/monorepo/protobufs" "source.quilibrium.com/quilibrium/monorepo/types/channel" typesStore "source.quilibrium.com/quilibrium/monorepo/types/store" @@ -309,7 +308,7 @@ func (w *WorkerManager) registerWorker(info *typesStore.WorkerInfo) error { existing, err := w.store.GetWorker(info.CoreId) creating := false if err != nil { - if errors.Is(err, store.ErrNotFound) { + if errors.Is(err, typesStore.ErrNotFound) { creating = true } else { workerOperationsTotal.WithLabelValues("register", "error").Inc() @@ -408,7 +407,7 @@ func (w *WorkerManager) AllocateWorker(coreId uint, filter []byte) error { worker, err := w.store.GetWorker(coreId) if err != nil { workerOperationsTotal.WithLabelValues("allocate", "error").Inc() - if errors.Is(err, store.ErrNotFound) { + if errors.Is(err, typesStore.ErrNotFound) { return errors.Wrap( errors.New("worker not found"), "allocate worker", @@ -486,7 +485,7 @@ func (w *WorkerManager) DeallocateWorker(coreId uint) error { worker, err := w.store.GetWorker(coreId) if err != nil { workerOperationsTotal.WithLabelValues("deallocate", "error").Inc() - if errors.Is(err, store.ErrNotFound) { + if errors.Is(err, typesStore.ErrNotFound) { return errors.New("worker not found") } return errors.Wrap(err, "deallocate worker") @@ -572,7 +571,7 @@ func (w *WorkerManager) GetWorkerIdByFilter(filter []byte) (uint, error) { // Fallback to store worker, err := w.store.GetWorkerByFilter(filter) if err != nil { - if errors.Is(err, store.ErrNotFound) { + if errors.Is(err, typesStore.ErrNotFound) { return 0, errors.Wrap( errors.New("no worker found for filter"), "get worker id by filter", @@ -608,7 +607,7 @@ func (w *WorkerManager) GetFilterByWorkerId(coreId uint) ([]byte, error) { // Fallback to store worker, err := w.store.GetWorker(coreId) if err != nil { - if errors.Is(err, store.ErrNotFound) { + if errors.Is(err, typesStore.ErrNotFound) { return nil, errors.Wrap( errors.New("worker not found"), "get filter by worker id", @@ -849,7 +848,7 @@ func (w *WorkerManager) ensureWorkerRegistered( if err == nil { return nil } - if !errors.Is(err, store.ErrNotFound) { + if !errors.Is(err, typesStore.ErrNotFound) { return err } diff --git a/node/worker/manager_test.go b/node/worker/manager_test.go index 4b5038e..91d6c9a 100644 --- a/node/worker/manager_test.go +++ b/node/worker/manager_test.go @@ -19,7 +19,6 @@ import ( "source.quilibrium.com/quilibrium/monorepo/config" qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc" "source.quilibrium.com/quilibrium/monorepo/node/p2p" - 
"source.quilibrium.com/quilibrium/monorepo/node/store" "source.quilibrium.com/quilibrium/monorepo/protobufs" "source.quilibrium.com/quilibrium/monorepo/types/channel" typesStore "source.quilibrium.com/quilibrium/monorepo/types/store" @@ -63,7 +62,7 @@ func (m *mockWorkerStore) GetWorker(coreId uint) (*typesStore.WorkerInfo, error) defer m.mu.Unlock() worker, exists := m.workers[coreId] if !exists { - return nil, store.ErrNotFound + return nil, typesStore.ErrNotFound } workerCopy := *worker @@ -78,7 +77,7 @@ func (m *mockWorkerStore) GetWorkerByFilter(filter []byte) (*typesStore.WorkerIn } worker, exists := m.workersByFilter[string(filter)] if !exists { - return nil, store.ErrNotFound + return nil, typesStore.ErrNotFound } return worker, nil } @@ -106,7 +105,7 @@ func (m *mockWorkerStore) DeleteWorker(txn typesStore.Transaction, coreId uint) defer m.mu.Unlock() worker, exists := m.workers[coreId] if !exists { - return store.ErrNotFound + return typesStore.ErrNotFound } delete(m.workers, coreId) if len(worker.Filter) > 0 { diff --git a/types/hypergraph/sync.go b/types/hypergraph/sync.go index d54e616..d754a9e 100644 --- a/types/hypergraph/sync.go +++ b/types/hypergraph/sync.go @@ -10,6 +10,8 @@ type SyncController struct { globalSync atomic.Bool statusMu sync.RWMutex syncStatus map[string]*SyncInfo + maxActiveSessions int32 + activeSessions atomic.Int32 } func (s *SyncController) TryEstablishSyncSession(peerID string) bool { @@ -18,7 +20,16 @@ func (s *SyncController) TryEstablishSyncSession(peerID string) bool { } info := s.getOrCreate(peerID) - return !info.inProgress.Swap(true) + if info.inProgress.Swap(true) { + return false + } + + if !s.incrementActiveSessions() { + info.inProgress.Store(false) + return false + } + + return true } func (s *SyncController) EndSyncSession(peerID string) { @@ -31,7 +42,9 @@ func (s *SyncController) EndSyncSession(peerID string) { info := s.syncStatus[peerID] s.statusMu.RUnlock() if info != nil { - info.inProgress.Store(false) + if info.inProgress.Swap(false) { + s.decrementActiveSessions() + } } } @@ -71,8 +84,45 @@ type SyncInfo struct { inProgress atomic.Bool } -func NewSyncController() *SyncController { +func NewSyncController(maxActiveSessions int) *SyncController { + var max int32 + if maxActiveSessions > 0 { + max = int32(maxActiveSessions) + } return &SyncController{ - syncStatus: map[string]*SyncInfo{}, + syncStatus: map[string]*SyncInfo{}, + maxActiveSessions: max, + } +} + +func (s *SyncController) incrementActiveSessions() bool { + if s.maxActiveSessions <= 0 { + return true + } + + for { + current := s.activeSessions.Load() + if current >= s.maxActiveSessions { + return false + } + if s.activeSessions.CompareAndSwap(current, current+1) { + return true + } + } +} + +func (s *SyncController) decrementActiveSessions() { + if s.maxActiveSessions <= 0 { + return + } + + for { + current := s.activeSessions.Load() + if current == 0 { + return + } + if s.activeSessions.CompareAndSwap(current, current-1) { + return + } } } diff --git a/types/mocks/clock_store.go b/types/mocks/clock_store.go index 9a2f7e3..722a553 100644 --- a/types/mocks/clock_store.go +++ b/types/mocks/clock_store.go @@ -16,6 +16,36 @@ type MockClockStore struct { mock.Mock } +// RangeStagedShardClockFrames implements store.ClockStore. 
+func (m *MockClockStore) RangeStagedShardClockFrames( + filter []byte, + startFrameNumber uint64, + endFrameNumber uint64, +) (store.TypedIterator[*protobufs.AppShardFrame], error) { + args := m.Called( + filter, + startFrameNumber, + endFrameNumber, + ) + + return args.Get(0).(store.TypedIterator[*protobufs.AppShardFrame]), + args.Error(1) +} + +// RangeGlobalClockFrameCandidates implements store.ClockStore. +func (m *MockClockStore) RangeGlobalClockFrameCandidates( + startFrameNumber uint64, + endFrameNumber uint64, +) (store.TypedIterator[*protobufs.GlobalFrame], error) { + args := m.Called( + startFrameNumber, + endFrameNumber, + ) + + return args.Get(0).(store.TypedIterator[*protobufs.GlobalFrame]), + args.Error(1) +} + // GetGlobalClockFrameCandidate implements store.ClockStore. func (m *MockClockStore) GetGlobalClockFrameCandidate( frameNumber uint64, diff --git a/types/store/clock.go b/types/store/clock.go index fcf4f10..ce37008 100644 --- a/types/store/clock.go +++ b/types/store/clock.go @@ -16,6 +16,10 @@ type ClockStore interface { startFrameNumber uint64, endFrameNumber uint64, ) (TypedIterator[*protobufs.GlobalFrame], error) + RangeGlobalClockFrameCandidates( + startFrameNumber uint64, + endFrameNumber uint64, + ) (TypedIterator[*protobufs.GlobalFrame], error) PutGlobalClockFrame(frame *protobufs.GlobalFrame, txn Transaction) error PutGlobalClockFrameCandidate( frame *protobufs.GlobalFrame, @@ -107,6 +111,11 @@ type ClockStore interface { parentSelector []byte, truncate bool, ) (*protobufs.AppShardFrame, error) + RangeStagedShardClockFrames( + filter []byte, + startFrameNumber uint64, + endFrameNumber uint64, + ) (TypedIterator[*protobufs.AppShardFrame], error) GetStagedShardClockFramesForFrameNumber( filter []byte, frameNumber uint64, diff --git a/types/store/hypergraph.go b/types/store/hypergraph.go index 0d08cca..56c95af 100644 --- a/types/store/hypergraph.go +++ b/types/store/hypergraph.go @@ -23,6 +23,7 @@ type HypergraphStore interface { SetCoveredPrefix(coveredPrefix []int) error LoadHypergraph( authenticationProvider channel.AuthenticationProvider, + numSyncWorkers int, ) ( hypergraph.Hypergraph, error,
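
The bounded sync sessions above (SyncController constructed with a maxActiveSessions cap, threaded through as the new maxSyncSessions argument to NewHypergraph and as numSyncWorkers in the HypergraphStore interface) expect callers to pair TryEstablishSyncSession with EndSyncSession. The following is a minimal sketch of that pairing, not part of this patch; the guardedSync helper and the serve callback are assumed names for illustration only.

package example

import (
	"github.com/pkg/errors"

	"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
)

// guardedSync is a hypothetical wrapper: it reserves a sync session slot for
// peerKey, runs serve, and always releases the slot afterwards. If the peer is
// already syncing, or the controller's maxActiveSessions cap (e.g. the 200
// passed in the tests above) has been reached, it fails fast without running
// serve.
func guardedSync(
	sc *hypergraph.SyncController,
	peerKey string,
	serve func() error,
) error {
	if !sc.TryEstablishSyncSession(peerKey) {
		return errors.New("peer already syncing or session limit reached")
	}
	defer sc.EndSyncSession(peerKey)
	return serve()
}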
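
The new RangeStagedShardClockFrames entry point returns the same TypedIterator shape as the other clock iterators, so callers drive it with the First/Valid/Next loop used elsewhere in this change. Below is a minimal sketch of such a caller; collectStagedFrames is a hypothetical helper, not part of this patch, and a gap in the staged range surfaces as store.ErrNotFound from Value and is simply skipped here.

package example

import (
	"github.com/pkg/errors"

	"source.quilibrium.com/quilibrium/monorepo/protobufs"
	"source.quilibrium.com/quilibrium/monorepo/types/store"
)

// collectStagedFrames is a hypothetical caller of the new
// RangeStagedShardClockFrames API: it walks frame numbers start..end for the
// given filter and keeps the latest staged frame recorded at each height.
func collectStagedFrames(
	cs store.ClockStore,
	filter []byte,
	start, end uint64,
) ([]*protobufs.AppShardFrame, error) {
	iter, err := cs.RangeStagedShardClockFrames(filter, start, end)
	if err != nil {
		return nil, err
	}
	defer iter.Close()

	var frames []*protobufs.AppShardFrame
	for ok := iter.First(); ok && iter.Valid(); ok = iter.Next() {
		frame, err := iter.Value()
		if err != nil {
			if errors.Is(err, store.ErrNotFound) {
				continue // no staged frame recorded at this height
			}
			return nil, err
		}
		frames = append(frames, frame)
	}
	return frames, nil
}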