v2.1.0.14

Cassandra Heart 2025-12-03 23:53:46 -06:00
parent 3f516b04fd
commit 615e3bdcbc
GPG Key ID: 371083BFA6C240AA
36 changed files with 1483 additions and 362 deletions

View File

@ -43,7 +43,7 @@ func FormatVersion(version []byte) string {
}
func GetPatchNumber() byte {
return 0x0d
return 0x0e
}
func GetRCNumber() byte {

View File

@ -44,7 +44,7 @@ func NewParticipant[
pending []*models.SignedProposal[StateT, VoteT],
) (*eventloop.EventLoop[StateT, VoteT], error) {
cfg, err := timeout.NewConfig(
24*time.Second,
36*time.Second,
3*time.Minute,
1.2,
6,

View File

@ -1,6 +1,7 @@
package hypergraph
import (
"context"
"math/big"
"sync"
@ -53,6 +54,8 @@ type HypergraphCRDT struct {
// provides context-driven info for client identification
authenticationProvider channel.AuthenticationProvider
shutdownCtx context.Context
}
var _ hypergraph.Hypergraph = (*HypergraphCRDT)(nil)
@ -67,6 +70,7 @@ func NewHypergraph(
prover crypto.InclusionProver,
coveredPrefix []int,
authenticationProvider channel.AuthenticationProvider,
maxSyncSessions int,
) *HypergraphCRDT {
hg := &HypergraphCRDT{
logger: logger,
@ -79,7 +83,7 @@ func NewHypergraph(
prover: prover,
coveredPrefix: coveredPrefix,
authenticationProvider: authenticationProvider,
syncController: hypergraph.NewSyncController(),
syncController: hypergraph.NewSyncController(maxSyncSessions),
snapshotMgr: newSnapshotManager(logger),
}
@ -116,6 +120,29 @@ func (hg *HypergraphCRDT) cloneSetWithStore(
return set
}
func (hg *HypergraphCRDT) SetShutdownContext(ctx context.Context) {
hg.shutdownCtx = ctx
}
func (hg *HypergraphCRDT) contextWithShutdown(
parent context.Context,
) (context.Context, context.CancelFunc) {
if hg.shutdownCtx == nil {
return parent, func() {}
}
ctx, cancel := context.WithCancel(parent)
go func() {
select {
case <-ctx.Done():
case <-hg.shutdownCtx.Done():
cancel()
}
}()
return ctx, cancel
}
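For reference, contextWithShutdown above derives a context that ends when either the per-request context or the node-wide shutdown context (installed via SetShutdownContext) finishes. A standalone, runnable sketch of the same pattern, with illustrative names that are not part of this commit:

package main

import (
    "context"
    "fmt"
)

// mergeWithShutdown mirrors the shape of contextWithShutdown: the returned
// context is canceled when either the parent or the shutdown context ends.
func mergeWithShutdown(parent, shutdown context.Context) (context.Context, context.CancelFunc) {
    ctx, cancel := context.WithCancel(parent)
    go func() {
        select {
        case <-ctx.Done():
        case <-shutdown.Done():
            cancel()
        }
    }()
    return ctx, cancel
}

func main() {
    shutdown, stop := context.WithCancel(context.Background())
    ctx, cancel := mergeWithShutdown(context.Background(), shutdown)
    defer cancel()
    stop() // simulate node shutdown
    <-ctx.Done()
    fmt.Println(ctx.Err()) // context.Canceled
}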
func (hg *HypergraphCRDT) snapshotSet(
shardKey tries.ShardKey,
targetStore tries.TreeBackingStore,

View File

@ -6,6 +6,7 @@ import (
"encoding/hex"
"fmt"
"io"
"net"
"slices"
"strings"
"sync"
@ -13,6 +14,7 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc/peer"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
@ -22,12 +24,20 @@ import (
func (hg *HypergraphCRDT) HyperStream(
stream protobufs.HypergraphComparisonService_HyperStreamServer,
) error {
peerId, err := hg.authenticationProvider.Identify(stream.Context())
requestCtx := stream.Context()
ctx, shutdownCancel := hg.contextWithShutdown(requestCtx)
defer shutdownCancel()
peerId, err := hg.authenticationProvider.Identify(requestCtx)
if err != nil {
return errors.Wrap(err, "hyper stream")
}
peerKey := peerId.String()
sessionLogger := hg.logger.With(zap.String("peer_id", peerId.String()))
if addr := peerIPFromContext(requestCtx); addr != "" {
sessionLogger = sessionLogger.With(zap.String("peer_ip", addr))
}
if !hg.syncController.TryEstablishSyncSession(peerKey) {
return errors.New("peer already syncing")
}
@ -43,17 +53,17 @@ func (hg *HypergraphCRDT) HyperStream(
root := handle.Root()
if len(root) != 0 {
hg.logger.Debug(
sessionLogger.Debug(
"acquired snapshot",
zap.String("root", hex.EncodeToString(root)),
)
} else {
hg.logger.Debug("acquired snapshot", zap.String("root", ""))
sessionLogger.Debug("acquired snapshot", zap.String("root", ""))
}
snapshotStore := handle.Store()
err = hg.syncTreeServer(stream, snapshotStore, root)
err = hg.syncTreeServer(ctx, stream, snapshotStore, root, sessionLogger)
hg.syncController.SetStatus(peerKey, &hypergraph.SyncInfo{
Unreachable: false,
@ -208,9 +218,6 @@ func (hg *HypergraphCRDT) Sync(
}
}()
wg := sync.WaitGroup{}
wg.Add(1)
manager := &streamManager{
ctx: ctx,
cancel: cancel,
@ -222,6 +229,8 @@ func (hg *HypergraphCRDT) Sync(
lastSent: time.Now(),
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := manager.walk(
@ -481,11 +490,6 @@ func (s *streamManager) sendLeafData(
},
}
// s.logger.Info(
// "sending leaf data",
// zap.String("key", hex.EncodeToString(leaf.Key)),
// )
select {
case <-s.ctx.Done():
return s.ctx.Err()
@ -526,7 +530,6 @@ func (s *streamManager) sendLeafData(
}
if node == nil {
// s.logger.Info("no node, sending 0 leaves")
if err := s.stream.Send(&protobufs.HypergraphComparison{
Payload: &protobufs.HypergraphComparison_Metadata{
Metadata: &protobufs.HypersyncMetadata{Leaves: 0},
@ -552,7 +555,6 @@ func (s *streamManager) sendLeafData(
}
count++
}
// s.logger.Info("sending set of leaves", zap.Uint64("leaf_count", count))
if err := s.stream.Send(&protobufs.HypergraphComparison{
Payload: &protobufs.HypergraphComparison_Metadata{
Metadata: &protobufs.HypersyncMetadata{Leaves: count},
@ -571,7 +573,6 @@ func (s *streamManager) sendLeafData(
}
} else {
count = 1
// s.logger.Info("sending one leaf", zap.Uint64("leaf_count", count))
if err := s.stream.Send(&protobufs.HypergraphComparison{
Payload: &protobufs.HypergraphComparison_Metadata{
Metadata: &protobufs.HypersyncMetadata{Leaves: count},
@ -738,7 +739,7 @@ func getBranchInfoFromTree(
if branch, ok := node.(*tries.LazyVectorCommitmentBranchNode); ok {
branchInfo.Commitment = branch.Commitment
if len(branch.Commitment) == 0 {
panic("branch cannot have no commitment")
return nil, errors.New("invalid commitment")
}
for _, p := range branch.Prefix {
@ -771,7 +772,7 @@ func getBranchInfoFromTree(
}
if len(childCommit) == 0 {
panic("cannot have non-committed child")
return nil, errors.New("invalid commitment")
}
branchInfo.Children = append(
branchInfo.Children,
@ -785,7 +786,7 @@ func getBranchInfoFromTree(
} else if leaf, ok := node.(*tries.LazyVectorCommitmentLeafNode); ok {
branchInfo.Commitment = leaf.Commitment
if len(branchInfo.Commitment) == 0 {
panic("leaf cannot have no commitment")
return nil, errors.New("invalid commitment")
}
}
return branchInfo, nil
@ -840,16 +841,14 @@ func isLeaf(info *protobufs.HypergraphComparisonResponse) bool {
return len(info.Children) == 0
}
func queryNext(
ctx context.Context,
func (s *streamManager) queryNext(
incomingResponses <-chan *protobufs.HypergraphComparisonResponse,
stream hypergraph.HyperStream,
path []int32,
) (
*protobufs.HypergraphComparisonResponse,
error,
) {
if err := stream.Send(&protobufs.HypergraphComparison{
if err := s.stream.Send(&protobufs.HypergraphComparison{
Payload: &protobufs.HypergraphComparison_Query{
Query: &protobufs.HypergraphComparisonQuery{
Path: path,
@ -861,7 +860,7 @@ func queryNext(
}
select {
case <-ctx.Done():
case <-s.ctx.Done():
return nil, errors.Wrap(
errors.New("context canceled"),
"handle query",
@ -883,12 +882,11 @@ func queryNext(
}
func (s *streamManager) handleLeafData(
ctx context.Context,
incomingLeaves <-chan *protobufs.HypergraphComparison,
) error {
expectedLeaves := uint64(0)
select {
case <-ctx.Done():
case <-s.ctx.Done():
return errors.Wrap(
errors.New("context canceled"),
"handle leaf data",
@ -940,7 +938,7 @@ func (s *streamManager) handleLeafData(
}
}
select {
case <-ctx.Done():
case <-s.ctx.Done():
return errors.Wrap(
errors.New("context canceled"),
"handle leaf data",
@ -1032,19 +1030,15 @@ func (s *streamManager) handleLeafData(
return nil
}
func handleQueryNext(
logger *zap.Logger,
ctx context.Context,
func (s *streamManager) handleQueryNext(
incomingQueries <-chan *protobufs.HypergraphComparisonQuery,
stream hypergraph.HyperStream,
localTree *tries.LazyVectorCommitmentTree,
path []int32,
) (
*protobufs.HypergraphComparisonResponse,
error,
) {
select {
case <-ctx.Done():
case <-s.ctx.Done():
return nil, errors.Wrap(
errors.New("context canceled"),
"handle query next",
@ -1064,7 +1058,7 @@ func handleQueryNext(
)
}
branchInfo, err := getBranchInfoFromTree(logger, localTree, path)
branchInfo, err := getBranchInfoFromTree(s.logger, s.localTree, path)
if err != nil {
return nil, errors.Wrap(err, "handle query next")
}
@ -1075,7 +1069,7 @@ func handleQueryNext(
},
}
if err := stream.Send(resp); err != nil {
if err := s.stream.Send(resp); err != nil {
return nil, errors.Wrap(err, "handle query next")
}
@ -1088,19 +1082,15 @@ func handleQueryNext(
}
}
func descendIndex(
logger *zap.Logger,
ctx context.Context,
func (s *streamManager) descendIndex(
incomingResponses <-chan *protobufs.HypergraphComparisonResponse,
stream hypergraph.HyperStream,
localTree *tries.LazyVectorCommitmentTree,
path []int32,
) (
*protobufs.HypergraphComparisonResponse,
*protobufs.HypergraphComparisonResponse,
error,
) {
branchInfo, err := getBranchInfoFromTree(logger, localTree, path)
branchInfo, err := getBranchInfoFromTree(s.logger, s.localTree, path)
if err != nil {
return nil, nil, errors.Wrap(err, "descend index")
}
@ -1111,12 +1101,12 @@ func descendIndex(
},
}
if err := stream.Send(resp); err != nil {
if err := s.stream.Send(resp); err != nil {
return nil, nil, errors.Wrap(err, "descend index")
}
select {
case <-ctx.Done():
case <-s.ctx.Done():
return nil, nil, errors.Wrap(
errors.New("context canceled"),
"handle query next",
@ -1172,7 +1162,7 @@ func (s *streamManager) walk(
default:
}
pathString := zap.String("path", hex.EncodeToString(packPath(path)))
// pathString := zap.String("path", hex.EncodeToString(packPath(path)))
if bytes.Equal(lnode.Commitment, rnode.Commitment) {
// s.logger.Debug(
@ -1196,7 +1186,7 @@ func (s *streamManager) walk(
)
return errors.Wrap(err, "walk")
} else {
err := s.handleLeafData(s.ctx, incomingLeaves)
err := s.handleLeafData(incomingLeaves)
return errors.Wrap(err, "walk")
}
}
@ -1222,10 +1212,8 @@ func (s *streamManager) walk(
// s.logger.Debug("sending query")
traversePath = append(traversePath, child.Index)
var err error
rtrav, err = queryNext(
s.ctx,
rtrav, err = s.queryNext(
incomingResponses,
s.stream,
traversePath,
)
if err != nil {
@ -1246,7 +1234,7 @@ func (s *streamManager) walk(
)
return errors.Wrap(err, "walk")
} else {
err := s.handleLeafData(s.ctx, incomingLeaves)
err := s.handleLeafData(incomingLeaves)
return errors.Wrap(err, "walk")
}
}
@ -1276,12 +1264,8 @@ func (s *streamManager) walk(
traversedPath = append(traversedPath, nibble)
var err error
// s.logger.Debug("expecting query")
ltrav, err = handleQueryNext(
s.logger,
s.ctx,
ltrav, err = s.handleQueryNext(
incomingQueries,
s.stream,
s.localTree,
traversedPath,
)
if err != nil {
@ -1298,7 +1282,7 @@ func (s *streamManager) walk(
)
return errors.Wrap(err, "walk")
} else {
err := s.handleLeafData(s.ctx, incomingLeaves)
err := s.handleLeafData(incomingLeaves)
return errors.Wrap(err, "walk")
}
}
@ -1322,7 +1306,7 @@ func (s *streamManager) walk(
return errors.Wrap(err, "walk")
}
} else {
err := s.handleLeafData(s.ctx, incomingLeaves)
err := s.handleLeafData(incomingLeaves)
if err != nil {
return errors.Wrap(err, "walk")
}
@ -1367,7 +1351,7 @@ func (s *streamManager) walk(
}
if (lchild != nil && rchild == nil) ||
(lchild == nil && rchild != nil) {
s.logger.Info("branch divergence", pathString)
// s.logger.Info("branch divergence", pathString)
if lchild != nil {
nextPath := append(
append([]int32{}, lpref...),
@ -1384,7 +1368,7 @@ func (s *streamManager) walk(
}
if rchild != nil {
if !isServer {
err := s.handleLeafData(s.ctx, incomingLeaves)
err := s.handleLeafData(incomingLeaves)
if err != nil {
return errors.Wrap(err, "walk")
}
@ -1396,12 +1380,8 @@ func (s *streamManager) walk(
append([]int32{}, lpref...),
lchild.Index,
)
lc, rc, err := descendIndex(
s.logger,
s.ctx,
lc, rc, err := s.descendIndex(
incomingResponses,
s.stream,
s.localTree,
nextPath,
)
if err != nil {
@ -1414,7 +1394,7 @@ func (s *streamManager) walk(
return errors.Wrap(err, "walk")
}
} else {
err := s.handleLeafData(s.ctx, incomingLeaves)
err := s.handleLeafData(incomingLeaves)
if err != nil {
return errors.Wrap(err, "walk")
}
@ -1447,7 +1427,7 @@ func (s *streamManager) walk(
return errors.Wrap(err, "walk")
}
} else {
err := s.handleLeafData(s.ctx, incomingLeaves)
err := s.handleLeafData(incomingLeaves)
if err != nil {
return errors.Wrap(err, "walk")
}
@ -1462,17 +1442,23 @@ func (s *streamManager) walk(
// server side. It sends the local root info, then processes incoming messages,
// and queues further queries as differences are detected.
func (hg *HypergraphCRDT) syncTreeServer(
ctx context.Context,
stream protobufs.HypergraphComparisonService_HyperStreamServer,
snapshotStore tries.TreeBackingStore,
snapshotRoot []byte,
sessionLogger *zap.Logger,
) error {
logger := sessionLogger
if logger == nil {
logger = hg.logger
}
if len(snapshotRoot) != 0 {
hg.logger.Info(
logger.Info(
"syncing with snapshot",
zap.String("root", hex.EncodeToString(snapshotRoot)),
)
} else {
hg.logger.Info("syncing with snapshot", zap.String("root", ""))
logger.Info("syncing with snapshot", zap.String("root", ""))
}
msg, err := stream.Recv()
@ -1484,7 +1470,7 @@ func (hg *HypergraphCRDT) syncTreeServer(
return errors.New("client did not send valid initialization message")
}
hg.logger.Info("received initialization message")
logger.Info("received initialization message")
if len(query.ShardKey) != 35 {
return errors.New("invalid shard key")
@ -1501,7 +1487,7 @@ func (hg *HypergraphCRDT) syncTreeServer(
}
branchInfo, err := getBranchInfoFromTree(
hg.logger,
logger,
idSet.GetTree(),
query.Path,
)
@ -1537,7 +1523,7 @@ func (hg *HypergraphCRDT) syncTreeServer(
)
}
ctx, cancel := context.WithCancel(stream.Context())
ctx, cancel := context.WithCancel(ctx)
incomingQueriesIn, incomingQueriesOut :=
UnboundedChan[*protobufs.HypergraphComparisonQuery](
@ -1559,14 +1545,14 @@ func (hg *HypergraphCRDT) syncTreeServer(
for {
msg, err := stream.Recv()
if err == io.EOF {
hg.logger.Info("server stream recv eof")
logger.Info("server stream recv eof")
cancel()
close(incomingQueriesIn)
close(incomingResponsesIn)
return
}
if err != nil {
hg.logger.Info("server stream recv error", zap.Error(err))
logger.Info("server stream recv error", zap.Error(err))
cancel()
close(incomingQueriesIn)
close(incomingResponsesIn)
@ -1577,7 +1563,7 @@ func (hg *HypergraphCRDT) syncTreeServer(
}
switch m := msg.Payload.(type) {
case *protobufs.HypergraphComparison_LeafData:
hg.logger.Warn("received leaf from client, terminating")
logger.Warn("received leaf from client, terminating")
cancel()
close(incomingQueriesIn)
close(incomingResponsesIn)
@ -1592,18 +1578,17 @@ func (hg *HypergraphCRDT) syncTreeServer(
}
}()
wg := sync.WaitGroup{}
wg.Add(1)
manager := &streamManager{
ctx: ctx,
cancel: cancel,
logger: hg.logger,
logger: logger,
stream: stream,
hypergraphStore: snapshotStore,
localTree: idSet.GetTree(),
lastSent: time.Now(),
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := manager.walk(
@ -1617,7 +1602,7 @@ func (hg *HypergraphCRDT) syncTreeServer(
true,
)
if err != nil {
hg.logger.Error("error while syncing", zap.Error(err))
logger.Error("error while syncing", zap.Error(err))
}
}()
@ -1657,3 +1642,18 @@ func UnboundedChan[T any](
}()
return in, out
}
func peerIPFromContext(ctx context.Context) string {
if ctx == nil {
return ""
}
p, ok := peer.FromContext(ctx)
if !ok || p.Addr == nil {
return ""
}
host, _, err := net.SplitHostPort(p.Addr.String())
if err != nil {
return p.Addr.String()
}
return host
}
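For logging purposes, peerIPFromContext strips the port from the gRPC peer address: for example, an address of "192.0.2.10:8443" is logged as "192.0.2.10", while an address that net.SplitHostPort cannot parse (no host:port form) falls back to being logged unchanged.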

View File

@ -149,8 +149,13 @@ var engineSet = wire.NewSet(
func provideHypergraph(
store *store.PebbleHypergraphStore,
config *config.Config,
) (thypergraph.Hypergraph, error) {
return store.LoadHypergraph(&tests.Nopthenticator{})
workers := 1
if config.Engine.ArchiveMode {
workers = 100
}
return store.LoadHypergraph(&tests.Nopthenticator{}, workers)
}
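provideHypergraph above now sizes a sync budget from configuration: archive nodes allow 100 concurrent hypergraph sync sessions, other nodes allow 1, and the value is passed into LoadHypergraph, which presumably forwards it to the maxSyncSessions parameter of NewHypergraph and hypergraph.NewSyncController shown earlier in this commit. The SyncController itself is outside this diff; a minimal sketch of the limiting idea it implies, with illustrative names only:

package hypergraph

import "sync"

// sessionLimiter is a hypothetical stand-in for the session accounting behind
// NewSyncController(maxSyncSessions): one session per peer, capped globally.
type sessionLimiter struct {
    mu     sync.Mutex
    active map[string]bool
    max    int
}

func newSessionLimiter(max int) *sessionLimiter {
    return &sessionLimiter{active: map[string]bool{}, max: max}
}

// TryEstablish reports whether a new sync session may start for peer; it
// refuses a duplicate session for the same peer and anything over the cap.
func (l *sessionLimiter) TryEstablish(peer string) bool {
    l.mu.Lock()
    defer l.mu.Unlock()
    if l.active[peer] || len(l.active) >= l.max {
        return false
    }
    l.active[peer] = true
    return true
}

// Release frees the peer's slot once its stream ends.
func (l *sessionLimiter) Release(peer string) {
    l.mu.Lock()
    defer l.mu.Unlock()
    delete(l.active, peer)
}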
var hypergraphSet = wire.NewSet(

View File

@ -91,7 +91,7 @@ func NewDataWorkerNodeWithProxyPubsub(logger *zap.Logger, config2 *config.Config
mpCitHVerifiableEncryptor := newVerifiableEncryptor()
kzgInclusionProver := bls48581.NewKZGInclusionProver(logger)
pebbleHypergraphStore := store2.NewPebbleHypergraphStore(dbConfig, pebbleDB, logger, mpCitHVerifiableEncryptor, kzgInclusionProver)
hypergraph, err := provideHypergraph(pebbleHypergraphStore)
hypergraph, err := provideHypergraph(pebbleHypergraphStore, config2)
if err != nil {
return nil, err
}
@ -151,7 +151,7 @@ func NewDataWorkerNodeWithoutProxyPubsub(logger *zap.Logger, config2 *config.Con
mpCitHVerifiableEncryptor := newVerifiableEncryptor()
kzgInclusionProver := bls48581.NewKZGInclusionProver(logger)
pebbleHypergraphStore := store2.NewPebbleHypergraphStore(dbConfig, pebbleDB, logger, mpCitHVerifiableEncryptor, kzgInclusionProver)
hypergraph, err := provideHypergraph(pebbleHypergraphStore)
hypergraph, err := provideHypergraph(pebbleHypergraphStore, config2)
if err != nil {
return nil, err
}
@ -205,7 +205,7 @@ func NewMasterNode(logger *zap.Logger, config2 *config.Config, coreId uint) (*Ma
mpCitHVerifiableEncryptor := newVerifiableEncryptor()
kzgInclusionProver := bls48581.NewKZGInclusionProver(logger)
pebbleHypergraphStore := store2.NewPebbleHypergraphStore(dbConfig, pebbleDB, logger, mpCitHVerifiableEncryptor, kzgInclusionProver)
hypergraph, err := provideHypergraph(pebbleHypergraphStore)
hypergraph, err := provideHypergraph(pebbleHypergraphStore, config2)
if err != nil {
return nil, err
}
@ -301,9 +301,13 @@ var engineSet = wire.NewSet(vdf.NewCachedWesolowskiFrameProver, bls48581.NewKZGI
),
)
func provideHypergraph(store3 *store2.PebbleHypergraphStore,
func provideHypergraph(store3 *store2.PebbleHypergraphStore, config *config.Config,
) (hypergraph.Hypergraph, error) {
return store3.LoadHypergraph(&tests.Nopthenticator{})
workers := 1
if config.Engine.ArchiveMode {
workers = 100
}
return store3.LoadHypergraph(&tests.Nopthenticator{}, workers)
}
var hypergraphSet = wire.NewSet(

View File

@ -132,6 +132,7 @@ type AppConsensusEngine struct {
ctx lifecycle.SignalerContext
cancel context.CancelFunc
quit chan struct{}
frameChainChecker *AppFrameChainChecker
canRunStandalone bool
blacklistMap map[string]bool
currentRank uint64
@ -264,7 +265,7 @@ func NewAppConsensusEngine(
proofCache: make(map[uint64][516]byte),
collectedMessages: []*protobufs.Message{},
provingMessages: []*protobufs.Message{},
appMessageSpillover: make(map[uint64][]*protobufs.Message),
appMessageSpillover: make(map[uint64][]*protobufs.Message),
consensusMessageQueue: make(chan *pb.Message, 1000),
proverMessageQueue: make(chan *pb.Message, 1000),
frameMessageQueue: make(chan *pb.Message, 100),
@ -279,6 +280,8 @@ func NewAppConsensusEngine(
peerAuthCache: make(map[string]time.Time),
}
engine.frameChainChecker = NewAppFrameChainChecker(clockStore, logger, appAddress)
keyId := "q-prover-key"
key, err := keyManager.GetSigningKey(keyId)
@ -743,6 +746,13 @@ func NewAppConsensusEngine(
})
engine.ComponentManager = componentBuilder.Build()
if hgWithShutdown, ok := engine.hyperSync.(interface {
SetShutdownContext(context.Context)
}); ok {
hgWithShutdown.SetShutdownContext(
contextFromShutdownSignal(engine.ShutdownSignal()),
)
}
return engine, nil
}
@ -998,6 +1008,18 @@ func (e *AppConsensusEngine) materialize(
return nil
}
func contextFromShutdownSignal(sig <-chan struct{}) context.Context {
ctx, cancel := context.WithCancel(context.Background())
go func() {
if sig == nil {
return
}
<-sig
cancel()
}()
return ctx
}
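contextFromShutdownSignal adapts the engine's shutdown channel into a context so the hypergraph's SetShutdownContext hook above can be wired up. A sketch of a test for its behavior, assuming it lives alongside the helper in the same package (the test itself is not part of this commit): a closed signal cancels the context, while a nil signal yields a context that is never canceled.

package app

import (
    "context"
    "testing"
)

func TestContextFromShutdownSignal(t *testing.T) {
    sig := make(chan struct{})
    ctx := contextFromShutdownSignal(sig)
    close(sig) // ShutdownSignal fired
    <-ctx.Done()
    if ctx.Err() != context.Canceled {
        t.Fatalf("expected context.Canceled, got %v", ctx.Err())
    }

    // A nil signal never cancels the returned context.
    never := contextFromShutdownSignal(nil)
    select {
    case <-never.Done():
        t.Fatal("context from nil signal should not be canceled")
    default:
    }
}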
func (e *AppConsensusEngine) getPeerID() PeerID {
return PeerID{ID: e.getProverAddress()}
}
@ -2699,7 +2721,7 @@ func (e *AppConsensusEngine) getPendingProposals(
nextQC, err := e.clockStore.GetQuorumCertificate(e.appAddress, rank)
if err != nil {
e.logger.Debug("no qc for rank", zap.Error(err))
continue
break
}
value, err := e.clockStore.GetStagedShardClockFrame(
@ -2710,8 +2732,7 @@ func (e *AppConsensusEngine) getPendingProposals(
)
if err != nil {
e.logger.Debug("no frame for qc", zap.Error(err))
parent = nextQC
continue
break
}
var priorTCModel models.TimeoutCertificate = nil

View File

@ -0,0 +1,144 @@
package app
import (
"bytes"
"encoding/hex"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/store"
)
type AppFrameChainReader interface {
GetShardClockFrame(
filter []byte,
frameNumber uint64,
truncate bool,
) (*protobufs.AppShardFrame, error)
GetStagedShardClockFrame(
filter []byte,
frameNumber uint64,
parentSelector []byte,
truncate bool,
) (*protobufs.AppShardFrame, error)
}
type AppFrameChainChecker struct {
store AppFrameChainReader
filter []byte
logger *zap.Logger
}
func NewAppFrameChainChecker(
store store.ClockStore,
logger *zap.Logger,
filter []byte,
) *AppFrameChainChecker {
if store == nil {
return nil
}
if logger == nil {
logger = zap.NewNop()
}
return &AppFrameChainChecker{
store: appFrameChainStoreAdapter{store: store},
filter: filter,
logger: logger,
}
}
func (c *AppFrameChainChecker) CanProcessSequentialChain(
finalized *models.State[*protobufs.AppShardFrame],
proposal *protobufs.AppShardProposal,
) bool {
if c == nil || c.store == nil || proposal == nil ||
proposal.State == nil || proposal.State.Header == nil {
return false
}
parentSelector := proposal.State.Header.ParentSelector
if len(parentSelector) == 0 {
return false
}
frameNumber := proposal.State.Header.FrameNumber
if frameNumber == 0 {
return false
}
for frameNumber > 0 && len(parentSelector) > 0 {
frameNumber--
sealed, err := c.store.GetShardClockFrame(c.filter, frameNumber, false)
if err == nil && sealed != nil &&
bytes.Equal([]byte(sealed.Identity()), parentSelector) {
c.logger.Debug(
"app frame chain linked to sealed frame",
zap.Uint64("sealed_frame_number", frameNumber),
)
return true
}
candidate, err := c.store.GetStagedShardClockFrame(
c.filter,
frameNumber,
parentSelector,
false,
)
if err == nil && candidate != nil {
parentSelector = candidate.Header.ParentSelector
// keep walking
continue
}
if finalized != nil && finalized.State != nil &&
(*finalized.State).Header != nil &&
frameNumber == (*finalized.State).Header.FrameNumber &&
bytes.Equal([]byte(finalized.Identifier), parentSelector) {
c.logger.Debug(
"app frame chain linked to finalized frame",
zap.Uint64("finalized_frame_number", frameNumber),
)
return true
}
c.logger.Debug(
"missing app ancestor frame while validating chain",
zap.Uint64("missing_frame_number", frameNumber),
zap.String(
"expected_parent_selector",
hex.EncodeToString(parentSelector),
),
)
return false
}
return false
}
type appFrameChainStoreAdapter struct {
store store.ClockStore
}
func (a appFrameChainStoreAdapter) GetShardClockFrame(
filter []byte,
frameNumber uint64,
truncate bool,
) (*protobufs.AppShardFrame, error) {
frame, _, err := a.store.GetShardClockFrame(filter, frameNumber, truncate)
return frame, err
}
func (a appFrameChainStoreAdapter) GetStagedShardClockFrame(
filter []byte,
frameNumber uint64,
parentSelector []byte,
truncate bool,
) (*protobufs.AppShardFrame, error) {
return a.store.GetStagedShardClockFrame(
filter,
frameNumber,
parentSelector,
truncate,
)
}

View File

@ -258,6 +258,19 @@ func (e *AppConsensusEngine) handleAppShardProposal(
return
}
}
if e.frameChainChecker != nil &&
e.frameChainChecker.CanProcessSequentialChain(finalized, proposal) {
e.deleteCachedProposal(frameNumber)
if e.processProposal(proposal) {
e.drainProposalCache(frameNumber + 1)
return
}
e.logger.Debug("failed to process sequential proposal, caching")
e.cacheProposal(proposal)
return
}
expectedFrame, _, err := e.clockStore.GetLatestShardClockFrame(e.appAddress)
if err != nil {
e.logger.Error("could not obtain app time reel head", zap.Error(err))

View File

@ -14,6 +14,7 @@ import (
"google.golang.org/grpc/status"
qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/store"
)
func (e *AppConsensusEngine) GetAppShardFrame(
@ -99,10 +100,10 @@ func (e *AppConsensusEngine) GetAppShardProposal(
zap.Uint64("frame_number", request.FrameNumber),
zap.String("peer_id", peerID.String()),
)
frame, _, err := e.clockStore.GetShardClockFrame(
frame, err := e.loadAppFrameMatchingSelector(
request.Filter,
request.FrameNumber,
false,
nil,
)
if err != nil {
return &protobufs.AppShardProposalResponse{}, nil
@ -117,10 +118,10 @@ func (e *AppConsensusEngine) GetAppShardProposal(
return &protobufs.AppShardProposalResponse{}, nil
}
parent, _, err := e.clockStore.GetShardClockFrame(
parent, err := e.loadAppFrameMatchingSelector(
request.Filter,
request.FrameNumber-1,
false,
frame.Header.ParentSelector,
)
if err != nil {
e.logger.Debug(
@ -168,6 +169,53 @@ func (e *AppConsensusEngine) RegisterServices(server *grpc.Server) {
protobufs.RegisterOnionServiceServer(server, e.onionService)
}
func (e *AppConsensusEngine) loadAppFrameMatchingSelector(
filter []byte,
frameNumber uint64,
expectedSelector []byte,
) (*protobufs.AppShardFrame, error) {
matchesSelector := func(frame *protobufs.AppShardFrame) bool {
if frame == nil || frame.Header == nil || len(expectedSelector) == 0 {
return true
}
return bytes.Equal(frame.Header.ParentSelector, expectedSelector)
}
frame, _, err := e.clockStore.GetShardClockFrame(filter, frameNumber, false)
if err == nil && matchesSelector(frame) {
return frame, nil
}
iter, iterErr := e.clockStore.RangeStagedShardClockFrames(
filter,
frameNumber,
frameNumber,
)
if iterErr != nil {
if err != nil {
return nil, err
}
return nil, iterErr
}
defer iter.Close()
for ok := iter.First(); ok && iter.Valid(); ok = iter.Next() {
candidate, valErr := iter.Value()
if valErr != nil {
return nil, valErr
}
if matchesSelector(candidate) {
return candidate, nil
}
}
if err == nil && matchesSelector(frame) {
return frame, nil
}
return nil, store.ErrNotFound
}
func (e *AppConsensusEngine) authenticateProverFromContext(
ctx context.Context,
) (peer.ID, error) {

View File

@ -141,7 +141,7 @@ func (e *GlobalConsensusEngine) QuorumThresholdForRank(
total += p.Seniority
}
return (total * 2) / 3, nil
return (total * 4) / 6, nil
}
// Self implements consensus.DynamicCommittee.
@ -163,7 +163,7 @@ func (e *GlobalConsensusEngine) TimeoutThresholdForRank(
total += p.Seniority
}
return (total * 2) / 3, nil
return (total * 4) / 6, nil
}
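A quick arithmetic note on the threshold rewrite: for an unsigned integer total, (total * 4) / 6 and (total * 2) / 3 floor to the same value, since 4/6 reduces to 2/3. For example, total = 7 gives 28/6 = 4 and 14/3 = 4, and total = 100 gives 66 either way, so the quorum and timeout thresholds themselves are unchanged by the new expression.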
func internalProversToWeightedIdentity(

View File

@ -71,7 +71,18 @@ func (p *GlobalLeaderProvider) ProveNextState(
filter []byte,
priorState models.Identity,
) (**protobufs.GlobalFrame, error) {
prior, err := p.engine.clockStore.GetLatestGlobalClockFrame()
latestQC, qcErr := p.engine.clockStore.GetLatestQuorumCertificate(nil)
if qcErr != nil {
p.engine.logger.Debug(
"could not fetch latest quorum certificate",
zap.Error(qcErr),
)
}
prior, err := p.engine.clockStore.GetGlobalClockFrameCandidate(
latestQC.FrameNumber,
[]byte(priorState),
)
if err != nil {
frameProvingTotal.WithLabelValues("error").Inc()
return nil, models.NewNoVoteErrorf("could not collect: %+w", err)
@ -82,14 +93,6 @@ func (p *GlobalLeaderProvider) ProveNextState(
return nil, models.NewNoVoteErrorf("missing prior frame")
}
latestQC, qcErr := p.engine.clockStore.GetLatestQuorumCertificate(nil)
if qcErr != nil {
p.engine.logger.Debug(
"could not fetch latest quorum certificate",
zap.Error(qcErr),
)
}
if prior.Identity() != priorState {
frameProvingTotal.WithLabelValues("error").Inc()

View File

@ -25,22 +25,6 @@ func (p *GlobalVotingProvider) FinalizeQuorumCertificate(
state *models.State[*protobufs.GlobalFrame],
aggregatedSignature models.AggregatedSignature,
) (models.QuorumCertificate, error) {
cloned := (*state.State).Clone().(*protobufs.GlobalFrame)
cloned.Header.PublicKeySignatureBls48581 =
&protobufs.BLS48581AggregateSignature{
Signature: aggregatedSignature.GetSignature(),
PublicKey: &protobufs.BLS48581G2PublicKey{
KeyValue: aggregatedSignature.GetPubKey(),
},
Bitmask: aggregatedSignature.GetBitmask(),
}
frameBytes, err := cloned.ToCanonicalBytes()
if err != nil {
return nil, errors.Wrap(err, "finalize quorum certificate")
}
p.engine.pubsub.PublishToBitmask(GLOBAL_FRAME_BITMASK, frameBytes)
return &protobufs.QuorumCertificate{
Rank: (*state.State).GetRank(),
FrameNumber: (*state.State).Header.FrameNumber,

View File

@ -0,0 +1,135 @@
package global
import (
"bytes"
"encoding/hex"
"errors"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/store"
)
// FrameChainReader captures the minimal subset of clock store functionality
// required for sequential frame verification.
type FrameChainReader interface {
GetGlobalClockFrame(uint64) (*protobufs.GlobalFrame, error)
GetGlobalClockFrameCandidate(uint64, []byte) (*protobufs.GlobalFrame, error)
}
// FrameChainChecker verifies whether a proposal's parent chain can be linked
// through stored frame candidates or sealed frames.
type FrameChainChecker struct {
store FrameChainReader
logger *zap.Logger
}
// NewFrameChainChecker creates a new FrameChainChecker.
func NewFrameChainChecker(
store FrameChainReader,
logger *zap.Logger,
) *FrameChainChecker {
if store == nil {
return nil
}
if logger == nil {
logger = zap.NewNop()
}
return &FrameChainChecker{store: store, logger: logger}
}
// CanProcessSequentialChain returns true if the proposal's ancestors can be
// chained back to an existing sealed frame or the finalized state.
func (c *FrameChainChecker) CanProcessSequentialChain(
finalized *models.State[*protobufs.GlobalFrame],
proposal *protobufs.GlobalProposal,
) bool {
if c == nil || c.store == nil || proposal == nil ||
proposal.State == nil || proposal.State.Header == nil {
return false
}
parentSelector := proposal.State.Header.ParentSelector
if len(parentSelector) == 0 {
return false
}
frameNumber := proposal.State.Header.FrameNumber
if frameNumber == 0 {
return false
}
for frameNumber > 0 && len(parentSelector) > 0 {
frameNumber--
if sealed, err := c.store.GetGlobalClockFrame(frameNumber); err == nil &&
sealed != nil &&
bytes.Equal([]byte(sealed.Identity()), parentSelector) {
c.logger.Debug(
"frame chain linked to sealed frame",
zap.Uint64("sealed_frame_number", frameNumber),
)
return true
} else if err != nil && !errors.Is(err, store.ErrNotFound) {
c.logger.Warn(
"failed to read sealed frame during chain validation",
zap.Uint64("frame_number", frameNumber),
zap.Error(err),
)
return false
}
candidate, err := c.store.GetGlobalClockFrameCandidate(
frameNumber,
parentSelector,
)
if err == nil && candidate != nil {
if candidate.Header == nil ||
candidate.Header.FrameNumber != frameNumber {
c.logger.Debug(
"candidate frame had mismatched header",
zap.Uint64("frame_number", frameNumber),
)
return false
}
c.logger.Debug(
"frame chain matched candidate",
zap.Uint64("candidate_frame_number", frameNumber),
)
parentSelector = candidate.Header.ParentSelector
continue
}
if err != nil && !errors.Is(err, store.ErrNotFound) {
c.logger.Warn(
"failed to read candidate frame during chain validation",
zap.Uint64("frame_number", frameNumber),
zap.Error(err),
)
return false
}
if finalized != nil && finalized.State != nil &&
(*finalized.State).Header != nil &&
frameNumber == (*finalized.State).Header.FrameNumber &&
bytes.Equal([]byte(finalized.Identifier), parentSelector) {
c.logger.Debug(
"frame chain linked to finalized frame",
zap.Uint64("finalized_frame_number", frameNumber),
)
return true
}
c.logger.Debug(
"missing ancestor frame while validating chain",
zap.Uint64("missing_frame_number", frameNumber),
zap.String(
"expected_parent_selector",
hex.EncodeToString(parentSelector),
),
)
return false
}
return false
}
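As a concrete trace of the walk above (the scenario exercised by the tests in the next file): with frame 20 sealed and candidate frames stored at 21 and 22, a proposal at frame 23 whose parent selector names candidate 22 walks 22, then 21, then 20, reaches the sealed frame, and CanProcessSequentialChain returns true; a proposal whose parent selector matches neither a sealed frame, a stored candidate, nor the finalized state returns false at the first missing ancestor.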

View File

@ -0,0 +1,180 @@
package global
import (
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/store"
)
func TestFrameChainChecker_CanProcessSequentialChain(t *testing.T) {
store := newMockFrameChainStore()
checker := NewFrameChainChecker(store, zap.NewNop())
finalized := newTestFrame(10, nil)
store.addSealed(finalized)
finalizedState := &models.State[*protobufs.GlobalFrame]{
Rank: finalized.Header.Rank,
Identifier: finalized.Identity(),
State: &finalized,
}
candidate := newTestFrame(
11,
[]byte(finalized.Identity()),
)
store.addCandidate(candidate)
proposalFrame := newTestFrame(
12,
[]byte(candidate.Identity()),
)
proposal := &protobufs.GlobalProposal{
State: proposalFrame,
}
require.True(
t,
checker.CanProcessSequentialChain(finalizedState, proposal),
)
}
func TestFrameChainChecker_CanProcessSequentialChainMultipleCandidates(
t *testing.T,
) {
store := newMockFrameChainStore()
checker := NewFrameChainChecker(store, zap.NewNop())
finalized := newTestFrame(20, nil)
store.addSealed(finalized)
finalizedState := &models.State[*protobufs.GlobalFrame]{
Rank: finalized.Header.Rank,
Identifier: finalized.Identity(),
State: &finalized,
}
candidate21 := newTestFrame(
21,
[]byte(finalized.Identity()),
)
store.addCandidate(candidate21)
candidate22 := newTestFrame(
22,
[]byte(candidate21.Identity()),
)
store.addCandidate(candidate22)
proposal := &protobufs.GlobalProposal{
State: newTestFrame(
23,
[]byte(candidate22.Identity()),
),
}
require.True(
t,
checker.CanProcessSequentialChain(finalizedState, proposal),
)
}
func TestFrameChainChecker_CanProcessSequentialChainMissingParent(
t *testing.T,
) {
store := newMockFrameChainStore()
checker := NewFrameChainChecker(store, zap.NewNop())
finalized := newTestFrame(5, nil)
store.addSealed(finalized)
finalizedState := &models.State[*protobufs.GlobalFrame]{
Rank: finalized.Header.Rank,
Identifier: finalized.Identity(),
State: &finalized,
}
// Proposal references a parent that does not exist
proposal := &protobufs.GlobalProposal{
State: newTestFrame(
6,
[]byte("missing-parent"),
),
}
require.False(
t,
checker.CanProcessSequentialChain(finalizedState, proposal),
)
}
type mockFrameChainStore struct {
sealed map[uint64]*protobufs.GlobalFrame
candidates map[uint64]map[string]*protobufs.GlobalFrame
}
func newMockFrameChainStore() *mockFrameChainStore {
return &mockFrameChainStore{
sealed: make(map[uint64]*protobufs.GlobalFrame),
candidates: make(map[uint64]map[string]*protobufs.GlobalFrame),
}
}
func (m *mockFrameChainStore) addSealed(frame *protobufs.GlobalFrame) {
if frame == nil || frame.Header == nil {
return
}
m.sealed[frame.Header.FrameNumber] = frame
}
func (m *mockFrameChainStore) addCandidate(frame *protobufs.GlobalFrame) {
if frame == nil || frame.Header == nil {
return
}
key := frame.Header.FrameNumber
if _, ok := m.candidates[key]; !ok {
m.candidates[key] = make(map[string]*protobufs.GlobalFrame)
}
m.candidates[key][string(frame.Identity())] = frame
}
func (m *mockFrameChainStore) GetGlobalClockFrame(
frameNumber uint64,
) (*protobufs.GlobalFrame, error) {
frame, ok := m.sealed[frameNumber]
if !ok {
return nil, store.ErrNotFound
}
return frame, nil
}
func (m *mockFrameChainStore) GetGlobalClockFrameCandidate(
frameNumber uint64,
selector []byte,
) (*protobufs.GlobalFrame, error) {
candidates := m.candidates[frameNumber]
if candidates == nil {
return nil, store.ErrNotFound
}
frame, ok := candidates[string(selector)]
if !ok {
return nil, store.ErrNotFound
}
return frame, nil
}
func newTestFrame(
frameNumber uint64,
parentSelector []byte,
) *protobufs.GlobalFrame {
header := &protobufs.GlobalFrameHeader{
FrameNumber: frameNumber,
ParentSelector: parentSelector,
Output: []byte{byte(frameNumber)},
Rank: frameNumber,
}
return &protobufs.GlobalFrame{
Header: header,
}
}

View File

@ -96,6 +96,18 @@ type LockedTransaction struct {
Filled bool
}
func contextFromShutdownSignal(sig <-chan struct{}) context.Context {
ctx, cancel := context.WithCancel(context.Background())
go func() {
if sig == nil {
return
}
<-sig
cancel()
}()
return ctx
}
// GlobalConsensusEngine uses the generic state machine for consensus
type GlobalConsensusEngine struct {
*lifecycle.ComponentManager
@ -133,6 +145,7 @@ type GlobalConsensusEngine struct {
peerInfoManager tp2p.PeerInfoManager
workerManager worker.WorkerManager
proposer *provers.Manager
frameChainChecker *FrameChainChecker
currentRank uint64
alertPublicKey []byte
hasSentKeyBundle bool
@ -319,6 +332,7 @@ func NewGlobalConsensusEngine(
appShardCache: make(map[string]*appShardCacheEntry),
globalMessageSpillover: make(map[uint64][][]byte),
}
engine.frameChainChecker = NewFrameChainChecker(clockStore, logger)
if err := engine.initGlobalMessageAggregator(); err != nil {
return nil, err
@ -934,6 +948,13 @@ func NewGlobalConsensusEngine(
}
engine.ComponentManager = componentBuilder.Build()
if hgWithShutdown, ok := engine.hyperSync.(interface {
SetShutdownContext(context.Context)
}); ok {
hgWithShutdown.SetShutdownContext(
contextFromShutdownSignal(engine.ShutdownSignal()),
)
}
return engine, nil
}
@ -1595,8 +1616,8 @@ func (e *GlobalConsensusEngine) materialize(
return errors.Wrap(err, "materialize")
}
if e.verifyProverRoot(frameNumber, expectedProverRoot, proposer) {
if !e.config.Engine.ArchiveMode || e.config.P2P.Network == 99 {
if !e.config.Engine.ArchiveMode || e.config.P2P.Network == 99 {
if e.verifyProverRoot(frameNumber, expectedProverRoot, proposer) {
e.reconcileLocalWorkerAllocations()
}
}
@ -3217,7 +3238,7 @@ func (e *GlobalConsensusEngine) DecideWorkerJoins(
msg,
)
if err != nil {
e.logger.Error("could not construct join", zap.Error(err))
e.logger.Error("could not construct join decisions", zap.Error(err))
return errors.Wrap(err, "decide worker joins")
}
@ -3538,7 +3559,7 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange(
newRank uint64,
qc models.QuorumCertificate,
) {
e.logger.Debug("adding certified state", zap.Uint64("rank", newRank-1))
e.logger.Debug("processing certified state", zap.Uint64("rank", newRank-1))
parentQC, err := e.clockStore.GetLatestQuorumCertificate(nil)
if err != nil {
@ -3613,6 +3634,20 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange(
return
}
cloned := frame.Clone().(*protobufs.GlobalFrame)
cloned.Header.PublicKeySignatureBls48581 =
&protobufs.BLS48581AggregateSignature{
Signature: qc.GetAggregatedSignature().GetSignature(),
PublicKey: &protobufs.BLS48581G2PublicKey{
KeyValue: qc.GetAggregatedSignature().GetPubKey(),
},
Bitmask: qc.GetAggregatedSignature().GetBitmask(),
}
frameBytes, err := cloned.ToCanonicalBytes()
if err == nil {
e.pubsub.PublishToBitmask(GLOBAL_FRAME_BITMASK, frameBytes)
}
if !bytes.Equal(frame.Header.ParentSelector, parentQC.Selector) {
e.logger.Error(
"quorum certificate does not match frame parent",
@ -3650,73 +3685,14 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange(
frame.Header.PublicKeySignatureBls48581 = aggregateSig
latest, err := e.clockStore.GetLatestGlobalClockFrame()
if err != nil {
e.logger.Error("could not obtain latest frame", zap.Error(err))
return
proposal := &protobufs.GlobalProposal{
State: frame,
ParentQuorumCertificate: parentQC,
PriorRankTimeoutCertificate: priorRankTC,
Vote: vote,
}
if latest.Header.FrameNumber+1 != frame.Header.FrameNumber ||
!bytes.Equal([]byte(latest.Identity()), frame.Header.ParentSelector) {
e.logger.Debug(
"not next frame, cannot advance",
zap.Uint64("latest_frame_number", latest.Header.FrameNumber),
zap.Uint64("new_frame_number", frame.Header.FrameNumber),
zap.String(
"latest_frame_selector",
hex.EncodeToString([]byte(latest.Identity())),
),
zap.String(
"new_frame_number",
hex.EncodeToString(frame.Header.ParentSelector),
),
)
return
}
txn, err = e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.materialize(
txn,
frame,
); err != nil {
_ = txn.Abort()
e.logger.Error("could not materialize frame requests", zap.Error(err))
return
}
if err := e.clockStore.PutGlobalClockFrame(frame, txn); err != nil {
_ = txn.Abort()
e.logger.Error("could not put global frame", zap.Error(err))
return
}
if err := e.clockStore.PutCertifiedGlobalState(
&protobufs.GlobalProposal{
State: frame,
ParentQuorumCertificate: parentQC,
PriorRankTimeoutCertificate: priorRankTC,
Vote: vote,
},
txn,
); err != nil {
e.logger.Error("could not insert certified state", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
}
if err := e.checkShardCoverage(frame.GetFrameNumber()); err != nil {
e.logger.Error("could not check shard coverage", zap.Error(err))
return
}
e.globalProposalQueue <- proposal
}
// OnRankChange implements consensus.Consumer.
@ -4050,7 +4026,7 @@ func (e *GlobalConsensusEngine) getPendingProposals(
nextQC, err := e.clockStore.GetQuorumCertificate(nil, rank)
if err != nil {
e.logger.Debug("no qc for rank", zap.Error(err))
continue
break
}
value, err := e.clockStore.GetGlobalClockFrameCandidate(
@ -4059,8 +4035,7 @@ func (e *GlobalConsensusEngine) getPendingProposals(
)
if err != nil {
e.logger.Debug("no frame for qc", zap.Error(err))
parent = nextQC
continue
break
}
var priorTCModel models.TimeoutCertificate = nil

View File

@ -245,6 +245,8 @@ func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) {
bundle.Requests = bundle.Requests[:maxGlobalMessagesPerFrame]
}
e.logBundleRequestTypes(bundle)
encoded, err := bundle.ToCanonicalBytes()
if err != nil {
if e.logger != nil {
@ -263,6 +265,132 @@ func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) {
e.messageAggregator.Add(record)
}
func (e *GlobalConsensusEngine) logBundleRequestTypes(
bundle *protobufs.MessageBundle,
) {
requestTypes := make([]string, 0, len(bundle.Requests))
detailFields := make([]zap.Field, 0)
for idx, request := range bundle.Requests {
typeName, detailField, hasDetail := requestTypeNameAndDetail(idx, request)
requestTypes = append(requestTypes, typeName)
if hasDetail {
detailFields = append(detailFields, detailField)
}
}
fields := []zap.Field{
zap.Int("request_count", len(bundle.Requests)),
zap.Strings("request_types", requestTypes),
zap.Int64("bundle_timestamp", bundle.Timestamp),
}
fields = append(fields, detailFields...)
e.logger.Debug("collected global request bundle", fields...)
}
func requestTypeNameAndDetail(
idx int,
req *protobufs.MessageRequest,
) (string, zap.Field, bool) {
if req == nil || req.GetRequest() == nil {
return "nil_request", zap.Field{}, false
}
switch actual := req.GetRequest().(type) {
case *protobufs.MessageRequest_Join:
return "ProverJoin", zap.Field{}, false
case *protobufs.MessageRequest_Leave:
return "ProverLeave", zap.Field{}, false
case *protobufs.MessageRequest_Pause:
return "ProverPause", zap.Field{}, false
case *protobufs.MessageRequest_Resume:
return "ProverResume", zap.Field{}, false
case *protobufs.MessageRequest_Confirm:
return "ProverConfirm", zap.Field{}, false
case *protobufs.MessageRequest_Reject:
return "ProverReject", zap.Field{}, false
case *protobufs.MessageRequest_Kick:
return "ProverKick", zap.Field{}, false
case *protobufs.MessageRequest_Update:
return "ProverUpdate",
zap.Any(fmt.Sprintf("request_%d_prover_update", idx), actual.Update),
true
case *protobufs.MessageRequest_TokenDeploy:
return "TokenDeploy",
zap.Any(fmt.Sprintf("request_%d_token_deploy", idx), actual.TokenDeploy),
true
case *protobufs.MessageRequest_TokenUpdate:
return "TokenUpdate",
zap.Any(fmt.Sprintf("request_%d_token_update", idx), actual.TokenUpdate),
true
case *protobufs.MessageRequest_Transaction:
return "Transaction",
zap.Any(fmt.Sprintf("request_%d_transaction", idx), actual.Transaction),
true
case *protobufs.MessageRequest_PendingTransaction:
return "PendingTransaction",
zap.Any(
fmt.Sprintf("request_%d_pending_transaction", idx),
actual.PendingTransaction,
),
true
case *protobufs.MessageRequest_MintTransaction:
return "MintTransaction",
zap.Any(fmt.Sprintf("request_%d_mint_transaction", idx), actual.MintTransaction),
true
case *protobufs.MessageRequest_HypergraphDeploy:
return "HypergraphDeploy",
zap.Any(fmt.Sprintf("request_%d_hypergraph_deploy", idx), actual.HypergraphDeploy),
true
case *protobufs.MessageRequest_HypergraphUpdate:
return "HypergraphUpdate",
zap.Any(fmt.Sprintf("request_%d_hypergraph_update", idx), actual.HypergraphUpdate),
true
case *protobufs.MessageRequest_VertexAdd:
return "VertexAdd",
zap.Any(fmt.Sprintf("request_%d_vertex_add", idx), actual.VertexAdd),
true
case *protobufs.MessageRequest_VertexRemove:
return "VertexRemove",
zap.Any(fmt.Sprintf("request_%d_vertex_remove", idx), actual.VertexRemove),
true
case *protobufs.MessageRequest_HyperedgeAdd:
return "HyperedgeAdd",
zap.Any(fmt.Sprintf("request_%d_hyperedge_add", idx), actual.HyperedgeAdd),
true
case *protobufs.MessageRequest_HyperedgeRemove:
return "HyperedgeRemove",
zap.Any(fmt.Sprintf("request_%d_hyperedge_remove", idx), actual.HyperedgeRemove),
true
case *protobufs.MessageRequest_ComputeDeploy:
return "ComputeDeploy",
zap.Any(fmt.Sprintf("request_%d_compute_deploy", idx), actual.ComputeDeploy),
true
case *protobufs.MessageRequest_ComputeUpdate:
return "ComputeUpdate",
zap.Any(fmt.Sprintf("request_%d_compute_update", idx), actual.ComputeUpdate),
true
case *protobufs.MessageRequest_CodeDeploy:
return "CodeDeploy",
zap.Any(fmt.Sprintf("request_%d_code_deploy", idx), actual.CodeDeploy),
true
case *protobufs.MessageRequest_CodeExecute:
return "CodeExecute",
zap.Any(fmt.Sprintf("request_%d_code_execute", idx), actual.CodeExecute),
true
case *protobufs.MessageRequest_CodeFinalize:
return "CodeFinalize",
zap.Any(fmt.Sprintf("request_%d_code_finalize", idx), actual.CodeFinalize),
true
case *protobufs.MessageRequest_Shard:
return "ShardFrame",
zap.Any(fmt.Sprintf("request_%d_shard_frame", idx), actual.Shard),
true
default:
return "unknown_request", zap.Field{}, false
}
}
func (e *GlobalConsensusEngine) getMessageCollector(
rank uint64,
) (keyedaggregator.Collector[sequencedGlobalMessage], bool, error) {

View File

@ -954,9 +954,14 @@ func (e *GlobalConsensusEngine) handleGlobalProposal(
finalizedFrameNumber := (*finalized.State).Header.FrameNumber
frameNumber := proposal.State.Header.FrameNumber
// drop proposals if we already processed them
// drop proposals if we already processed them, unless we need to
// rehydrate the finalized frame in persistence
if frameNumber <= finalizedFrameNumber ||
proposal.State.Header.Rank <= finalizedRank {
if e.tryRecoverFinalizedFrame(proposal, finalized) {
return
}
e.logger.Debug(
"dropping stale (lower than finalized) proposal",
zap.Uint64("finalized_rank", finalizedRank),
@ -988,13 +993,11 @@ func (e *GlobalConsensusEngine) handleGlobalProposal(
// if we have a parent, cache and move on
if proposal.State.Header.FrameNumber != 0 {
// also check with persistence layer
parent, err := e.clockStore.GetGlobalClockFrame(
proposal.State.Header.FrameNumber - 1,
)
if err != nil || !bytes.Equal(
[]byte(parent.Identity()),
_, err := e.clockStore.GetGlobalClockFrameCandidate(
proposal.State.Header.FrameNumber-1,
proposal.State.Header.ParentSelector,
) {
)
if err != nil {
e.logger.Debug(
"parent frame not stored, requesting sync",
zap.Uint64("rank", proposal.GetRank()),
@ -1022,37 +1025,68 @@ func (e *GlobalConsensusEngine) handleGlobalProposal(
}
}
expectedFrame, err := e.clockStore.GetLatestGlobalClockFrame()
if err != nil {
e.logger.Error("could not obtain latest global frame", zap.Error(err))
return
}
expectedFrameNumber := expectedFrame.Header.FrameNumber + 1
if frameNumber < expectedFrameNumber {
e.logger.Debug(
"dropping proposal behind expected frame",
zap.Uint64("frame_number", frameNumber),
zap.Uint64("expected_frame_number", expectedFrameNumber),
)
return
}
if frameNumber == expectedFrameNumber {
if e.frameChainChecker != nil &&
e.frameChainChecker.CanProcessSequentialChain(finalized, proposal) {
e.deleteCachedProposal(frameNumber)
if e.processProposal(proposal) {
e.drainProposalCache(frameNumber + 1)
return
}
e.logger.Debug("failed to process expected proposal, caching")
e.logger.Debug("failed to process sequential proposal, caching")
e.cacheProposal(proposal)
return
}
e.cacheProposal(proposal)
e.drainProposalCache(expectedFrameNumber)
}
func (e *GlobalConsensusEngine) tryRecoverFinalizedFrame(
proposal *protobufs.GlobalProposal,
finalized *models.State[*protobufs.GlobalFrame],
) bool {
if proposal == nil ||
proposal.State == nil ||
proposal.State.Header == nil ||
finalized == nil ||
finalized.State == nil ||
(*finalized.State).Header == nil {
return false
}
frameNumber := proposal.State.Header.FrameNumber
finalizedFrameNumber := (*finalized.State).Header.FrameNumber
if frameNumber != finalizedFrameNumber {
return false
}
if !bytes.Equal(
[]byte(finalized.Identifier),
[]byte(proposal.State.Identity()),
) {
e.logger.Warn(
"received conflicting finalized frame during sync",
zap.Uint64("finalized_frame_number", finalizedFrameNumber),
zap.String(
"expected",
hex.EncodeToString([]byte(finalized.Identifier)),
),
zap.String(
"received",
hex.EncodeToString([]byte(proposal.State.Identity())),
),
)
return true
}
e.registerPendingCertifiedParent(proposal)
e.logger.Debug(
"cached finalized frame for descendant processing",
zap.Uint64("frame_number", frameNumber),
)
return true
}
func (e *GlobalConsensusEngine) processProposal(
@ -1272,6 +1306,19 @@ func (e *GlobalConsensusEngine) trySealParentWithChild(
return
}
finalized := e.forks.FinalizedState()
if finalized != nil && finalized.State != nil &&
parentFrame <= (*finalized.State).Header.FrameNumber {
e.logger.Debug(
"skipping sealing for already finalized parent",
zap.Uint64("parent_frame", parentFrame),
)
e.pendingCertifiedParentsMu.Lock()
delete(e.pendingCertifiedParents, parentFrame)
e.pendingCertifiedParentsMu.Unlock()
return
}
e.logger.Debug(
"sealing parent with descendant proposal",
zap.Uint64("parent_frame", parent.State.Header.FrameNumber),

View File

@ -96,7 +96,10 @@ func (e *GlobalConsensusEngine) GetGlobalProposal(
zap.Uint64("frame_number", request.FrameNumber),
zap.String("peer_id", peerID.String()),
)
frame, err := e.clockStore.GetGlobalClockFrame(request.FrameNumber)
frame, err := e.loadFrameMatchingSelector(
request.FrameNumber,
nil,
)
if err != nil {
return &protobufs.GlobalProposalResponse{}, nil
}
@ -110,7 +113,10 @@ func (e *GlobalConsensusEngine) GetGlobalProposal(
return &protobufs.GlobalProposalResponse{}, nil
}
parent, err := e.clockStore.GetGlobalClockFrame(request.FrameNumber - 1)
parent, err := e.loadFrameMatchingSelector(
request.FrameNumber-1,
frame.Header.ParentSelector,
)
if err != nil {
e.logger.Debug(
"received error while fetching global frame parent",
@ -147,6 +153,51 @@ func (e *GlobalConsensusEngine) GetGlobalProposal(
}, nil
}
func (e *GlobalConsensusEngine) loadFrameMatchingSelector(
frameNumber uint64,
expectedSelector []byte,
) (*protobufs.GlobalFrame, error) {
matchesSelector := func(frame *protobufs.GlobalFrame) bool {
if frame == nil || frame.Header == nil || len(expectedSelector) == 0 {
return true
}
return bytes.Equal([]byte(frame.Identity()), expectedSelector)
}
frame, err := e.clockStore.GetGlobalClockFrame(frameNumber)
if err == nil && matchesSelector(frame) {
return frame, nil
}
iter, iterErr := e.clockStore.RangeGlobalClockFrameCandidates(
frameNumber,
frameNumber,
)
if iterErr != nil {
if err != nil {
return nil, err
}
return nil, iterErr
}
defer iter.Close()
for ok := iter.First(); ok && iter.Valid(); ok = iter.Next() {
candidate, valErr := iter.Value()
if valErr != nil {
return nil, valErr
}
if matchesSelector(candidate) {
return candidate, nil
}
}
if err == nil && matchesSelector(frame) {
return frame, nil
}
return nil, store.ErrNotFound
}
func (e *GlobalConsensusEngine) GetAppShards(
ctx context.Context,
req *protobufs.GetAppShardsRequest,

View File

@ -417,6 +417,9 @@ func (m *Manager) DecideJoins(
if len(p) == 0 {
continue
}
if len(reject) > 99 {
break
}
pc := make([]byte, len(p))
copy(pc, p)
reject = append(reject, pc)
@ -431,6 +434,12 @@ func (m *Manager) DecideJoins(
if len(p) == 0 {
continue
}
if len(reject) > 99 {
break
}
if len(confirm) > 99 {
break
}
key := hex.EncodeToString(p)
rec, ok := byHex[key]
@ -456,22 +465,25 @@ func (m *Manager) DecideJoins(
}
}
if availableWorkers == 0 && len(confirm) > 0 {
m.logger.Info(
"skipping confirmations due to lack of available workers",
zap.Int("pending_confirmations", len(confirm)),
)
confirm = nil
} else if availableWorkers > 0 && len(confirm) > availableWorkers {
m.logger.Warn(
"limiting confirmations due to worker capacity",
zap.Int("pending_confirmations", len(confirm)),
zap.Int("available_workers", availableWorkers),
)
confirm = confirm[:availableWorkers]
if len(reject) > 0 {
return m.workerMgr.DecideAllocations(reject, nil)
} else {
if availableWorkers == 0 && len(confirm) > 0 {
m.logger.Info(
"skipping confirmations due to lack of available workers",
zap.Int("pending_confirmations", len(confirm)),
)
confirm = nil
} else if availableWorkers > 0 && len(confirm) > availableWorkers {
m.logger.Warn(
"limiting confirmations due to worker capacity",
zap.Int("pending_confirmations", len(confirm)),
zap.Int("available_workers", availableWorkers),
)
confirm = confirm[:availableWorkers]
}
return m.workerMgr.DecideAllocations(nil, confirm)
}
return m.workerMgr.DecideAllocations(reject, confirm)
}
func (m *Manager) unallocatedWorkerCount() (int, error) {

View File

@ -47,6 +47,7 @@ func setupTest(t *testing.T) (*hypergraph.HypergraphState, thypergraph.Hypergrap
incProver,
[]int{},
&tests.Nopthenticator{},
200,
)
st := hypergraph.NewHypergraphState(hg)
@ -96,6 +97,7 @@ func TestHypergraphState(t *testing.T) {
incProver,
[]int{},
&tests.Nopthenticator{},
200,
)
data := enc.Encrypt(make([]byte, 20), pub)
verenc := data[0].Compress()

View File

@ -140,9 +140,9 @@ func TestHypergraphSyncServer(t *testing.T) {
inclusionProver,
)
crdts := make([]application.Hypergraph, numParties)
crdts[0] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "server")), serverHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{})
crdts[1] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "client")), clientHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{})
crdts[2] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "control")), controlHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{})
crdts[0] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "server")), serverHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200)
crdts[1] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "client")), clientHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200)
crdts[2] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "control")), controlHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200)
servertxn, _ := serverHypergraphStore.NewTransaction(false)
clienttxn, _ := clientHypergraphStore.NewTransaction(false)
@ -417,8 +417,8 @@ func TestHypergraphPartialSync(t *testing.T) {
inclusionProver,
)
crdts := make([]application.Hypergraph, numParties)
crdts[0] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "server")), serverHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{})
crdts[2] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "control")), controlHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{})
crdts[0] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "server")), serverHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200)
crdts[2] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "control")), controlHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200)
servertxn, _ := serverHypergraphStore.NewTransaction(false)
controltxn, _ := controlHypergraphStore.NewTransaction(false)
@ -452,7 +452,7 @@ func TestHypergraphPartialSync(t *testing.T) {
servertxn.Commit()
crdts[1] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "client")), clientHypergraphStore, inclusionProver, toIntSlice(toUint32Slice(branchfork)), &tests.Nopthenticator{})
crdts[1] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "client")), clientHypergraphStore, inclusionProver, toIntSlice(toUint32Slice(branchfork)), &tests.Nopthenticator{}, 200)
logger.Info("saved")
for _, op := range operations1 {
@ -807,6 +807,7 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) {
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
for i := 0; i < clientCount; i++ {
clientPath := filepath.Join(clientBase, fmt.Sprintf("client-%d", i))
@ -824,6 +825,7 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) {
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
}
defer func() {

View File

@ -28,6 +28,11 @@ type PebbleGlobalClockIterator struct {
db *PebbleClockStore
}
type PebbleGlobalClockCandidateIterator struct {
i store.Iterator
db *PebbleClockStore
}
type PebbleClockIterator struct {
filter []byte
start uint64
@ -36,6 +41,14 @@ type PebbleClockIterator struct {
db *PebbleClockStore
}
type PebbleStagedShardFrameIterator struct {
filter []byte
start uint64
end uint64
cur uint64
db *PebbleClockStore
}
type PebbleGlobalStateIterator struct {
i store.Iterator
db *PebbleClockStore
@ -66,7 +79,9 @@ type PebbleTimeoutCertificateIterator struct {
}
var _ store.TypedIterator[*protobufs.GlobalFrame] = (*PebbleGlobalClockIterator)(nil)
var _ store.TypedIterator[*protobufs.GlobalFrame] = (*PebbleGlobalClockCandidateIterator)(nil)
var _ store.TypedIterator[*protobufs.AppShardFrame] = (*PebbleClockIterator)(nil)
var _ store.TypedIterator[*protobufs.AppShardFrame] = (*PebbleStagedShardFrameIterator)(nil)
var _ store.TypedIterator[*protobufs.GlobalProposal] = (*PebbleGlobalStateIterator)(nil)
var _ store.TypedIterator[*protobufs.AppShardProposal] = (*PebbleAppShardStateIterator)(nil)
var _ store.TypedIterator[*protobufs.QuorumCertificate] = (*PebbleQuorumCertificateIterator)(nil)
@ -165,6 +180,95 @@ func (p *PebbleGlobalClockIterator) Close() error {
return errors.Wrap(p.i.Close(), "closing global clock iterator")
}
func (p *PebbleGlobalClockCandidateIterator) First() bool {
return p.i.First()
}
func (p *PebbleGlobalClockCandidateIterator) Next() bool {
return p.i.Next()
}
func (p *PebbleGlobalClockCandidateIterator) Valid() bool {
return p.i.Valid()
}
func (p *PebbleGlobalClockCandidateIterator) Value() (
*protobufs.GlobalFrame,
error,
) {
if !p.i.Valid() {
return nil, store.ErrNotFound
}
key := p.i.Key()
value := p.i.Value()
frameNumber, selector, err := extractFrameNumberAndSelectorFromCandidateKey(key)
if err != nil {
return nil, errors.Wrap(err, "get candidate clock frame iterator value")
}
header := &protobufs.GlobalFrameHeader{}
if err := proto.Unmarshal(value, header); err != nil {
return nil, errors.Wrap(err, "get candidate clock frame iterator value")
}
frame := &protobufs.GlobalFrame{
Header: header,
}
var requests []*protobufs.MessageBundle
requestIndex := uint16(0)
for {
requestKey := clockGlobalFrameRequestCandidateKey(
selector,
frameNumber,
requestIndex,
)
requestData, closer, err := p.db.db.Get(requestKey)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
break
}
return nil, errors.Wrap(err, "get candidate clock frame iterator value")
}
defer closer.Close()
request := &protobufs.MessageBundle{}
if err := proto.Unmarshal(requestData, request); err != nil {
return nil, errors.Wrap(err, "get candidate clock frame iterator value")
}
requests = append(requests, request)
requestIndex++
}
frame.Requests = requests
return frame, nil
}
func (p *PebbleGlobalClockCandidateIterator) TruncatedValue() (
*protobufs.GlobalFrame,
error,
) {
if !p.i.Valid() {
return nil, store.ErrNotFound
}
value := p.i.Value()
header := &protobufs.GlobalFrameHeader{}
if err := proto.Unmarshal(value, header); err != nil {
return nil, errors.Wrap(err, "get candidate clock frame iterator value")
}
return &protobufs.GlobalFrame{Header: header}, nil
}
func (p *PebbleGlobalClockCandidateIterator) Close() error {
return errors.Wrap(p.i.Close(), "closing global clock candidate iterator")
}
func (p *PebbleClockIterator) First() bool {
p.cur = p.start
return true
@ -208,6 +312,46 @@ func (p *PebbleClockIterator) Value() (*protobufs.AppShardFrame, error) {
return frame, nil
}
func (p *PebbleStagedShardFrameIterator) First() bool {
p.cur = p.start
return true
}
func (p *PebbleStagedShardFrameIterator) Next() bool {
p.cur++
return p.cur <= p.end
}
func (p *PebbleStagedShardFrameIterator) Valid() bool {
return p.cur >= p.start && p.cur <= p.end
}
func (p *PebbleStagedShardFrameIterator) Value() (*protobufs.AppShardFrame, error) {
if !p.Valid() {
return nil, store.ErrNotFound
}
frames, err := p.db.GetStagedShardClockFramesForFrameNumber(p.filter, p.cur)
if err != nil {
return nil, errors.Wrap(err, "get staged shard clocks")
}
if len(frames) == 0 {
return nil, store.ErrNotFound
}
return frames[len(frames)-1], nil
}
func (p *PebbleStagedShardFrameIterator) TruncatedValue() (
*protobufs.AppShardFrame,
error,
) {
return p.Value()
}
func (p *PebbleStagedShardFrameIterator) Close() error {
return nil
}
func (p *PebbleClockIterator) Close() error {
return nil
}
@ -554,6 +698,23 @@ func extractFrameNumberFromGlobalFrameKey(
return binary.BigEndian.Uint64(copied[2:10]), nil
}
func extractFrameNumberAndSelectorFromCandidateKey(
key []byte,
) (uint64, []byte, error) {
frameNumber, err := extractFrameNumberFromGlobalFrameKey(key)
if err != nil {
return 0, nil, err
}
if len(key) < 10 {
return 0, nil, errors.Wrap(
store.ErrInvalidData,
"extract selector from global frame candidate key",
)
}
selector := slices.Clone(key[10:])
return frameNumber, selector, nil
}
func clockShardFrameKey(
filter []byte,
frameNumber uint64,
@ -896,6 +1057,28 @@ func (p *PebbleClockStore) RangeGlobalClockFrames(
return &PebbleGlobalClockIterator{i: iter, db: p}, nil
}
// RangeGlobalClockFrameCandidates implements ClockStore.
func (p *PebbleClockStore) RangeGlobalClockFrameCandidates(
startFrameNumber uint64,
endFrameNumber uint64,
) (store.TypedIterator[*protobufs.GlobalFrame], error) {
if startFrameNumber > endFrameNumber {
temp := endFrameNumber
endFrameNumber = startFrameNumber
startFrameNumber = temp
}
iter, err := p.db.NewIter(
clockGlobalFrameCandidateKey(startFrameNumber, nil),
clockGlobalFrameCandidateKey(endFrameNumber+1, nil),
)
if err != nil {
return nil, errors.Wrap(err, "range global clock frame candidates")
}
return &PebbleGlobalClockCandidateIterator{i: iter, db: p}, nil
}
// PutGlobalClockFrame implements ClockStore.
func (p *PebbleClockStore) PutGlobalClockFrame(
frame *protobufs.GlobalFrame,
@ -1370,6 +1553,24 @@ func (p *PebbleClockStore) RangeShardClockFrames(
}, nil
}
func (p *PebbleClockStore) RangeStagedShardClockFrames(
filter []byte,
startFrameNumber uint64,
endFrameNumber uint64,
) (store.TypedIterator[*protobufs.AppShardFrame], error) {
if startFrameNumber > endFrameNumber {
startFrameNumber, endFrameNumber = endFrameNumber, startFrameNumber
}
return &PebbleStagedShardFrameIterator{
filter: filter,
start: startFrameNumber,
end: endFrameNumber,
cur: startFrameNumber,
db: p,
}, nil
}
func (p *PebbleClockStore) SetLatestShardClockFrameNumber(
filter []byte,
frameNumber uint64,
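
A minimal caller sketch (not part of this commit) for the new RangeGlobalClockFrameCandidates API; it assumes the store.ClockStore interface shown later in this commit lives in types/store and is imported as store, and the package name, helper name, and frame bounds are illustrative only.

package example // hypothetical package for the sketch

import (
	"github.com/pkg/errors"

	"source.quilibrium.com/quilibrium/monorepo/protobufs"
	"source.quilibrium.com/quilibrium/monorepo/types/store"
)

// collectGlobalFrameCandidates iterates every candidate global frame in
// [start, end]; when cs is the Pebble store above, the iterator returned is
// the new PebbleGlobalClockCandidateIterator.
func collectGlobalFrameCandidates(
	cs store.ClockStore,
	start, end uint64,
) ([]*protobufs.GlobalFrame, error) {
	iter, err := cs.RangeGlobalClockFrameCandidates(start, end)
	if err != nil {
		return nil, errors.Wrap(err, "range global clock frame candidates")
	}
	defer iter.Close()

	var frames []*protobufs.GlobalFrame
	for valid := iter.First(); valid; valid = iter.Next() {
		frame, err := iter.Value()
		if err != nil {
			return nil, errors.Wrap(err, "candidate value")
		}
		frames = append(frames, frame)
	}
	return frames, nil
}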

View File

@ -41,7 +41,7 @@ func (p *PebbleConsensusStore) GetConsensusState(filter []byte) (
)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
return nil, store.ErrNotFound
}
return nil, errors.Wrap(err, "get consensus state")
@ -119,7 +119,7 @@ func (p *PebbleConsensusStore) GetLivenessState(filter []byte) (
)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
return nil, store.ErrNotFound
}
return nil, errors.Wrap(err, "get liveness state")

View File

@ -465,6 +465,7 @@ func (p *PebbleHypergraphStore) SetCoveredPrefix(coveredPrefix []int) error {
func (p *PebbleHypergraphStore) LoadHypergraph(
authenticationProvider channel.AuthenticationProvider,
maxSyncSessions int,
) (
hypergraph.Hypergraph,
error,
@ -491,6 +492,7 @@ func (p *PebbleHypergraphStore) LoadHypergraph(
p.prover,
coveredPrefix,
authenticationProvider,
maxSyncSessions,
)
vertexAddsIter, err := p.db.NewIter(

View File

@ -33,6 +33,15 @@ var pebbleMigrations = []func(*pebble.Batch) error{
migration_2_1_0_10,
migration_2_1_0_10,
migration_2_1_0_11,
migration_2_1_0_14,
migration_2_1_0_141,
migration_2_1_0_142,
migration_2_1_0_143,
migration_2_1_0_144,
migration_2_1_0_145,
migration_2_1_0_146,
migration_2_1_0_147,
migration_2_1_0_148,
}
func NewPebbleDB(
@ -167,8 +176,8 @@ func (p *PebbleDB) migrate(logger *zap.Logger) error {
for i := int(storedVersion); i < len(pebbleMigrations); i++ {
logger.Warn(
"performing pebble store migration",
zap.Int("from_version", int(i)),
zap.Int("to_version", int(i+1)),
zap.Int("from_version", int(storedVersion)),
zap.Int("to_version", int(storedVersion+1)),
)
if err := pebbleMigrations[i](batch); err != nil {
batch.Close()
@ -177,8 +186,8 @@ func (p *PebbleDB) migrate(logger *zap.Logger) error {
}
logger.Info(
"migration step completed",
zap.Int("from_version", int(i)),
zap.Int("to_version", int(i+1)),
zap.Int("from_version", int(storedVersion)),
zap.Int("to_version", int(storedVersion+1)),
)
}
@ -329,92 +338,6 @@ func (t *PebbleTransaction) DeleteRange(
var _ store.Transaction = (*PebbleTransaction)(nil)
type pebbleSnapshotDB struct {
snap *pebble.Snapshot
}
func (p *pebbleSnapshotDB) Get(key []byte) ([]byte, io.Closer, error) {
return p.snap.Get(key)
}
func (p *pebbleSnapshotDB) Set(key, value []byte) error {
return errors.New("pebble snapshot is read-only")
}
func (p *pebbleSnapshotDB) Delete(key []byte) error {
return errors.New("pebble snapshot is read-only")
}
func (p *pebbleSnapshotDB) NewBatch(indexed bool) store.Transaction {
return &snapshotTransaction{}
}
func (p *pebbleSnapshotDB) NewIter(lowerBound []byte, upperBound []byte) (
store.Iterator,
error,
) {
return p.snap.NewIter(&pebble.IterOptions{
LowerBound: lowerBound,
UpperBound: upperBound,
})
}
func (p *pebbleSnapshotDB) Compact(start, end []byte, parallelize bool) error {
return errors.New("pebble snapshot is read-only")
}
func (p *pebbleSnapshotDB) Close() error {
return p.snap.Close()
}
func (p *pebbleSnapshotDB) DeleteRange(start, end []byte) error {
return errors.New("pebble snapshot is read-only")
}
func (p *pebbleSnapshotDB) CompactAll() error {
return errors.New("pebble snapshot is read-only")
}
var _ store.KVDB = (*pebbleSnapshotDB)(nil)
type snapshotTransaction struct{}
func (s *snapshotTransaction) Get(key []byte) ([]byte, io.Closer, error) {
return nil, nil, errors.New("pebble snapshot transaction is read-only")
}
func (s *snapshotTransaction) Set(key []byte, value []byte) error {
return errors.New("pebble snapshot transaction is read-only")
}
func (s *snapshotTransaction) Commit() error {
return errors.New("pebble snapshot transaction is read-only")
}
func (s *snapshotTransaction) Delete(key []byte) error {
return errors.New("pebble snapshot transaction is read-only")
}
func (s *snapshotTransaction) Abort() error {
return nil
}
func (s *snapshotTransaction) NewIter(
lowerBound []byte,
upperBound []byte,
) (store.Iterator, error) {
return nil, errors.New("pebble snapshot transaction is read-only")
}
func (s *snapshotTransaction) DeleteRange(
lowerBound []byte,
upperBound []byte,
) error {
return errors.New("pebble snapshot transaction is read-only")
}
var _ store.Transaction = (*snapshotTransaction)(nil)
func rightAlign(data []byte, size int) []byte {
l := len(data)
@ -560,3 +483,125 @@ func migration_2_1_0_10(b *pebble.Batch) error {
func migration_2_1_0_11(b *pebble.Batch) error {
return nil
}
func migration_2_1_0_14(b *pebble.Batch) error {
return nil
}
func migration_2_1_0_141(b *pebble.Batch) error {
return migration_2_1_0_14(b)
}
func migration_2_1_0_142(b *pebble.Batch) error {
return migration_2_1_0_14(b)
}
func migration_2_1_0_143(b *pebble.Batch) error {
return migration_2_1_0_14(b)
}
func migration_2_1_0_144(b *pebble.Batch) error {
return migration_2_1_0_14(b)
}
func migration_2_1_0_145(b *pebble.Batch) error {
return migration_2_1_0_14(b)
}
func migration_2_1_0_146(b *pebble.Batch) error {
return migration_2_1_0_14(b)
}
func migration_2_1_0_147(b *pebble.Batch) error {
return migration_2_1_0_14(b)
}
func migration_2_1_0_148(b *pebble.Batch) error {
return migration_2_1_0_14(b)
}
type pebbleSnapshotDB struct {
snap *pebble.Snapshot
}
func (p *pebbleSnapshotDB) Get(key []byte) ([]byte, io.Closer, error) {
return p.snap.Get(key)
}
func (p *pebbleSnapshotDB) Set(key, value []byte) error {
return errors.New("pebble snapshot is read-only")
}
func (p *pebbleSnapshotDB) Delete(key []byte) error {
return errors.New("pebble snapshot is read-only")
}
func (p *pebbleSnapshotDB) NewBatch(indexed bool) store.Transaction {
return &snapshotTransaction{}
}
func (p *pebbleSnapshotDB) NewIter(lowerBound []byte, upperBound []byte) (
store.Iterator,
error,
) {
return p.snap.NewIter(&pebble.IterOptions{
LowerBound: lowerBound,
UpperBound: upperBound,
})
}
func (p *pebbleSnapshotDB) Compact(start, end []byte, parallelize bool) error {
return errors.New("pebble snapshot is read-only")
}
func (p *pebbleSnapshotDB) Close() error {
return p.snap.Close()
}
func (p *pebbleSnapshotDB) DeleteRange(start, end []byte) error {
return errors.New("pebble snapshot is read-only")
}
func (p *pebbleSnapshotDB) CompactAll() error {
return errors.New("pebble snapshot is read-only")
}
var _ store.KVDB = (*pebbleSnapshotDB)(nil)
type snapshotTransaction struct{}
func (s *snapshotTransaction) Get(key []byte) ([]byte, io.Closer, error) {
return nil, nil, errors.New("pebble snapshot transaction is read-only")
}
func (s *snapshotTransaction) Set(key []byte, value []byte) error {
return errors.New("pebble snapshot transaction is read-only")
}
func (s *snapshotTransaction) Commit() error {
return errors.New("pebble snapshot transaction is read-only")
}
func (s *snapshotTransaction) Delete(key []byte) error {
return errors.New("pebble snapshot transaction is read-only")
}
func (s *snapshotTransaction) Abort() error {
return nil
}
func (s *snapshotTransaction) NewIter(
lowerBound []byte,
upperBound []byte,
) (store.Iterator, error) {
return nil, errors.New("pebble snapshot transaction is read-only")
}
func (s *snapshotTransaction) DeleteRange(
lowerBound []byte,
upperBound []byte,
) error {
return errors.New("pebble snapshot transaction is read-only")
}
var _ store.Transaction = (*snapshotTransaction)(nil)

View File

@ -117,7 +117,7 @@ func (p *PebbleTokenStore) GetCoinsForOwner(
)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
err = ErrNotFound
err = store.ErrNotFound
return nil, nil, nil, err
}
err = errors.Wrap(err, "get coins for owner")
@ -156,7 +156,7 @@ func (p *PebbleTokenStore) GetCoinByAddress(
coinBytes, closer, err := p.db.Get(coinKey(address))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
err = ErrNotFound
err = store.ErrNotFound
return 0, nil, err
}
err = errors.Wrap(err, "get coin by address")
@ -297,7 +297,7 @@ func (p *PebbleTokenStore) GetPendingTransactionByAddress(
)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
err = ErrNotFound
err = store.ErrNotFound
return nil, err
}
err = errors.Wrap(err, "get pending transaction by address")
@ -325,7 +325,7 @@ func (p *PebbleTokenStore) GetPendingTransactionsForOwner(
)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
err = ErrNotFound
err = store.ErrNotFound
return nil, err
}
err = errors.Wrap(err, "get pending transactions for owner")
@ -355,7 +355,7 @@ func (p *PebbleTokenStore) GetTransactionByAddress(
txnBytes, closer, err := p.db.Get(transactionKey(domain, address))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
err = ErrNotFound
err = store.ErrNotFound
return nil, err
}
err = errors.Wrap(err, "get transaction by address")
@ -383,7 +383,7 @@ func (p *PebbleTokenStore) GetTransactionsForOwner(
)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
err = ErrNotFound
err = store.ErrNotFound
return nil, err
}
err = errors.Wrap(err, "get transactions for owner")

View File

@ -11,8 +11,6 @@ import (
"source.quilibrium.com/quilibrium/monorepo/types/store"
)
var ErrNotFound = errors.New("worker not found")
var _ store.WorkerStore = (*PebbleWorkerStore)(nil)
type PebbleWorkerStore struct {
@ -55,7 +53,7 @@ func (p *PebbleWorkerStore) GetWorker(coreId uint) (*store.WorkerInfo, error) {
data, closer, err := p.db.Get(workerKey(coreId))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
return nil, store.ErrNotFound
}
return nil, errors.Wrap(err, "get worker")
}
@ -80,7 +78,7 @@ func (p *PebbleWorkerStore) GetWorkerByFilter(filter []byte) (
data, closer, err := p.db.Get(workerByFilterKey(filter))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
return nil, store.ErrNotFound
}
return nil, errors.Wrap(err, "get worker by filter")
}

View File

@ -109,7 +109,7 @@ func TestConvergence(t *testing.T) {
store0 = s
}
hgs := pebblestore.NewPebbleHypergraphStore(&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, s, logger, enc, incProver)
crdts[i] = hg.NewHypergraph(logger, hgs, incProver, []int{}, &Nopthenticator{})
crdts[i] = hg.NewHypergraph(logger, hgs, incProver, []int{}, &Nopthenticator{}, 200)
hgs.MarkHypergraphAsComplete()
}
@ -191,7 +191,7 @@ func TestConvergence(t *testing.T) {
logger, _ := zap.NewDevelopment()
hgs := pebblestore.NewPebbleHypergraphStore(&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, store0, logger, enc, incProver)
compload, err := hgs.LoadHypergraph(&Nopthenticator{})
compload, err := hgs.LoadHypergraph(&Nopthenticator{}, 200)
if err != nil {
t.Errorf("Could not load hg, %v", err)
}

View File

@ -76,6 +76,7 @@ func TestHypergraph(t *testing.T) {
prover,
[]int{},
&Nopthenticator{},
200,
)
data := enc.Encrypt(make([]byte, 20), pub)
verenc := data[0].Compress()
@ -145,6 +146,7 @@ func TestHypergraph(t *testing.T) {
prover,
[]int{},
&Nopthenticator{},
200,
)
data := enc.Encrypt(make([]byte, 20), pub)
verenc := data[0].Compress()
@ -210,6 +212,7 @@ func TestHypergraph(t *testing.T) {
prover,
[]int{},
&Nopthenticator{},
200,
)
data := enc.Encrypt(make([]byte, 20), pub)
verenc := data[0].Compress()
@ -269,6 +272,7 @@ func TestHypergraph(t *testing.T) {
prover,
[]int{},
&Nopthenticator{},
200,
)
data := enc.Encrypt(make([]byte, 20), pub)
verenc := data[0].Compress()
@ -325,6 +329,7 @@ func TestHypergraph(t *testing.T) {
prover,
[]int{},
&Nopthenticator{},
200,
)
data := enc.Encrypt(make([]byte, 20), pub)
verenc := data[0].Compress()
@ -370,6 +375,7 @@ func TestHypergraph(t *testing.T) {
prover,
[]int{},
&Nopthenticator{},
200,
)
data := enc.Encrypt(make([]byte, 20), pub)
verenc := data[0].Compress()

View File

@ -21,7 +21,6 @@ import (
"google.golang.org/grpc"
"source.quilibrium.com/quilibrium/monorepo/config"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/store"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/channel"
typesStore "source.quilibrium.com/quilibrium/monorepo/types/store"
@ -309,7 +308,7 @@ func (w *WorkerManager) registerWorker(info *typesStore.WorkerInfo) error {
existing, err := w.store.GetWorker(info.CoreId)
creating := false
if err != nil {
if errors.Is(err, store.ErrNotFound) {
if errors.Is(err, typesStore.ErrNotFound) {
creating = true
} else {
workerOperationsTotal.WithLabelValues("register", "error").Inc()
@ -408,7 +407,7 @@ func (w *WorkerManager) AllocateWorker(coreId uint, filter []byte) error {
worker, err := w.store.GetWorker(coreId)
if err != nil {
workerOperationsTotal.WithLabelValues("allocate", "error").Inc()
if errors.Is(err, store.ErrNotFound) {
if errors.Is(err, typesStore.ErrNotFound) {
return errors.Wrap(
errors.New("worker not found"),
"allocate worker",
@ -486,7 +485,7 @@ func (w *WorkerManager) DeallocateWorker(coreId uint) error {
worker, err := w.store.GetWorker(coreId)
if err != nil {
workerOperationsTotal.WithLabelValues("deallocate", "error").Inc()
if errors.Is(err, store.ErrNotFound) {
if errors.Is(err, typesStore.ErrNotFound) {
return errors.New("worker not found")
}
return errors.Wrap(err, "deallocate worker")
@ -572,7 +571,7 @@ func (w *WorkerManager) GetWorkerIdByFilter(filter []byte) (uint, error) {
// Fallback to store
worker, err := w.store.GetWorkerByFilter(filter)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
if errors.Is(err, typesStore.ErrNotFound) {
return 0, errors.Wrap(
errors.New("no worker found for filter"),
"get worker id by filter",
@ -608,7 +607,7 @@ func (w *WorkerManager) GetFilterByWorkerId(coreId uint) ([]byte, error) {
// Fallback to store
worker, err := w.store.GetWorker(coreId)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
if errors.Is(err, typesStore.ErrNotFound) {
return nil, errors.Wrap(
errors.New("worker not found"),
"get filter by worker id",
@ -849,7 +848,7 @@ func (w *WorkerManager) ensureWorkerRegistered(
if err == nil {
return nil
}
if !errors.Is(err, store.ErrNotFound) {
if !errors.Is(err, typesStore.ErrNotFound) {
return err
}

View File

@ -19,7 +19,6 @@ import (
"source.quilibrium.com/quilibrium/monorepo/config"
qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/store"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/channel"
typesStore "source.quilibrium.com/quilibrium/monorepo/types/store"
@ -63,7 +62,7 @@ func (m *mockWorkerStore) GetWorker(coreId uint) (*typesStore.WorkerInfo, error)
defer m.mu.Unlock()
worker, exists := m.workers[coreId]
if !exists {
return nil, store.ErrNotFound
return nil, typesStore.ErrNotFound
}
workerCopy := *worker
@ -78,7 +77,7 @@ func (m *mockWorkerStore) GetWorkerByFilter(filter []byte) (*typesStore.WorkerIn
}
worker, exists := m.workersByFilter[string(filter)]
if !exists {
return nil, store.ErrNotFound
return nil, typesStore.ErrNotFound
}
return worker, nil
}
@ -106,7 +105,7 @@ func (m *mockWorkerStore) DeleteWorker(txn typesStore.Transaction, coreId uint)
defer m.mu.Unlock()
worker, exists := m.workers[coreId]
if !exists {
return store.ErrNotFound
return typesStore.ErrNotFound
}
delete(m.workers, coreId)
if len(worker.Filter) > 0 {

View File

@ -10,6 +10,8 @@ type SyncController struct {
globalSync atomic.Bool
statusMu sync.RWMutex
syncStatus map[string]*SyncInfo
maxActiveSessions int32
activeSessions atomic.Int32
}
func (s *SyncController) TryEstablishSyncSession(peerID string) bool {
@ -18,7 +20,16 @@ func (s *SyncController) TryEstablishSyncSession(peerID string) bool {
}
info := s.getOrCreate(peerID)
return !info.inProgress.Swap(true)
if info.inProgress.Swap(true) {
return false
}
if !s.incrementActiveSessions() {
info.inProgress.Store(false)
return false
}
return true
}
func (s *SyncController) EndSyncSession(peerID string) {
@ -31,7 +42,9 @@ func (s *SyncController) EndSyncSession(peerID string) {
info := s.syncStatus[peerID]
s.statusMu.RUnlock()
if info != nil {
info.inProgress.Store(false)
if info.inProgress.Swap(false) {
s.decrementActiveSessions()
}
}
}
@ -71,8 +84,45 @@ type SyncInfo struct {
inProgress atomic.Bool
}
func NewSyncController() *SyncController {
func NewSyncController(maxActiveSessions int) *SyncController {
var max int32
if maxActiveSessions > 0 {
max = int32(maxActiveSessions)
}
return &SyncController{
syncStatus: map[string]*SyncInfo{},
syncStatus: map[string]*SyncInfo{},
maxActiveSessions: max,
}
}
func (s *SyncController) incrementActiveSessions() bool {
if s.maxActiveSessions <= 0 {
return true
}
for {
current := s.activeSessions.Load()
if current >= s.maxActiveSessions {
return false
}
if s.activeSessions.CompareAndSwap(current, current+1) {
return true
}
}
}
func (s *SyncController) decrementActiveSessions() {
if s.maxActiveSessions <= 0 {
return
}
for {
current := s.activeSessions.Load()
if current == 0 {
return
}
if s.activeSessions.CompareAndSwap(current, current-1) {
return
}
}
}
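
A short sketch (not part of this commit) of the new session-cap semantics, assuming no global sync is in progress; the peer IDs, the cap of 1, and the function name are illustrative only.

package example // hypothetical package for the sketch

import "source.quilibrium.com/quilibrium/monorepo/types/hypergraph"

// sessionCapDemo shows that once maxActiveSessions slots are taken, further
// peers are refused until an existing session ends and frees a slot.
func sessionCapDemo() (bool, bool, bool) {
	sc := hypergraph.NewSyncController(1)

	first := sc.TryEstablishSyncSession("peer-a")  // true: takes the only slot
	second := sc.TryEstablishSyncSession("peer-b") // false: cap reached

	sc.EndSyncSession("peer-a") // frees the slot via decrementActiveSessions
	third := sc.TryEstablishSyncSession("peer-b") // true: slot available again

	return first, second, third
}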

View File

@ -16,6 +16,36 @@ type MockClockStore struct {
mock.Mock
}
// RangeStagedShardClockFrames implements store.ClockStore.
func (m *MockClockStore) RangeStagedShardClockFrames(
filter []byte,
startFrameNumber uint64,
endFrameNumber uint64,
) (store.TypedIterator[*protobufs.AppShardFrame], error) {
args := m.Called(
filter,
startFrameNumber,
endFrameNumber,
)
return args.Get(0).(store.TypedIterator[*protobufs.AppShardFrame]),
args.Error(1)
}
// RangeGlobalClockFrameCandidates implements store.ClockStore.
func (m *MockClockStore) RangeGlobalClockFrameCandidates(
startFrameNumber uint64,
endFrameNumber uint64,
) (store.TypedIterator[*protobufs.GlobalFrame], error) {
args := m.Called(
startFrameNumber,
endFrameNumber,
)
return args.Get(0).(store.TypedIterator[*protobufs.GlobalFrame]),
args.Error(1)
}
// GetGlobalClockFrameCandidate implements store.ClockStore.
func (m *MockClockStore) GetGlobalClockFrameCandidate(
frameNumber uint64,

View File

@ -16,6 +16,10 @@ type ClockStore interface {
startFrameNumber uint64,
endFrameNumber uint64,
) (TypedIterator[*protobufs.GlobalFrame], error)
RangeGlobalClockFrameCandidates(
startFrameNumber uint64,
endFrameNumber uint64,
) (TypedIterator[*protobufs.GlobalFrame], error)
PutGlobalClockFrame(frame *protobufs.GlobalFrame, txn Transaction) error
PutGlobalClockFrameCandidate(
frame *protobufs.GlobalFrame,
@ -107,6 +111,11 @@ type ClockStore interface {
parentSelector []byte,
truncate bool,
) (*protobufs.AppShardFrame, error)
RangeStagedShardClockFrames(
filter []byte,
startFrameNumber uint64,
endFrameNumber uint64,
) (TypedIterator[*protobufs.AppShardFrame], error)
GetStagedShardClockFramesForFrameNumber(
filter []byte,
frameNumber uint64,

View File

@ -23,6 +23,7 @@ type HypergraphStore interface {
SetCoveredPrefix(coveredPrefix []int) error
LoadHypergraph(
authenticationProvider channel.AuthenticationProvider,
maxSyncSessions int,
) (
hypergraph.Hypergraph,
error,
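
A minimal sketch (not part of this commit) of threading the new sync-session cap through LoadHypergraph; the local interface, the function name, and the value 200 (mirroring the tests above) are illustrative only.

package example // hypothetical package for the sketch

import (
	"source.quilibrium.com/quilibrium/monorepo/types/channel"
	"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
)

// hypergraphLoader captures just the method this sketch needs; the real
// HypergraphStore interface above is assumed to provide it.
type hypergraphLoader interface {
	LoadHypergraph(
		authenticationProvider channel.AuthenticationProvider,
		maxSyncSessions int,
	) (hypergraph.Hypergraph, error)
}

// loadWithSessionCap forwards the cap that NewHypergraph now requires; 200
// matches the value the tests in this commit pass.
func loadWithSessionCap(
	s hypergraphLoader,
	auth channel.AuthenticationProvider,
) (hypergraph.Hypergraph, error) {
	return s.LoadHypergraph(auth, 200)
}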