rollup of pre-liveness check lock interactions

This commit is contained in:
Cassandra Heart 2025-10-11 04:59:11 -05:00
parent 5023e019e6
commit 70efacbac1
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
13 changed files with 1501 additions and 784 deletions

View File

@ -117,6 +117,7 @@ func (f *AppConsensusEngineFactory) CreateAppConsensusEngine(
appAddress,
f.proverRegistry,
f.clockStore,
f.config.Engine.ArchiveMode,
)
if err != nil {
return nil, errors.Wrap(err, "create app time reel")

View File

@ -53,9 +53,8 @@ func (e *AppConsensusEngine) validateConsensusMessage(
}
if !bytes.Equal(frame.Header.Address, e.appAddress) {
e.logger.Debug("frame address incorrect")
frameValidationTotal.WithLabelValues(e.appAddressHex, "reject").Inc()
return p2p.ValidationResultReject
return p2p.ValidationResultIgnore
}
if frame.Header.PublicKeySignatureBls48581 != nil {
@ -86,6 +85,12 @@ func (e *AppConsensusEngine) validateConsensusMessage(
return p2p.ValidationResultReject
}
now := time.Now().UnixMilli()
if livenessCheck.Timestamp > now+5000 ||
livenessCheck.Timestamp < now-5000 {
return p2p.ValidationResultIgnore
}
if err := livenessCheck.Validate(); err != nil {
e.logger.Debug("failed to validate liveness check", zap.Error(err))
return p2p.ValidationResultReject
@ -98,6 +103,11 @@ func (e *AppConsensusEngine) validateConsensusMessage(
return p2p.ValidationResultReject
}
now := time.Now().UnixMilli()
if vote.Timestamp > now+5000 || vote.Timestamp < now-5000 {
return p2p.ValidationResultIgnore
}
if err := vote.Validate(); err != nil {
e.logger.Debug("failed to validate vote", zap.Error(err))
return p2p.ValidationResultReject

View File

@ -114,6 +114,7 @@ type GlobalConsensusEngine struct {
globalPeerInfoMessageQueue chan *pb.Message
globalAlertMessageQueue chan *pb.Message
appFramesMessageQueue chan *pb.Message
shardConsensusMessageQueue chan *pb.Message
// Emergency halt
haltCtx context.Context
@ -235,9 +236,10 @@ func NewGlobalConsensusEngine(
globalConsensusMessageQueue: make(chan *pb.Message, 1000),
globalFrameMessageQueue: make(chan *pb.Message, 100),
globalProverMessageQueue: make(chan *pb.Message, 1000),
appFramesMessageQueue: make(chan *pb.Message, 1000000),
appFramesMessageQueue: make(chan *pb.Message, 10000),
globalPeerInfoMessageQueue: make(chan *pb.Message, 1000),
globalAlertMessageQueue: make(chan *pb.Message, 100),
shardConsensusMessageQueue: make(chan *pb.Message, 10000),
currentDifficulty: config.Engine.Difficulty,
lastProvenFrameTime: time.Now(),
blacklistMap: make(map[string]bool),
@ -519,6 +521,13 @@ func (e *GlobalConsensusEngine) Start(quit chan struct{}) <-chan error {
return errChan
}
err = e.subscribeToShardConsensusMessages()
if err != nil {
errChan <- errors.Wrap(err, "start")
close(errChan)
return errChan
}
// Subscribe to frames
err = e.subscribeToFrameMessages()
if err != nil {
@ -555,6 +564,10 @@ func (e *GlobalConsensusEngine) Start(quit chan struct{}) <-chan error {
e.wg.Add(1)
go e.processGlobalConsensusMessageQueue()
// Start shard consensus message queue processor
e.wg.Add(1)
go e.processShardConsensusMessageQueue()
// Start frame message queue processor
e.wg.Add(1)
go e.processFrameMessageQueue()
@ -757,6 +770,14 @@ func (e *GlobalConsensusEngine) Stop(force bool) <-chan error {
e.pubsub.UnregisterValidator(bytes.Repeat([]byte{0xff}, 32))
}
e.pubsub.Unsubscribe(slices.Concat(
[]byte{0},
bytes.Repeat([]byte{0xff}, 32),
), false)
e.pubsub.UnregisterValidator(slices.Concat(
[]byte{0},
bytes.Repeat([]byte{0xff}, 32),
))
e.pubsub.Unsubscribe(GLOBAL_FRAME_BITMASK, false)
e.pubsub.UnregisterValidator(GLOBAL_FRAME_BITMASK)
e.pubsub.Unsubscribe(GLOBAL_PROVER_BITMASK, false)

View File

@ -2,6 +2,7 @@ package global
import (
"bytes"
"slices"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
@ -87,6 +88,46 @@ func (e *GlobalConsensusEngine) subscribeToGlobalConsensus() error {
return nil
}
func (e *GlobalConsensusEngine) subscribeToShardConsensusMessages() error {
if err := e.pubsub.Subscribe(
slices.Concat(
[]byte{0},
bytes.Repeat([]byte{0xff}, 32),
),
func(message *pb.Message) error {
select {
case <-e.haltCtx.Done():
return nil
case e.shardConsensusMessageQueue <- message:
return nil
case <-e.ctx.Done():
return errors.New("context cancelled")
default:
e.logger.Warn("shard consensus queue full, dropping message")
return nil
}
},
); err != nil {
return errors.Wrap(err, "subscribe to shard consensus messages")
}
// Register frame validator
if err := e.pubsub.RegisterValidator(
slices.Concat(
[]byte{0},
bytes.Repeat([]byte{0xff}, 32),
),
func(peerID peer.ID, message *pb.Message) tp2p.ValidationResult {
return e.validateShardConsensusMessage(peerID, message)
},
true,
); err != nil {
return errors.Wrap(err, "subscribe to shard consensus messages")
}
return nil
}
func (e *GlobalConsensusEngine) subscribeToFrameMessages() error {
if err := e.pubsub.Subscribe(
GLOBAL_FRAME_BITMASK,

View File

@ -73,6 +73,12 @@ func (e *GlobalConsensusEngine) validateGlobalConsensusMessage(
return tp2p.ValidationResultReject
}
now := time.Now().UnixMilli()
if livenessCheck.Timestamp > now+5000 ||
livenessCheck.Timestamp < now-5000 {
return tp2p.ValidationResultIgnore
}
// Validate the liveness check
if err := livenessCheck.Validate(); err != nil {
e.logger.Debug("invalid liveness check", zap.Error(err))
@ -86,6 +92,11 @@ func (e *GlobalConsensusEngine) validateGlobalConsensusMessage(
return tp2p.ValidationResultReject
}
now := time.Now().UnixMilli()
if vote.Timestamp > now+5000 || vote.Timestamp < now-5000 {
return tp2p.ValidationResultIgnore
}
// Validate the vote
if err := vote.Validate(); err != nil {
e.logger.Debug("invalid vote", zap.Error(err))
@ -114,6 +125,105 @@ func (e *GlobalConsensusEngine) validateGlobalConsensusMessage(
return tp2p.ValidationResultAccept
}
func (e *GlobalConsensusEngine) validateShardConsensusMessage(
peerID peer.ID,
message *pb.Message,
) tp2p.ValidationResult {
// Check if data is long enough to contain type prefix
if len(message.Data) < 4 {
e.logger.Debug(
"message too short",
zap.Int("data_length", len(message.Data)),
)
return tp2p.ValidationResultReject
}
// Read type prefix from first 4 bytes
typePrefix := binary.BigEndian.Uint32(message.Data[:4])
switch typePrefix {
case protobufs.AppShardFrameType:
frame := &protobufs.AppShardFrame{}
if err := frame.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal frame", zap.Error(err))
return tp2p.ValidationResultReject
}
if frame.Header == nil {
e.logger.Debug("frame has no header")
return tp2p.ValidationResultReject
}
if frame.Header.PublicKeySignatureBls48581 != nil {
e.logger.Debug("frame validation has signature")
return tp2p.ValidationResultReject
}
valid, err := e.appFrameValidator.Validate(frame)
if err != nil {
e.logger.Debug("frame validation error", zap.Error(err))
return tp2p.ValidationResultReject
}
if !valid {
e.logger.Debug("invalid app frame")
return tp2p.ValidationResultReject
}
case protobufs.ProverLivenessCheckType:
livenessCheck := &protobufs.ProverLivenessCheck{}
if err := livenessCheck.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal liveness check", zap.Error(err))
return tp2p.ValidationResultReject
}
now := time.Now().UnixMilli()
if livenessCheck.Timestamp > now+5000 ||
livenessCheck.Timestamp < now-5000 {
return tp2p.ValidationResultIgnore
}
if err := livenessCheck.Validate(); err != nil {
e.logger.Debug("failed to validate liveness check", zap.Error(err))
return tp2p.ValidationResultReject
}
case protobufs.FrameVoteType:
vote := &protobufs.FrameVote{}
if err := vote.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal vote", zap.Error(err))
return tp2p.ValidationResultReject
}
now := time.Now().UnixMilli()
if vote.Timestamp > now+5000 || vote.Timestamp < now-5000 {
return tp2p.ValidationResultIgnore
}
if err := vote.Validate(); err != nil {
e.logger.Debug("failed to validate vote", zap.Error(err))
return tp2p.ValidationResultReject
}
case protobufs.FrameConfirmationType:
confirmation := &protobufs.FrameConfirmation{}
if err := confirmation.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal confirmation", zap.Error(err))
return tp2p.ValidationResultReject
}
if err := confirmation.Validate(); err != nil {
e.logger.Debug("failed to validate confirmation", zap.Error(err))
return tp2p.ValidationResultReject
}
default:
return tp2p.ValidationResultReject
}
return tp2p.ValidationResultAccept
}
func (e *GlobalConsensusEngine) validateProverMessage(
peerID peer.ID,
message *pb.Message,

View File

@ -108,6 +108,9 @@ type AppTimeReel struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// Archive mode: whether to hold historic frame data
archiveMode bool
}
// NewAppTimeReel creates a new app time reel for a specific shard address
@ -116,6 +119,7 @@ func NewAppTimeReel(
address []byte,
proverRegistry consensus.ProverRegistry,
clockStore store.ClockStore,
archiveMode bool,
) (*AppTimeReel, error) {
cache, err := lru.New[string, *FrameNode](defaultAppCacheSize)
if err != nil {
@ -148,9 +152,10 @@ func NewAppTimeReel(
) error {
return nil
},
store: clockStore,
ctx: ctx,
cancel: cancel,
store: clockStore,
ctx: ctx,
cancel: cancel,
archiveMode: archiveMode,
}, nil
}
@ -309,11 +314,42 @@ func (a *AppTimeReel) Insert(
return a.insertGenesisFrame(frame, frameID)
}
// Non-archive: if we have no in-memory frames yet, accept the first as
// pseudo-root
if !a.archiveMode && a.root == nil && len(a.nodes) == 0 {
a.logger.Info("non-archive: accepting first frame as pseudo-root",
zap.String("address", fmt.Sprintf("%x", a.address)),
zap.Uint64("frame_number", frame.Header.FrameNumber))
return a.insertGenesisFrame(frame, frameID)
}
// Try to find parent
parentSelector := string(frame.Header.ParentSelector)
parentNode := a.findNodeBySelector(frame.Header.ParentSelector)
if parentNode == nil {
if !a.archiveMode && a.head != nil &&
frame.Header.FrameNumber > a.head.Frame.Header.FrameNumber {
// ahead-of-head orphan: stage as pending and pre-insert as orphan node
a.addPendingFrame(frame, parentSelector)
orphan := &FrameNode{
Frame: frame,
Parent: nil, // reparent later when parent arrives
Children: make(map[string]*FrameNode),
Depth: 1,
}
a.nodes[frameID] = orphan
a.framesByNumber[frame.Header.FrameNumber] =
append(a.framesByNumber[frame.Header.FrameNumber], orphan)
a.cache.Add(frameID, orphan)
// Evaluate fork choice (may snap ahead if gap > 360)
a.evaluateForkChoice(orphan)
return nil
}
// Parent not found, add to pending frames
a.addPendingFrame(frame, parentSelector)
return nil
@ -452,8 +488,30 @@ func (a *AppTimeReel) processPendingFrames(
for _, pending := range pendingList {
frameID := a.ComputeFrameID(pending.Frame)
// Skip if already processed
if _, exists := a.nodes[frameID]; exists {
if existing, ok := a.nodes[frameID]; ok {
// Re-parent previously pre-inserted orphan
if existing.Parent == nil {
existing.Parent = parentNode
existing.Depth = parentNode.Depth + 1
parentNode.Children[frameID] = existing
a.framesByNumber[pending.Frame.Header.FrameNumber] =
append(
a.framesByNumber[pending.Frame.Header.FrameNumber],
existing,
)
a.cache.Add(frameID, existing)
a.logger.Debug("reparented pending orphan frame",
zap.String("address", fmt.Sprintf("%x", a.address)),
zap.Uint64("frame_number", pending.Frame.Header.FrameNumber),
zap.String("id", frameID))
a.processPendingFrames(frameID, existing)
a.evaluateForkChoice(existing)
}
// Skip if already processed
continue
}
@ -498,9 +556,9 @@ func (a *AppTimeReel) findNodeBySelector(selector []byte) *FrameNode {
// evaluateForkChoice evaluates fork choice and updates head if necessary
func (a *AppTimeReel) evaluateForkChoice(newNode *FrameNode) {
if a.head == nil ||
(newNode.Frame.Header.FrameNumber > a.head.Frame.Header.FrameNumber &&
newNode.Frame.Header.FrameNumber-a.head.Frame.Header.FrameNumber > 360) {
if a.head == nil || (!a.archiveMode &&
newNode.Frame.Header.FrameNumber > a.head.Frame.Header.FrameNumber &&
newNode.Frame.Header.FrameNumber-a.head.Frame.Header.FrameNumber > 360) {
oldHead := a.head
a.head = newNode
a.sendHeadEvent(newNode, oldHead)
@ -708,17 +766,45 @@ func (a *AppTimeReel) evaluateForkChoice(newNode *FrameNode) {
}
}
// findLeafNodes returns all leaf nodes (nodes with no children)
func (a *AppTimeReel) findLeafNodes() []*FrameNode {
// findLeafNodes returns all leaf nodes (nodes with no children) that are in the
// same connected component as the current head. This prevents spurious fork
// choice across disconnected forests (e.g., after a non-archive snap-ahead).
func (g *AppTimeReel) findLeafNodes() []*FrameNode {
var leaves []*FrameNode
for _, node := range a.nodes {
if len(node.Children) == 0 {
if g.head == nil {
// Fallback: no head yet, return all leaves
for _, node := range g.nodes {
if len(node.Children) == 0 {
leaves = append(leaves, node)
}
}
return leaves
}
headRoot := g.findRoot(g.head)
for _, node := range g.nodes {
if len(node.Children) != 0 {
continue
}
if g.findRoot(node) == headRoot {
leaves = append(leaves, node)
}
}
return leaves
}
// findRoot walks parents to identify the root of a node
func (a *AppTimeReel) findRoot(n *FrameNode) *FrameNode {
cur := n
for cur != nil && cur.Parent != nil {
cur = cur.Parent
}
return cur
}
// nodeToBranch converts a node and its lineage to a Branch for fork choice
func (a *AppTimeReel) nodeToBranch(node *FrameNode) Branch {
// Build lineage from this node backwards, but limit to 360 frames
@ -1435,13 +1521,14 @@ func (a *AppTimeReel) bootstrapFromStore() error {
latest, _, err := a.store.GetLatestShardClockFrame(a.address)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return nil // fresh DB
return nil
}
return errors.Wrap(err, "bootstrap from store")
}
latestNum := latest.Header.FrameNumber
var start uint64
if latestNum+1 > maxTreeDepth {
if !a.archiveMode && latestNum+1 > maxTreeDepth {
start = latestNum - (maxTreeDepth - 1)
} else {
start = 0
@ -1481,18 +1568,22 @@ func (a *AppTimeReel) bootstrapFromStore() error {
node.Parent = p
node.Depth = p.Depth + 1
p.Children[frameID] = node
} else if !a.archiveMode && frame.Header.FrameNumber == start {
// treat it as pseudo-root
node.Depth = 0
a.logger.Info("non-archive: treating first loaded frame as pseudo-root",
zap.String("address", fmt.Sprintf("%x", a.address)),
zap.Uint64("frame_number", frame.Header.FrameNumber))
}
}
if a.root == nil {
if a.root == nil || (!a.archiveMode && frame.Header.FrameNumber == start) {
a.root = node
}
a.nodes[frameID] = node
a.framesByNumber[frame.Header.FrameNumber] = append(
a.framesByNumber[frame.Header.FrameNumber],
node,
)
a.framesByNumber[frame.Header.FrameNumber] =
append(a.framesByNumber[frame.Header.FrameNumber], node)
a.cache.Add(frameID, node)
prev = node
@ -1502,12 +1593,18 @@ func (a *AppTimeReel) bootstrapFromStore() error {
a.updateTreeMetrics()
if a.head != nil {
a.logger.Info(
"bootstrapped app reel from store",
a.logger.Info("bootstrapped app reel from store",
zap.String("address", fmt.Sprintf("%x", a.address)),
zap.Uint64("loaded_from", start),
zap.Uint64("loaded_to", a.head.Frame.Header.FrameNumber),
zap.Int("loaded_count", len(a.nodes)),
zap.Bool("archive_mode", a.archiveMode),
)
if !a.archiveMode && a.root != nil {
a.logger.Info("non-archive: accepting last 360 frames as valid chain",
zap.Uint64("pseudo_root_frame", a.root.Frame.Header.FrameNumber),
zap.Uint64("head_frame", a.head.Frame.Header.FrameNumber))
}
}
return nil
}
@ -1552,15 +1649,14 @@ func (a *AppTimeReel) persistCanonicalFrames(
if len(frames) == 0 {
return nil
}
txn, err := a.store.NewTransaction(false)
if err != nil {
return errors.Wrap(err, "persist canonical frames")
}
for _, f := range frames {
if err := a.materializeFunc(
txn,
f,
); err != nil {
if err := a.materializeFunc(txn, f); err != nil {
_ = txn.Abort()
return errors.Wrap(err, "persist canonical frames")
}
@ -1577,8 +1673,24 @@ func (a *AppTimeReel) persistCanonicalFrames(
return errors.Wrap(err, "persist canonical frames")
}
}
if err := txn.Commit(); err != nil {
return errors.Wrap(err, "persist canonical frames")
}
// prune old frames in non-archive mode
if !a.archiveMode && a.head != nil &&
a.head.Frame.Header.FrameNumber > maxTreeDepth {
oldestToKeep := a.head.Frame.Header.FrameNumber - maxTreeDepth + 1
if err := a.store.DeleteShardClockFrameRange(
a.address,
0,
oldestToKeep,
); err != nil {
a.logger.Error("unable to delete shard frame range",
zap.String("address", fmt.Sprintf("%x", a.address)),
zap.Error(err))
}
}
return nil
}

View File

@ -48,7 +48,7 @@ func TestAppTimeReel_BasicOperations(t *testing.T) {
logger, _ := zap.NewDevelopment()
address := []byte("test_app_address")
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s, true)
require.NoError(t, err)
err = atr.Start()
@ -132,7 +132,7 @@ func TestAppTimeReel_WrongAddress(t *testing.T) {
logger, _ := zap.NewDevelopment()
address := []byte("correct_address")
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s, true)
require.NoError(t, err)
err = atr.Start()
@ -163,7 +163,7 @@ func TestAppTimeReel_Equivocation(t *testing.T) {
logger, _ := zap.NewDevelopment()
address := []byte("test_app_address")
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s, true)
require.NoError(t, err)
err = atr.Start()
@ -265,7 +265,7 @@ func TestAppTimeReel_Fork(t *testing.T) {
logger, _ := zap.NewDevelopment()
address := []byte("test_app_address")
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s, true)
require.NoError(t, err)
err = atr.Start()
@ -339,7 +339,7 @@ func TestAppTimeReel_ParentValidation(t *testing.T) {
logger, _ := zap.NewDevelopment()
address := []byte("test_app_address")
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s, true)
require.NoError(t, err)
err = atr.Start()
@ -406,7 +406,7 @@ func TestAppTimeReel_ForkDetection(t *testing.T) {
logger, _ := zap.NewDevelopment()
address := []byte("test_app_address")
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s, true)
require.NoError(t, err)
err = atr.Start()
@ -489,7 +489,7 @@ func TestAppTimeReel_ForkChoice_MoreSignatures(t *testing.T) {
logger, _ := zap.NewDevelopment()
address := []byte("test_app_address")
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s, true)
require.NoError(t, err)
err = atr.Start()
@ -600,7 +600,7 @@ func TestAppTimeReel_ForkChoice_NoReplacement(t *testing.T) {
logger, _ := zap.NewDevelopment()
address := []byte("test_app_address")
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s, true)
require.NoError(t, err)
err = atr.Start()
@ -711,7 +711,7 @@ func TestAppTimeReel_DeepForkChoice_ReverseInsertion(t *testing.T) {
address := []byte("test_app_address")
reg := createTestProverRegistry(false)
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, reg, s)
atr, err := NewAppTimeReel(logger, address, reg, s, true)
require.NoError(t, err)
err = atr.Start()
@ -1044,7 +1044,7 @@ func TestAppTimeReel_MultipleProvers(t *testing.T) {
logger, _ := zap.NewDevelopment()
address := []byte("test_app_address")
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s, true)
require.NoError(t, err)
err = atr.Start()
@ -1172,7 +1172,7 @@ func TestAppTimeReel_ComplexForkWithOutOfOrderInsertion(t *testing.T) {
proverRegistry.On("GetOrderedProvers", mock.Anything, mock.Anything).Return(nil, errors.New("unknown parent selector"))
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, proverRegistry, s)
atr, err := NewAppTimeReel(logger, address, proverRegistry, s, true)
require.NoError(t, err)
err = atr.Start()
@ -1389,7 +1389,7 @@ func TestAppTimeReel_TreePruning(t *testing.T) {
logger, _ := zap.NewDevelopment()
address := []byte("test_app_address")
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s, true)
require.NoError(t, err)
err = atr.Start()
@ -1478,7 +1478,7 @@ func TestAppTimeReel_TreePruningWithForks(t *testing.T) {
logger, _ := zap.NewDevelopment()
address := []byte("test_app_address")
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s, true)
require.NoError(t, err)
err = atr.Start()
@ -1598,7 +1598,7 @@ func TestAppTimeReel_ForkChoiceInsertionOrder(t *testing.T) {
address := []byte("test_app_address")
reg := createTestProverRegistry(false)
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, reg, s)
atr, err := NewAppTimeReel(logger, address, reg, s, true)
require.NoError(t, err)
err = atr.Start()
@ -1814,7 +1814,7 @@ func TestAppTimeReel_ForkEventsWithReplay(t *testing.T) {
logger, _ := zap.NewDevelopment()
address := []byte("test_app_address")
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s, true)
require.NoError(t, err)
err = atr.Start()
@ -1997,7 +1997,7 @@ func TestAppTimeReel_ComprehensiveEquivocation(t *testing.T) {
logger, _ := zap.NewDevelopment()
address := []byte("test_app_address")
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s)
atr, err := NewAppTimeReel(logger, address, createTestProverRegistry(true), s, true)
require.NoError(t, err)
err = atr.Start()
@ -2159,7 +2159,7 @@ func TestAppTimeReel_ProverRegistryForkChoice(t *testing.T) {
proverRegistry.On("GetOrderedProvers", mock.Anything, mock.Anything).Return(nil, errors.New("unknown parent selector"))
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, proverRegistry, s)
atr, err := NewAppTimeReel(logger, address, proverRegistry, s, true)
require.NoError(t, err)
err = atr.Start()
@ -2290,7 +2290,7 @@ func TestAppTimeReel_ProverRegistryWithOrderedProvers(t *testing.T) {
proverRegistry.On("GetOrderedProvers", mock.Anything, mock.Anything).Return([][]byte{[]byte("default_prover")}, nil)
s := setupTestClockStore(t)
atr, err := NewAppTimeReel(logger, address, proverRegistry, s)
atr, err := NewAppTimeReel(logger, address, proverRegistry, s, true)
require.NoError(t, err)
err = atr.Start()

View File

@ -446,7 +446,7 @@ func createTestAppTimeReel(
) *consensustime.AppTimeReel {
logger := zap.NewNop()
proverRegistry := createTestProverRegistry()
appTimeReel, err := consensustime.NewAppTimeReel(logger, appAddress, proverRegistry, clockStore)
appTimeReel, err := consensustime.NewAppTimeReel(logger, appAddress, proverRegistry, clockStore, true)
require.NoError(t, err)
return appTimeReel
}

View File

@ -2116,6 +2116,10 @@ func (f *FrameVote) ToCanonicalBytes() ([]byte, error) {
return nil, errors.Wrap(err, "to canonical bytes")
}
if err := binary.Write(buf, binary.BigEndian, f.Timestamp); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
// Write public_key_signature_bls48581
if f.PublicKeySignatureBls48581 != nil {
sigBytes, err := f.PublicKeySignatureBls48581.ToCanonicalBytes()
@ -2178,6 +2182,11 @@ func (f *FrameVote) FromCanonicalBytes(data []byte) error {
}
f.Approve = approve != 0
// Read timestamp
if err := binary.Read(buf, binary.BigEndian, &f.Timestamp); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
// Read public_key_signature_bls48581
var sigLen uint32
if err := binary.Read(buf, binary.BigEndian, &sigLen); err != nil {

File diff suppressed because it is too large Load Diff

View File

@ -133,6 +133,40 @@ func local_request_GlobalService_GetGlobalShards_0(ctx context.Context, marshale
}
func request_GlobalService_GetLockedAddresses_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GetLockedAddressesRequest
var metadata runtime.ServerMetadata
newReader, berr := utilities.IOReaderFactory(req.Body)
if berr != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr)
}
if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.GetLockedAddresses(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_GlobalService_GetLockedAddresses_0(ctx context.Context, marshaler runtime.Marshaler, server GlobalServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GetLockedAddressesRequest
var metadata runtime.ServerMetadata
newReader, berr := utilities.IOReaderFactory(req.Body)
if berr != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr)
}
if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.GetLockedAddresses(ctx, &protoReq)
return msg, metadata, err
}
func request_AppShardService_GetAppShardFrame_0(ctx context.Context, marshaler runtime.Marshaler, client AppShardServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GetAppShardFrameRequest
var metadata runtime.ServerMetadata
@ -980,6 +1014,31 @@ func RegisterGlobalServiceHandlerServer(ctx context.Context, mux *runtime.ServeM
})
mux.Handle("POST", pattern_GlobalService_GetLockedAddresses_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/quilibrium.node.global.pb.GlobalService/GetLockedAddresses", runtime.WithHTTPPathPattern("/quilibrium.node.global.pb.GlobalService/GetLockedAddresses"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_GlobalService_GetLockedAddresses_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_GlobalService_GetLockedAddresses_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
@ -1646,6 +1705,28 @@ func RegisterGlobalServiceHandlerClient(ctx context.Context, mux *runtime.ServeM
})
mux.Handle("POST", pattern_GlobalService_GetLockedAddresses_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/quilibrium.node.global.pb.GlobalService/GetLockedAddresses", runtime.WithHTTPPathPattern("/quilibrium.node.global.pb.GlobalService/GetLockedAddresses"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_GlobalService_GetLockedAddresses_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_GlobalService_GetLockedAddresses_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
@ -1655,6 +1736,8 @@ var (
pattern_GlobalService_GetAppShards_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"quilibrium.node.global.pb.GlobalService", "GetAppShards"}, ""))
pattern_GlobalService_GetGlobalShards_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"quilibrium.node.global.pb.GlobalService", "GetGlobalShards"}, ""))
pattern_GlobalService_GetLockedAddresses_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"quilibrium.node.global.pb.GlobalService", "GetLockedAddresses"}, ""))
)
var (
@ -1663,6 +1746,8 @@ var (
forward_GlobalService_GetAppShards_0 = runtime.ForwardResponseMessage
forward_GlobalService_GetGlobalShards_0 = runtime.ForwardResponseMessage
forward_GlobalService_GetLockedAddresses_0 = runtime.ForwardResponseMessage
)
// RegisterAppShardServiceHandlerFromEndpoint is same as RegisterAppShardServiceHandler but

View File

@ -211,8 +211,10 @@ message FrameVote {
bytes proposer = 2;
// Whether the voter approves the frame
bool approve = 3;
// The timestamp when the vote was created
int64 timestamp = 4;
// The BLS signature with the voter's address
quilibrium.node.keys.pb.BLS48581AddressedSignature public_key_signature_bls48581 = 4;
quilibrium.node.keys.pb.BLS48581AddressedSignature public_key_signature_bls48581 = 5;
}
message FrameConfirmation {
@ -284,10 +286,34 @@ message GetGlobalShardsResponse {
repeated bytes commitment = 2;
}
message GetLockedAddressesRequest {
// The shard address identifier, in split L2||nibbles form
bytes shard_address = 1;
// The frame number of the locked address request, to disambiguate early/late
// requests
uint64 frame_number = 2;
}
message LockedTransaction {
// The hash of the locked transaction
bytes transaction_hash = 1;
// The shard address identifier the lock covers, in split L2||nibbles form
repeated bytes shard_addresses = 2;
// Whether all shard addresses impacted have committed to this value
bool committed = 3;
// Whether the proposed frames had a winning weight witness
bool filled = 4;
}
message GetLockedAddressesResponse {
repeated LockedTransaction transactions = 1;
}
service GlobalService {
rpc GetGlobalFrame (GetGlobalFrameRequest) returns (GlobalFrameResponse);
rpc GetAppShards(GetAppShardsRequest) returns (GetAppShardsResponse);
rpc GetGlobalShards(GetGlobalShardsRequest) returns (GetGlobalShardsResponse);
rpc GetLockedAddresses(GetLockedAddressesRequest) returns (GetLockedAddressesResponse);
}
service AppShardService {
@ -532,4 +558,4 @@ service DispatchService {
// Synchronize dispatch data
rpc Sync(quilibrium.node.channel.pb.DispatchSyncRequest)
returns (quilibrium.node.channel.pb.DispatchSyncResponse);
}
}

View File

@ -20,9 +20,10 @@ import (
const _ = grpc.SupportPackageIsVersion7
const (
GlobalService_GetGlobalFrame_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetGlobalFrame"
GlobalService_GetAppShards_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetAppShards"
GlobalService_GetGlobalShards_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetGlobalShards"
GlobalService_GetGlobalFrame_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetGlobalFrame"
GlobalService_GetAppShards_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetAppShards"
GlobalService_GetGlobalShards_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetGlobalShards"
GlobalService_GetLockedAddresses_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetLockedAddresses"
)
// GlobalServiceClient is the client API for GlobalService service.
@ -32,6 +33,7 @@ type GlobalServiceClient interface {
GetGlobalFrame(ctx context.Context, in *GetGlobalFrameRequest, opts ...grpc.CallOption) (*GlobalFrameResponse, error)
GetAppShards(ctx context.Context, in *GetAppShardsRequest, opts ...grpc.CallOption) (*GetAppShardsResponse, error)
GetGlobalShards(ctx context.Context, in *GetGlobalShardsRequest, opts ...grpc.CallOption) (*GetGlobalShardsResponse, error)
GetLockedAddresses(ctx context.Context, in *GetLockedAddressesRequest, opts ...grpc.CallOption) (*GetLockedAddressesResponse, error)
}
type globalServiceClient struct {
@ -69,6 +71,15 @@ func (c *globalServiceClient) GetGlobalShards(ctx context.Context, in *GetGlobal
return out, nil
}
func (c *globalServiceClient) GetLockedAddresses(ctx context.Context, in *GetLockedAddressesRequest, opts ...grpc.CallOption) (*GetLockedAddressesResponse, error) {
out := new(GetLockedAddressesResponse)
err := c.cc.Invoke(ctx, GlobalService_GetLockedAddresses_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// GlobalServiceServer is the server API for GlobalService service.
// All implementations must embed UnimplementedGlobalServiceServer
// for forward compatibility
@ -76,6 +87,7 @@ type GlobalServiceServer interface {
GetGlobalFrame(context.Context, *GetGlobalFrameRequest) (*GlobalFrameResponse, error)
GetAppShards(context.Context, *GetAppShardsRequest) (*GetAppShardsResponse, error)
GetGlobalShards(context.Context, *GetGlobalShardsRequest) (*GetGlobalShardsResponse, error)
GetLockedAddresses(context.Context, *GetLockedAddressesRequest) (*GetLockedAddressesResponse, error)
mustEmbedUnimplementedGlobalServiceServer()
}
@ -92,6 +104,9 @@ func (UnimplementedGlobalServiceServer) GetAppShards(context.Context, *GetAppSha
func (UnimplementedGlobalServiceServer) GetGlobalShards(context.Context, *GetGlobalShardsRequest) (*GetGlobalShardsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetGlobalShards not implemented")
}
func (UnimplementedGlobalServiceServer) GetLockedAddresses(context.Context, *GetLockedAddressesRequest) (*GetLockedAddressesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetLockedAddresses not implemented")
}
func (UnimplementedGlobalServiceServer) mustEmbedUnimplementedGlobalServiceServer() {}
// UnsafeGlobalServiceServer may be embedded to opt out of forward compatibility for this service.
@ -159,6 +174,24 @@ func _GlobalService_GetGlobalShards_Handler(srv interface{}, ctx context.Context
return interceptor(ctx, in, info, handler)
}
func _GlobalService_GetLockedAddresses_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetLockedAddressesRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServiceServer).GetLockedAddresses(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: GlobalService_GetLockedAddresses_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServiceServer).GetLockedAddresses(ctx, req.(*GetLockedAddressesRequest))
}
return interceptor(ctx, in, info, handler)
}
// GlobalService_ServiceDesc is the grpc.ServiceDesc for GlobalService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -178,6 +211,10 @@ var GlobalService_ServiceDesc = grpc.ServiceDesc{
MethodName: "GetGlobalShards",
Handler: _GlobalService_GetGlobalShards_Handler,
},
{
MethodName: "GetLockedAddresses",
Handler: _GlobalService_GetLockedAddresses_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "global.proto",