v2.1.0.18 preview

parent e26c40a6d9
commit 370813d59d
@@ -43,7 +43,7 @@ func FormatVersion(version []byte) string {
 }
 
 func GetPatchNumber() byte {
-	return 0x11
+	return 0x12
 }
 
 func GetRCNumber() byte {
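The patch byte is the final version component: 0x11 is 17 and 0x12 is 18, which lines this change up with the "v2.1.0.18 preview" commit message. A minimal illustration of that mapping (the exact FormatVersion layout is an assumption for the example, not taken from this diff):

package sketch

import "fmt"

// renderVersion shows how a {major, minor, build} triple plus the patch byte
// could be rendered; the separator layout here is assumed for illustration.
func renderVersion(version []byte, patch byte) string {
	return fmt.Sprintf("%d.%d.%d.%d", version[0], version[1], version[2], patch)
}

For example, renderVersion([]byte{0x02, 0x01, 0x00}, 0x12) yields "2.1.0.18".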
@@ -583,48 +583,74 @@ func NewAppConsensusEngine(
|
||||
initializeCertifiedGenesis(true)
|
||||
}
|
||||
} else {
|
||||
stateRestored := false
|
||||
qc, err := engine.clockStore.GetQuorumCertificate(
|
||||
engine.appAddress,
|
||||
latest.FinalizedRank,
|
||||
)
|
||||
if err != nil || qc.GetFrameNumber() == 0 {
|
||||
initializeCertifiedGenesis(true)
|
||||
} else {
|
||||
frame, _, err := engine.clockStore.GetShardClockFrame(
|
||||
if err == nil && qc.GetFrameNumber() != 0 {
|
||||
frame, _, frameErr := engine.clockStore.GetShardClockFrame(
|
||||
engine.appAddress,
|
||||
qc.GetFrameNumber(),
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
if frameErr != nil {
|
||||
// Frame data was deleted (e.g., non-archive mode cleanup) but
|
||||
// QC/consensus state still exists. Re-initialize genesis and
|
||||
// let sync recover the state.
|
||||
logger.Warn(
|
||||
"frame missing for finalized QC, re-initializing genesis",
|
||||
zap.Uint64("finalized_rank", latest.FinalizedRank),
|
||||
zap.Uint64("qc_frame_number", qc.GetFrameNumber()),
|
||||
zap.Error(frameErr),
|
||||
)
|
||||
} else {
|
||||
parentFrame, _, parentFrameErr := engine.clockStore.GetShardClockFrame(
|
||||
engine.appAddress,
|
||||
qc.GetFrameNumber()-1,
|
||||
false,
|
||||
)
|
||||
if parentFrameErr != nil {
|
||||
// Parent frame missing - same recovery path
|
||||
logger.Warn(
|
||||
"parent frame missing for finalized QC, re-initializing genesis",
|
||||
zap.Uint64("finalized_rank", latest.FinalizedRank),
|
||||
zap.Uint64("qc_frame_number", qc.GetFrameNumber()),
|
||||
zap.Error(parentFrameErr),
|
||||
)
|
||||
} else {
|
||||
parentQC, parentQCErr := engine.clockStore.GetQuorumCertificate(
|
||||
engine.appAddress,
|
||||
parentFrame.GetRank(),
|
||||
)
|
||||
if parentQCErr != nil {
|
||||
// Parent QC missing - same recovery path
|
||||
logger.Warn(
|
||||
"parent QC missing, re-initializing genesis",
|
||||
zap.Uint64("finalized_rank", latest.FinalizedRank),
|
||||
zap.Uint64("parent_rank", parentFrame.GetRank()),
|
||||
zap.Error(parentQCErr),
|
||||
)
|
||||
} else {
|
||||
state = &models.CertifiedState[*protobufs.AppShardFrame]{
|
||||
State: &models.State[*protobufs.AppShardFrame]{
|
||||
Rank: frame.GetRank(),
|
||||
Identifier: frame.Identity(),
|
||||
ProposerID: frame.Source(),
|
||||
ParentQuorumCertificate: parentQC,
|
||||
Timestamp: frame.GetTimestamp(),
|
||||
State: &frame,
|
||||
},
|
||||
CertifyingQuorumCertificate: qc,
|
||||
}
|
||||
pending = engine.getPendingProposals(frame.Header.FrameNumber)
|
||||
stateRestored = true
|
||||
}
|
||||
}
|
||||
}
|
||||
parentFrame, _, err := engine.clockStore.GetShardClockFrame(
|
||||
engine.appAddress,
|
||||
qc.GetFrameNumber()-1,
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
parentQC, err := engine.clockStore.GetQuorumCertificate(
|
||||
engine.appAddress,
|
||||
parentFrame.GetRank(),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
state = &models.CertifiedState[*protobufs.AppShardFrame]{
|
||||
State: &models.State[*protobufs.AppShardFrame]{
|
||||
Rank: frame.GetRank(),
|
||||
Identifier: frame.Identity(),
|
||||
ProposerID: frame.Source(),
|
||||
ParentQuorumCertificate: parentQC,
|
||||
Timestamp: frame.GetTimestamp(),
|
||||
State: &frame,
|
||||
},
|
||||
CertifyingQuorumCertificate: qc,
|
||||
}
|
||||
pending = engine.getPendingProposals(frame.Header.FrameNumber)
|
||||
}
|
||||
if !stateRestored {
|
||||
initializeCertifiedGenesis(true)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
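The net effect of this hunk is a soft-recovery path: where the engine previously panicked on a missing frame or parent record, it now logs a warning, re-initializes certified genesis, and lets sync rebuild the state, gated by a stateRestored flag. A condensed, self-contained sketch of that control flow (the loader signatures and types below are placeholders, not the engine's clock store API):

package sketch

import "log"

type quorumCert struct{ FrameNumber uint64 }
type clockFrame struct{ Rank uint64 }

// restoreOrReinit condenses the recovery pattern: any lookup failure falls
// back to re-initializing genesis instead of panicking.
func restoreOrReinit(
	loadQC func() (*quorumCert, error),
	loadFrame func(frameNumber uint64) (*clockFrame, error),
	reinitGenesis func(),
) {
	stateRestored := false
	qc, err := loadQC()
	if err == nil && qc != nil && qc.FrameNumber != 0 {
		frame, frameErr := loadFrame(qc.FrameNumber)
		parent, parentErr := loadFrame(qc.FrameNumber - 1)
		switch {
		case frameErr != nil:
			// Frame pruned (e.g. non-archive cleanup) while the QC survived.
			log.Println("frame missing for finalized QC, re-initializing genesis:", frameErr)
		case parentErr != nil:
			log.Println("parent frame missing for finalized QC, re-initializing genesis:", parentErr)
		default:
			_, _ = frame, parent // ...assemble the certified state and pending proposals here...
			stateRestored = true
		}
	}
	if !stateRestored {
		reinitGenesis()
	}
}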
@@ -278,6 +278,12 @@ func (e *GlobalConsensusEngine) eventDistributorLoop(
 
 const pendingFilterGraceFrames = 720
 
+// proposalTimeoutFrames is the number of frames to wait for a join proposal
+// to appear on-chain before clearing the worker's filter. If a proposal is
+// submitted but never lands (e.g., network issues, not included in frame),
+// we should reset the filter so the worker can try again.
+const proposalTimeoutFrames = 10
+
 func (e *GlobalConsensusEngine) emitCoverageEvent(
 	eventType typesconsensus.ControlEventType,
 	data *typesconsensus.CoverageEventData,
@@ -550,12 +556,20 @@ func (e *GlobalConsensusEngine) reconcileWorkerAllocations(
|
||||
}
|
||||
|
||||
seenFilters := make(map[string]struct{})
|
||||
rejectedFilters := make(map[string]struct{})
|
||||
if self != nil {
|
||||
for _, alloc := range self.Allocations {
|
||||
if len(alloc.ConfirmationFilter) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Track rejected allocations separately - we need to clear their
|
||||
// workers immediately without waiting for the grace period
|
||||
if alloc.Status == typesconsensus.ProverStatusRejected {
|
||||
rejectedFilters[string(alloc.ConfirmationFilter)] = struct{}{}
|
||||
continue
|
||||
}
|
||||
|
||||
key := string(alloc.ConfirmationFilter)
|
||||
worker, ok := filtersToWorkers[key]
|
||||
if !ok {
|
||||
@@ -604,11 +618,30 @@ func (e *GlobalConsensusEngine) reconcileWorkerAllocations(
|
||||
continue
|
||||
}
|
||||
|
||||
// Immediately clear workers whose allocations were rejected
|
||||
// (no grace period needed - the rejection is definitive)
|
||||
if _, rejected := rejectedFilters[string(worker.Filter)]; rejected {
|
||||
worker.Filter = nil
|
||||
worker.Allocated = false
|
||||
worker.PendingFilterFrame = 0
|
||||
if err := e.workerManager.RegisterWorker(worker); err != nil {
|
||||
e.logger.Warn(
|
||||
"failed to clear rejected worker filter",
|
||||
zap.Uint("core_id", worker.CoreId),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if worker.PendingFilterFrame != 0 {
|
||||
if frameNumber <= worker.PendingFilterFrame {
|
||||
continue
|
||||
}
|
||||
if frameNumber-worker.PendingFilterFrame < pendingFilterGraceFrames {
|
||||
// Worker has a filter set from a proposal, but no on-chain allocation
|
||||
// exists for this filter. Use shorter timeout since the proposal
|
||||
// likely didn't land at all.
|
||||
if frameNumber-worker.PendingFilterFrame < proposalTimeoutFrames {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
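The two reconcile hunks above split worker cleanup into three cases: a rejected allocation clears its worker immediately, a live allocation keeps its filter, and a filter that came from a proposal with no matching on-chain allocation is cleared after the short proposalTimeoutFrames window instead of the full 720-frame grace period. A small sketch of just that timing decision (the surrounding worker bookkeeping is elided):

package sketch

const (
	pendingFilterGraceFrames = 720 // allocation is on-chain but not yet confirmed
	proposalTimeoutFrames    = 10  // proposal was sent but never appeared on-chain
)

// shouldClearPendingFilter reports whether a worker whose filter was set at
// pendingFrame should be reset at frameNumber, given whether any matching
// allocation was observed on-chain.
func shouldClearPendingFilter(frameNumber, pendingFrame uint64, allocationSeen bool) bool {
	if pendingFrame == 0 || frameNumber <= pendingFrame {
		return false
	}
	elapsed := frameNumber - pendingFrame
	if allocationSeen {
		// The join landed; allow the long grace period to confirm.
		return elapsed >= pendingFilterGraceFrames
	}
	// Nothing landed on-chain; use the short proposal timeout so the worker
	// can retry quickly.
	return elapsed >= proposalTimeoutFrames
}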
413 node/consensus/global/event_distributor_test.go Normal file
@@ -0,0 +1,413 @@
|
||||
package global
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
typesconsensus "source.quilibrium.com/quilibrium/monorepo/types/consensus"
|
||||
"source.quilibrium.com/quilibrium/monorepo/types/store"
|
||||
"source.quilibrium.com/quilibrium/monorepo/types/worker"
|
||||
)
|
||||
|
||||
// mockWorkerManager is a simple mock for testing reconcileWorkerAllocations
|
||||
type mockWorkerManager struct {
|
||||
workers map[uint]*store.WorkerInfo
|
||||
}
|
||||
|
||||
func newMockWorkerManager() *mockWorkerManager {
|
||||
return &mockWorkerManager{
|
||||
workers: make(map[uint]*store.WorkerInfo),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockWorkerManager) Start(ctx context.Context) error { return nil }
|
||||
func (m *mockWorkerManager) Stop() error { return nil }
|
||||
func (m *mockWorkerManager) AllocateWorker(coreId uint, filter []byte) error {
|
||||
if w, ok := m.workers[coreId]; ok {
|
||||
w.Filter = slices.Clone(filter)
|
||||
w.Allocated = true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *mockWorkerManager) DeallocateWorker(coreId uint) error {
|
||||
if w, ok := m.workers[coreId]; ok {
|
||||
w.Filter = nil
|
||||
w.Allocated = false
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *mockWorkerManager) CheckWorkersConnected() ([]uint, error) { return nil, nil }
|
||||
func (m *mockWorkerManager) GetWorkerIdByFilter(filter []byte) (uint, error) {
|
||||
for _, w := range m.workers {
|
||||
if string(w.Filter) == string(filter) {
|
||||
return w.CoreId, nil
|
||||
}
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
func (m *mockWorkerManager) GetFilterByWorkerId(coreId uint) ([]byte, error) {
|
||||
if w, ok := m.workers[coreId]; ok {
|
||||
return w.Filter, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
func (m *mockWorkerManager) RegisterWorker(info *store.WorkerInfo) error {
|
||||
m.workers[info.CoreId] = info
|
||||
return nil
|
||||
}
|
||||
func (m *mockWorkerManager) ProposeAllocations(coreIds []uint, filters [][]byte) error {
|
||||
return nil
|
||||
}
|
||||
func (m *mockWorkerManager) DecideAllocations(reject [][]byte, confirm [][]byte) error {
|
||||
return nil
|
||||
}
|
||||
func (m *mockWorkerManager) RangeWorkers() ([]*store.WorkerInfo, error) {
|
||||
result := make([]*store.WorkerInfo, 0, len(m.workers))
|
||||
for _, w := range m.workers {
|
||||
result = append(result, w)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
var _ worker.WorkerManager = (*mockWorkerManager)(nil)
|
||||
|
||||
func TestReconcileWorkerAllocations_RejectedAllocationClearsFilter(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
wm := newMockWorkerManager()
|
||||
|
||||
// Create a worker with an assigned filter (simulating a pending join)
|
||||
filter1 := []byte("shard-filter-1")
|
||||
worker1 := &store.WorkerInfo{
|
||||
CoreId: 1,
|
||||
Filter: slices.Clone(filter1),
|
||||
Allocated: false,
|
||||
PendingFilterFrame: 100, // join was proposed at frame 100
|
||||
}
|
||||
require.NoError(t, wm.RegisterWorker(worker1))
|
||||
|
||||
// Create the engine with just the worker manager
|
||||
engine := &GlobalConsensusEngine{
|
||||
logger: logger,
|
||||
workerManager: wm,
|
||||
}
|
||||
|
||||
// Case 1: Allocation is rejected - filter should be cleared
|
||||
selfWithRejected := &typesconsensus.ProverInfo{
|
||||
Address: []byte("prover-address"),
|
||||
Allocations: []typesconsensus.ProverAllocationInfo{
|
||||
{
|
||||
Status: typesconsensus.ProverStatusRejected,
|
||||
ConfirmationFilter: filter1,
|
||||
JoinFrameNumber: 100,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Run reconciliation at frame 200 (past the join frame but within grace period)
|
||||
engine.reconcileWorkerAllocations(200, selfWithRejected)
|
||||
|
||||
// Verify the worker's filter was cleared because the allocation is rejected
|
||||
workers, err := wm.RangeWorkers()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, workers, 1)
|
||||
assert.Nil(t, workers[0].Filter, "rejected allocation should cause filter to be cleared")
|
||||
assert.False(t, workers[0].Allocated, "rejected allocation should not be allocated")
|
||||
assert.Equal(t, uint64(0), workers[0].PendingFilterFrame, "pending frame should be cleared")
|
||||
}
|
||||
|
||||
func TestReconcileWorkerAllocations_ActiveAllocationKeepsFilter(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
wm := newMockWorkerManager()
|
||||
|
||||
// Create a worker with an assigned filter
|
||||
filter1 := []byte("shard-filter-1")
|
||||
worker1 := &store.WorkerInfo{
|
||||
CoreId: 1,
|
||||
Filter: slices.Clone(filter1),
|
||||
Allocated: true,
|
||||
PendingFilterFrame: 0,
|
||||
}
|
||||
require.NoError(t, wm.RegisterWorker(worker1))
|
||||
|
||||
engine := &GlobalConsensusEngine{
|
||||
logger: logger,
|
||||
workerManager: wm,
|
||||
}
|
||||
|
||||
// Case 2: Allocation is active - filter should be kept
|
||||
selfWithActive := &typesconsensus.ProverInfo{
|
||||
Address: []byte("prover-address"),
|
||||
Allocations: []typesconsensus.ProverAllocationInfo{
|
||||
{
|
||||
Status: typesconsensus.ProverStatusActive,
|
||||
ConfirmationFilter: filter1,
|
||||
JoinFrameNumber: 100,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
engine.reconcileWorkerAllocations(200, selfWithActive)
|
||||
|
||||
workers, err := wm.RangeWorkers()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, workers, 1)
|
||||
assert.Equal(t, filter1, workers[0].Filter, "active allocation should keep filter")
|
||||
assert.True(t, workers[0].Allocated, "active allocation should be allocated")
|
||||
}
|
||||
|
||||
func TestReconcileWorkerAllocations_JoiningAllocationKeepsFilter(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
wm := newMockWorkerManager()
|
||||
|
||||
// Create a worker with an assigned filter
|
||||
filter1 := []byte("shard-filter-1")
|
||||
worker1 := &store.WorkerInfo{
|
||||
CoreId: 1,
|
||||
Filter: slices.Clone(filter1),
|
||||
Allocated: false,
|
||||
PendingFilterFrame: 100,
|
||||
}
|
||||
require.NoError(t, wm.RegisterWorker(worker1))
|
||||
|
||||
engine := &GlobalConsensusEngine{
|
||||
logger: logger,
|
||||
workerManager: wm,
|
||||
}
|
||||
|
||||
// Case 3: Allocation is joining - filter should be kept
|
||||
selfWithJoining := &typesconsensus.ProverInfo{
|
||||
Address: []byte("prover-address"),
|
||||
Allocations: []typesconsensus.ProverAllocationInfo{
|
||||
{
|
||||
Status: typesconsensus.ProverStatusJoining,
|
||||
ConfirmationFilter: filter1,
|
||||
JoinFrameNumber: 100,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
engine.reconcileWorkerAllocations(200, selfWithJoining)
|
||||
|
||||
workers, err := wm.RangeWorkers()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, workers, 1)
|
||||
assert.Equal(t, filter1, workers[0].Filter, "joining allocation should keep filter")
|
||||
assert.False(t, workers[0].Allocated, "joining allocation should not be allocated yet")
|
||||
assert.Equal(t, uint64(100), workers[0].PendingFilterFrame, "pending frame should be join frame")
|
||||
}
|
||||
|
||||
func TestReconcileWorkerAllocations_MultipleWorkersWithMixedStates(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
wm := newMockWorkerManager()
|
||||
|
||||
// Create workers with different filters
|
||||
filter1 := []byte("shard-filter-1")
|
||||
filter2 := []byte("shard-filter-2")
|
||||
filter3 := []byte("shard-filter-3")
|
||||
|
||||
worker1 := &store.WorkerInfo{
|
||||
CoreId: 1,
|
||||
Filter: slices.Clone(filter1),
|
||||
Allocated: true,
|
||||
PendingFilterFrame: 0,
|
||||
}
|
||||
worker2 := &store.WorkerInfo{
|
||||
CoreId: 2,
|
||||
Filter: slices.Clone(filter2),
|
||||
Allocated: false,
|
||||
PendingFilterFrame: 100,
|
||||
}
|
||||
worker3 := &store.WorkerInfo{
|
||||
CoreId: 3,
|
||||
Filter: slices.Clone(filter3),
|
||||
Allocated: false,
|
||||
PendingFilterFrame: 100,
|
||||
}
|
||||
require.NoError(t, wm.RegisterWorker(worker1))
|
||||
require.NoError(t, wm.RegisterWorker(worker2))
|
||||
require.NoError(t, wm.RegisterWorker(worker3))
|
||||
|
||||
engine := &GlobalConsensusEngine{
|
||||
logger: logger,
|
||||
workerManager: wm,
|
||||
}
|
||||
|
||||
// Mixed states: one active, one joining, one rejected
|
||||
self := &typesconsensus.ProverInfo{
|
||||
Address: []byte("prover-address"),
|
||||
Allocations: []typesconsensus.ProverAllocationInfo{
|
||||
{
|
||||
Status: typesconsensus.ProverStatusActive,
|
||||
ConfirmationFilter: filter1,
|
||||
JoinFrameNumber: 50,
|
||||
},
|
||||
{
|
||||
Status: typesconsensus.ProverStatusJoining,
|
||||
ConfirmationFilter: filter2,
|
||||
JoinFrameNumber: 100,
|
||||
},
|
||||
{
|
||||
Status: typesconsensus.ProverStatusRejected,
|
||||
ConfirmationFilter: filter3,
|
||||
JoinFrameNumber: 100,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
engine.reconcileWorkerAllocations(200, self)
|
||||
|
||||
workers, err := wm.RangeWorkers()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, workers, 3)
|
||||
|
||||
// Find each worker by core ID
|
||||
workerMap := make(map[uint]*store.WorkerInfo)
|
||||
for _, w := range workers {
|
||||
workerMap[w.CoreId] = w
|
||||
}
|
||||
|
||||
// Worker 1: active allocation - should keep filter and be allocated
|
||||
w1 := workerMap[1]
|
||||
assert.Equal(t, filter1, w1.Filter, "active worker should keep filter")
|
||||
assert.True(t, w1.Allocated, "active worker should be allocated")
|
||||
|
||||
// Worker 2: joining allocation - should keep filter but not be allocated
|
||||
w2 := workerMap[2]
|
||||
assert.Equal(t, filter2, w2.Filter, "joining worker should keep filter")
|
||||
assert.False(t, w2.Allocated, "joining worker should not be allocated")
|
||||
|
||||
// Worker 3: rejected allocation - should have filter cleared
|
||||
w3 := workerMap[3]
|
||||
assert.Nil(t, w3.Filter, "rejected worker should have filter cleared")
|
||||
assert.False(t, w3.Allocated, "rejected worker should not be allocated")
|
||||
}
|
||||
|
||||
func TestReconcileWorkerAllocations_RejectedWithNoFreeWorker(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
wm := newMockWorkerManager()
|
||||
|
||||
// Create a worker with no filter initially
|
||||
worker1 := &store.WorkerInfo{
|
||||
CoreId: 1,
|
||||
Filter: nil,
|
||||
Allocated: false,
|
||||
PendingFilterFrame: 0,
|
||||
}
|
||||
require.NoError(t, wm.RegisterWorker(worker1))
|
||||
|
||||
engine := &GlobalConsensusEngine{
|
||||
logger: logger,
|
||||
workerManager: wm,
|
||||
}
|
||||
|
||||
// A rejected allocation shouldn't try to assign a worker
|
||||
filter1 := []byte("shard-filter-1")
|
||||
self := &typesconsensus.ProverInfo{
|
||||
Address: []byte("prover-address"),
|
||||
Allocations: []typesconsensus.ProverAllocationInfo{
|
||||
{
|
||||
Status: typesconsensus.ProverStatusRejected,
|
||||
ConfirmationFilter: filter1,
|
||||
JoinFrameNumber: 100,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
engine.reconcileWorkerAllocations(200, self)
|
||||
|
||||
workers, err := wm.RangeWorkers()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, workers, 1)
|
||||
|
||||
// The free worker should remain free - rejected allocation should not consume it
|
||||
assert.Nil(t, workers[0].Filter, "free worker should remain free when only rejected allocations exist")
|
||||
assert.False(t, workers[0].Allocated, "free worker should not be allocated")
|
||||
}
|
||||
|
||||
func TestReconcileWorkerAllocations_UnconfirmedProposalClearsAfterTimeout(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
wm := newMockWorkerManager()
|
||||
|
||||
// Create a worker with a filter set from a join proposal that never landed
|
||||
filter1 := []byte("shard-filter-1")
|
||||
worker1 := &store.WorkerInfo{
|
||||
CoreId: 1,
|
||||
Filter: slices.Clone(filter1),
|
||||
Allocated: false,
|
||||
PendingFilterFrame: 100, // proposal was made at frame 100
|
||||
}
|
||||
require.NoError(t, wm.RegisterWorker(worker1))
|
||||
|
||||
engine := &GlobalConsensusEngine{
|
||||
logger: logger,
|
||||
workerManager: wm,
|
||||
}
|
||||
|
||||
// Prover has no allocations at all - the proposal never landed on-chain
|
||||
self := &typesconsensus.ProverInfo{
|
||||
Address: []byte("prover-address"),
|
||||
Allocations: []typesconsensus.ProverAllocationInfo{},
|
||||
}
|
||||
|
||||
// At frame 105 (5 frames after proposal), filter should NOT be cleared yet
|
||||
engine.reconcileWorkerAllocations(105, self)
|
||||
|
||||
workers, err := wm.RangeWorkers()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, workers, 1)
|
||||
assert.Equal(t, filter1, workers[0].Filter, "filter should be kept within timeout window")
|
||||
assert.Equal(t, uint64(100), workers[0].PendingFilterFrame, "pending frame should be preserved")
|
||||
|
||||
// At frame 111 (11 frames after proposal, past the 10 frame timeout), filter SHOULD be cleared
|
||||
engine.reconcileWorkerAllocations(111, self)
|
||||
|
||||
workers, err = wm.RangeWorkers()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, workers, 1)
|
||||
assert.Nil(t, workers[0].Filter, "filter should be cleared after proposal timeout")
|
||||
assert.False(t, workers[0].Allocated, "worker should not be allocated")
|
||||
assert.Equal(t, uint64(0), workers[0].PendingFilterFrame, "pending frame should be cleared")
|
||||
}
|
||||
|
||||
func TestReconcileWorkerAllocations_UnconfirmedProposalWithNilSelf(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
wm := newMockWorkerManager()
|
||||
|
||||
// Create a worker with a filter set from a join proposal
|
||||
filter1 := []byte("shard-filter-1")
|
||||
worker1 := &store.WorkerInfo{
|
||||
CoreId: 1,
|
||||
Filter: slices.Clone(filter1),
|
||||
Allocated: false,
|
||||
PendingFilterFrame: 100,
|
||||
}
|
||||
require.NoError(t, wm.RegisterWorker(worker1))
|
||||
|
||||
engine := &GlobalConsensusEngine{
|
||||
logger: logger,
|
||||
workerManager: wm,
|
||||
}
|
||||
|
||||
// Even with nil self (no prover info yet), after timeout the filter should be cleared
|
||||
// This handles the case where we proposed but haven't synced prover info yet
|
||||
|
||||
// At frame 105, still within timeout - should keep filter
|
||||
engine.reconcileWorkerAllocations(105, nil)
|
||||
|
||||
workers, err := wm.RangeWorkers()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, workers, 1)
|
||||
assert.Equal(t, filter1, workers[0].Filter, "filter should be kept within timeout window even with nil self")
|
||||
|
||||
// At frame 111, past timeout - should clear filter
|
||||
engine.reconcileWorkerAllocations(111, nil)
|
||||
|
||||
workers, err = wm.RangeWorkers()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, workers, 1)
|
||||
assert.Nil(t, workers[0].Filter, "filter should be cleared after timeout even with nil self")
|
||||
}
|
||||
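One detail worth noting in the new test file: the var _ worker.WorkerManager = (*mockWorkerManager)(nil) line is a compile-time interface assertion, so the build fails immediately if mockWorkerManager ever stops satisfying worker.WorkerManager, rather than the mismatch surfacing only when a test constructs the engine.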
@@ -118,6 +118,7 @@ type GlobalConsensusEngine struct {
|
||||
config *config.Config
|
||||
pubsub tp2p.PubSub
|
||||
hypergraph hypergraph.Hypergraph
|
||||
hypergraphStore store.HypergraphStore
|
||||
keyManager typeskeys.KeyManager
|
||||
keyStore store.KeyStore
|
||||
clockStore store.ClockStore
|
||||
@@ -298,6 +299,7 @@ func NewGlobalConsensusEngine(
|
||||
config: config,
|
||||
pubsub: ps,
|
||||
hypergraph: hypergraph,
|
||||
hypergraphStore: hypergraphStore,
|
||||
keyManager: keyManager,
|
||||
keyStore: keyStore,
|
||||
clockStore: clockStore,
|
||||
@@ -566,6 +568,43 @@ func NewGlobalConsensusEngine(
|
||||
componentBuilder.AddWorker(engine.globalTimeReel.Start)
|
||||
componentBuilder.AddWorker(engine.startGlobalMessageAggregator)
|
||||
|
||||
adds := engine.hypergraph.(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(
|
||||
tries.ShardKey{
|
||||
L1: [3]byte{},
|
||||
L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
|
||||
},
|
||||
)
|
||||
|
||||
if lc, _ := adds.GetTree().GetMetadata(); lc == 0 {
|
||||
if config.P2P.Network == 0 {
|
||||
genesisData := engine.getMainnetGenesisJSON()
|
||||
if genesisData == nil {
|
||||
panic("no genesis data")
|
||||
}
|
||||
|
||||
state := hgstate.NewHypergraphState(engine.hypergraph)
|
||||
|
||||
err = engine.establishMainnetGenesisProvers(state, genesisData)
|
||||
if err != nil {
|
||||
engine.logger.Error("failed to establish provers", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = state.Commit()
|
||||
if err != nil {
|
||||
engine.logger.Error("failed to commit", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
} else {
|
||||
engine.establishTestnetGenesisProvers()
|
||||
}
|
||||
|
||||
err := engine.proverRegistry.Refresh()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
if engine.config.P2P.Network == 99 || engine.config.Engine.ArchiveMode {
|
||||
latest, err := engine.consensusStore.GetConsensusState(nil)
|
||||
var state *models.CertifiedState[*protobufs.GlobalFrame]
|
||||
@@ -597,42 +636,6 @@ func NewGlobalConsensusEngine(
|
||||
if err != nil {
|
||||
establishGenesis()
|
||||
} else {
|
||||
adds := engine.hypergraph.(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(
|
||||
tries.ShardKey{
|
||||
L1: [3]byte{},
|
||||
L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
|
||||
},
|
||||
)
|
||||
|
||||
if lc, _ := adds.GetTree().GetMetadata(); lc == 0 {
|
||||
if config.P2P.Network == 0 {
|
||||
genesisData := engine.getMainnetGenesisJSON()
|
||||
if genesisData == nil {
|
||||
panic("no genesis data")
|
||||
}
|
||||
|
||||
state := hgstate.NewHypergraphState(engine.hypergraph)
|
||||
|
||||
err = engine.establishMainnetGenesisProvers(state, genesisData)
|
||||
if err != nil {
|
||||
engine.logger.Error("failed to establish provers", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = state.Commit()
|
||||
if err != nil {
|
||||
engine.logger.Error("failed to commit", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
} else {
|
||||
engine.establishTestnetGenesisProvers()
|
||||
}
|
||||
|
||||
err := engine.proverRegistry.Refresh()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
if latest.LatestTimeout != nil {
|
||||
logger.Info(
|
||||
"obtained latest consensus state",
|
||||
@@ -1703,20 +1706,29 @@ func (e *GlobalConsensusEngine) materialize(
|
||||
return err
|
||||
}
|
||||
|
||||
err = e.proverRegistry.PruneOrphanJoins(frameNumber)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "materialize")
|
||||
}
|
||||
|
||||
if err := state.Commit(); err != nil {
|
||||
return errors.Wrap(err, "materialize")
|
||||
}
|
||||
|
||||
// Persist any alt shard updates from this frame
|
||||
if err := e.persistAltShardUpdates(frameNumber, requests); err != nil {
|
||||
e.logger.Error(
|
||||
"failed to persist alt shard updates",
|
||||
zap.Uint64("frame_number", frameNumber),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
|
||||
err = e.proverRegistry.ProcessStateTransition(state, frameNumber)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "materialize")
|
||||
}
|
||||
|
||||
err = e.proverRegistry.PruneOrphanJoins(frameNumber)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "materialize")
|
||||
}
|
||||
|
||||
shouldVerifyRoot := !e.config.Engine.ArchiveMode || e.config.P2P.Network == 99
|
||||
localProverRoot, localRootErr := e.computeLocalProverRoot(frameNumber)
|
||||
if localRootErr != nil {
|
||||
@@ -1771,6 +1783,91 @@ func (e *GlobalConsensusEngine) materialize(
|
||||
return nil
|
||||
}
|
||||
|
||||
// persistAltShardUpdates iterates through frame requests to find and persist
|
||||
// any AltShardUpdate messages to the hypergraph store.
|
||||
func (e *GlobalConsensusEngine) persistAltShardUpdates(
|
||||
frameNumber uint64,
|
||||
requests []*protobufs.MessageBundle,
|
||||
) error {
|
||||
var altUpdates []*protobufs.AltShardUpdate
|
||||
|
||||
// Collect all alt shard updates from the frame's requests
|
||||
for _, bundle := range requests {
|
||||
if bundle == nil {
|
||||
continue
|
||||
}
|
||||
for _, req := range bundle.Requests {
|
||||
if req == nil {
|
||||
continue
|
||||
}
|
||||
if altUpdate := req.GetAltShardUpdate(); altUpdate != nil {
|
||||
altUpdates = append(altUpdates, altUpdate)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(altUpdates) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create a transaction for the hypergraph store
|
||||
txn, err := e.hypergraphStore.NewTransaction(false)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "persist alt shard updates")
|
||||
}
|
||||
|
||||
for _, update := range altUpdates {
|
||||
// Derive shard address from public key
|
||||
if len(update.PublicKey) == 0 {
|
||||
e.logger.Warn("alt shard update with empty public key, skipping")
|
||||
continue
|
||||
}
|
||||
|
||||
addrBI, err := poseidon.HashBytes(update.PublicKey)
|
||||
if err != nil {
|
||||
e.logger.Warn(
|
||||
"failed to hash alt shard public key",
|
||||
zap.Error(err),
|
||||
)
|
||||
continue
|
||||
}
|
||||
shardAddress := addrBI.FillBytes(make([]byte, 32))
|
||||
|
||||
// Persist the alt shard commit
|
||||
err = e.hypergraphStore.SetAltShardCommit(
|
||||
txn,
|
||||
frameNumber,
|
||||
shardAddress,
|
||||
update.VertexAddsRoot,
|
||||
update.VertexRemovesRoot,
|
||||
update.HyperedgeAddsRoot,
|
||||
update.HyperedgeRemovesRoot,
|
||||
)
|
||||
if err != nil {
|
||||
txn.Abort()
|
||||
return errors.Wrap(err, "persist alt shard updates")
|
||||
}
|
||||
|
||||
e.logger.Debug(
|
||||
"persisted alt shard update",
|
||||
zap.Uint64("frame_number", frameNumber),
|
||||
zap.String("shard_address", hex.EncodeToString(shardAddress)),
|
||||
)
|
||||
}
|
||||
|
||||
if err := txn.Commit(); err != nil {
|
||||
return errors.Wrap(err, "persist alt shard updates")
|
||||
}
|
||||
|
||||
e.logger.Info(
|
||||
"persisted alt shard updates",
|
||||
zap.Uint64("frame_number", frameNumber),
|
||||
zap.Int("count", len(altUpdates)),
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *GlobalConsensusEngine) computeLocalProverRoot(
|
||||
frameNumber uint64,
|
||||
) ([]byte, error) {
|
||||
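persistAltShardUpdates derives each shard address the same way the AltShardUpdate intrinsic does: Poseidon-hash the owner's public key and left-pad the result into 32 bytes. A standalone sketch of just that derivation (the example key bytes are arbitrary):

package main

import (
	"encoding/hex"
	"fmt"

	"github.com/iden3/go-iden3-crypto/poseidon"
)

// altShardAddress mirrors the derivation used above: the 32-byte address is
// the Poseidon hash of the owner's public key, big-endian, left-padded.
func altShardAddress(publicKey []byte) ([]byte, error) {
	addrBI, err := poseidon.HashBytes(publicKey)
	if err != nil {
		return nil, err
	}
	return addrBI.FillBytes(make([]byte, 32)), nil
}

func main() {
	// Any byte string works for illustration; in the engine this is the
	// 585-byte BLS48-581 public key carried by the AltShardUpdate.
	addr, err := altShardAddress([]byte("example-public-key"))
	if err != nil {
		panic(err)
	}
	fmt.Println("shard address:", hex.EncodeToString(addr))
}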
@@ -4033,6 +4130,82 @@ func (e *GlobalConsensusEngine) rebuildShardCommitments(
|
||||
e.shardCommitmentKeySets[idx] = currKeys
|
||||
}
|
||||
|
||||
// Apply alt shard overrides - these have externally-managed roots
|
||||
if e.hypergraphStore != nil {
|
||||
altShardAddrs, err := e.hypergraphStore.RangeAltShardAddresses()
|
||||
if err != nil {
|
||||
e.logger.Warn("failed to get alt shard addresses", zap.Error(err))
|
||||
} else {
|
||||
for _, shardAddr := range altShardAddrs {
|
||||
vertexAdds, vertexRemoves, hyperedgeAdds, hyperedgeRemoves, err :=
|
||||
e.hypergraphStore.GetLatestAltShardCommit(shardAddr)
|
||||
if err != nil {
|
||||
e.logger.Debug(
|
||||
"failed to get alt shard commit",
|
||||
zap.Binary("shard_address", shardAddr),
|
||||
zap.Error(err),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
// Calculate L1 indices (bloom filter) for this shard address
|
||||
l1Indices := up2p.GetBloomFilterIndices(shardAddr, 256, 3)
|
||||
|
||||
// Insert each phase's root into the commitment trees
|
||||
roots := [][]byte{vertexAdds, vertexRemoves, hyperedgeAdds, hyperedgeRemoves}
|
||||
for phaseSet, root := range roots {
|
||||
if len(root) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
foldedShardKey := make([]byte, 32)
|
||||
copy(foldedShardKey, shardAddr)
|
||||
foldedShardKey[0] |= byte(phaseSet << 6)
|
||||
keyStr := string(foldedShardKey)
|
||||
|
||||
for _, l1Idx := range l1Indices {
|
||||
index := int(l1Idx)
|
||||
if index >= len(e.shardCommitmentTrees) {
|
||||
continue
|
||||
}
|
||||
|
||||
if e.shardCommitmentTrees[index] == nil {
|
||||
e.shardCommitmentTrees[index] = &tries.VectorCommitmentTree{}
|
||||
}
|
||||
|
||||
if currentKeySets[index] == nil {
|
||||
currentKeySets[index] = make(map[string]struct{})
|
||||
}
|
||||
currentKeySets[index][keyStr] = struct{}{}
|
||||
|
||||
tree := e.shardCommitmentTrees[index]
|
||||
if existing, err := tree.Get(foldedShardKey); err == nil &&
|
||||
bytes.Equal(existing, root) {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := tree.Insert(
|
||||
foldedShardKey,
|
||||
slices.Clone(root),
|
||||
nil,
|
||||
big.NewInt(int64(len(root))),
|
||||
); err != nil {
|
||||
e.logger.Warn(
|
||||
"failed to insert alt shard root",
|
||||
zap.Binary("shard_address", shardAddr),
|
||||
zap.Int("phase", phaseSet),
|
||||
zap.Error(err),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
changedTrees[index] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < len(e.shardCommitmentTrees); i++ {
|
||||
if e.shardCommitmentTrees[i] == nil {
|
||||
e.shardCommitmentTrees[i] = &tries.VectorCommitmentTree{}
|
||||
|
||||
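The alt shard override loop above reuses one 32-byte shard address for four different roots by folding the phase-set index into the top two bits of the first byte, while the bloom-filter L1 indices are still computed from the unmodified address. The folding step in isolation:

package sketch

// foldShardKey packs the phase set (0 = vertex adds, 1 = vertex removes,
// 2 = hyperedge adds, 3 = hyperedge removes) into bits 7..6 of the first byte
// of the shard address, giving each root a distinct commitment-tree key.
func foldShardKey(shardAddr []byte, phaseSet int) []byte {
	folded := make([]byte, 32)
	copy(folded, shardAddr)
	folded[0] |= byte(phaseSet << 6)
	return folded
}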
@@ -523,31 +523,37 @@ func (r *ProverRegistry) pruneAllocationVertex(
|
||||
if info == nil {
|
||||
return errors.New("missing info")
|
||||
}
|
||||
if len(info.PublicKey) == 0 {
|
||||
r.logger.Warn(
|
||||
"unable to prune allocation without public key",
|
||||
zap.String("address", hex.EncodeToString(info.Address)),
|
||||
)
|
||||
return errors.New("invalid record")
|
||||
}
|
||||
|
||||
allocationHash, err := poseidon.HashBytes(
|
||||
slices.Concat(
|
||||
[]byte("PROVER_ALLOCATION"),
|
||||
info.PublicKey,
|
||||
allocation.ConfirmationFilter,
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "prune allocation hash")
|
||||
}
|
||||
|
||||
var vertexID [64]byte
|
||||
copy(vertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
|
||||
copy(
|
||||
vertexID[32:],
|
||||
allocationHash.FillBytes(make([]byte, 32)),
|
||||
)
|
||||
|
||||
// Use pre-computed VertexAddress if available, otherwise derive from public
|
||||
// key
|
||||
if len(allocation.VertexAddress) == 32 {
|
||||
copy(vertexID[32:], allocation.VertexAddress)
|
||||
} else if len(info.PublicKey) == 0 {
|
||||
r.logger.Warn(
|
||||
"unable to prune allocation without vertex address or public key",
|
||||
zap.String("address", hex.EncodeToString(info.Address)),
|
||||
)
|
||||
return nil
|
||||
} else {
|
||||
// Fallback: derive vertex address from public key (legacy path)
|
||||
allocationHash, err := poseidon.HashBytes(
|
||||
slices.Concat(
|
||||
[]byte("PROVER_ALLOCATION"),
|
||||
info.PublicKey,
|
||||
allocation.ConfirmationFilter,
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "prune allocation hash")
|
||||
}
|
||||
copy(
|
||||
vertexID[32:],
|
||||
allocationHash.FillBytes(make([]byte, 32)),
|
||||
)
|
||||
}
|
||||
|
||||
shardKey := tries.ShardKey{
|
||||
L1: [3]byte{0x00, 0x00, 0x00},
|
||||
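The rewritten block above prefers the allocation's pre-computed VertexAddress and only falls back to the legacy public-key derivation, which is what lets orphaned allocations (prover vertex gone, so no public key) still be pruned. The selection logic in isolation (surrounding registry types are stubs; the Poseidon call matches the diff):

package sketch

import (
	"errors"
	"slices"

	"github.com/iden3/go-iden3-crypto/poseidon"
)

// allocationVertexSuffix picks the 32-byte suffix of the allocation vertex ID:
// the stored VertexAddress when present, otherwise the legacy derivation from
// the public key and confirmation filter.
func allocationVertexSuffix(
	vertexAddress, publicKey, confirmationFilter []byte,
) ([]byte, error) {
	if len(vertexAddress) == 32 {
		return vertexAddress, nil // pre-computed during state extraction
	}
	if len(publicKey) == 0 {
		// Nothing to derive from; the caller logs a warning and skips pruning.
		return nil, errors.New("no vertex address or public key")
	}
	h, err := poseidon.HashBytes(slices.Concat(
		[]byte("PROVER_ALLOCATION"), publicKey, confirmationFilter,
	))
	if err != nil {
		return nil, err
	}
	return h.FillBytes(make([]byte, 32)), nil
}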
@@ -1224,7 +1230,7 @@ func (r *ProverRegistry) extractGlobalState() error {
|
||||
lastActiveFrameNumber = binary.BigEndian.Uint64(bytes)
|
||||
}
|
||||
|
||||
// Create allocation info
|
||||
// Create allocation info - key[32:] contains the allocation vertex address
|
||||
allocationInfo := consensus.ProverAllocationInfo{
|
||||
Status: mappedStatus,
|
||||
ConfirmationFilter: confirmationFilter,
|
||||
@@ -1239,6 +1245,7 @@ func (r *ProverRegistry) extractGlobalState() error {
|
||||
LeaveConfirmFrameNumber: leaveConfirmFrameNumber,
|
||||
LeaveRejectFrameNumber: leaveRejectFrameNumber,
|
||||
LastActiveFrameNumber: lastActiveFrameNumber,
|
||||
VertexAddress: append([]byte(nil), key[32:]...),
|
||||
}
|
||||
|
||||
// Create or update ProverInfo
|
||||
@@ -1723,6 +1730,11 @@ func (r *ProverRegistry) processProverChange(
|
||||
leaveRejectFrameNumber
|
||||
proverInfo.Allocations[i].LastActiveFrameNumber =
|
||||
lastActiveFrameNumber
|
||||
// Ensure VertexAddress is set (for backwards compatibility)
|
||||
if len(proverInfo.Allocations[i].VertexAddress) == 0 {
|
||||
proverInfo.Allocations[i].VertexAddress =
|
||||
append([]byte(nil), proverAddress...)
|
||||
}
|
||||
found = true
|
||||
}
|
||||
}
|
||||
@@ -1744,6 +1756,7 @@ func (r *ProverRegistry) processProverChange(
|
||||
LeaveConfirmFrameNumber: leaveConfirmFrameNumber,
|
||||
LeaveRejectFrameNumber: leaveRejectFrameNumber,
|
||||
LastActiveFrameNumber: lastActiveFrameNumber,
|
||||
VertexAddress: append([]byte(nil), proverAddress...),
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -2053,6 +2066,7 @@ func (r *ProverRegistry) GetAllActiveAppShardProvers() (
|
||||
LeaveConfirmFrameNumber: allocation.LeaveConfirmFrameNumber,
|
||||
LeaveRejectFrameNumber: allocation.LeaveRejectFrameNumber,
|
||||
LastActiveFrameNumber: allocation.LastActiveFrameNumber,
|
||||
VertexAddress: make([]byte, len(allocation.VertexAddress)),
|
||||
}
|
||||
copy(
|
||||
proverCopy.Allocations[i].ConfirmationFilter,
|
||||
@@ -2062,6 +2076,10 @@ func (r *ProverRegistry) GetAllActiveAppShardProvers() (
|
||||
proverCopy.Allocations[i].RejectionFilter,
|
||||
allocation.RejectionFilter,
|
||||
)
|
||||
copy(
|
||||
proverCopy.Allocations[i].VertexAddress,
|
||||
allocation.VertexAddress,
|
||||
)
|
||||
}
|
||||
|
||||
result = append(result, proverCopy)
|
||||
|
||||
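Both copy idioms used in these registry hunks produce an independent byte slice, so callers of GetAllActiveAppShardProvers cannot mutate the cached allocation's VertexAddress through the returned copy. For reference:

package sketch

// cloneBytes is equivalent to both idioms seen above: make+copy into a
// pre-sized slice, and append([]byte(nil), src...).
func cloneBytes(src []byte) []byte {
	dst := make([]byte, len(src))
	copy(dst, src)
	return dst
}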
@@ -1399,3 +1399,227 @@ func TestPruneOrphanJoins_IncompleteState(t *testing.T) {
|
||||
t.Logf(" - Prover removed after all allocations pruned")
|
||||
t.Logf(" - Registry methods confirm prover is gone")
|
||||
}
|
||||
|
||||
// TestPruneOrphanJoins_OrphanedAllocation tests the scenario where an allocation
|
||||
// vertex exists but the prover vertex is missing. The allocation should still be
|
||||
// pruned if it's eligible (old join frame, joining status).
|
||||
func TestPruneOrphanJoins_OrphanedAllocation(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
|
||||
// Create stores with in-memory pebble DB
|
||||
pebbleDB := store.NewPebbleDB(
|
||||
logger,
|
||||
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_orphaned_alloc"},
|
||||
0,
|
||||
)
|
||||
defer pebbleDB.Close()
|
||||
|
||||
// Create inclusion prover and verifiable encryptor
|
||||
inclusionProver := bls48581.NewKZGInclusionProver(logger)
|
||||
verifiableEncryptor := verenc.NewMPCitHVerifiableEncryptor(1)
|
||||
|
||||
// Create hypergraph store and hypergraph
|
||||
hypergraphStore := store.NewPebbleHypergraphStore(
|
||||
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_orphaned_alloc"},
|
||||
pebbleDB,
|
||||
logger,
|
||||
verifiableEncryptor,
|
||||
inclusionProver,
|
||||
)
|
||||
hg, err := hypergraphStore.LoadHypergraph(&tests.Nopthenticator{}, 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create RDF multiprover for setting up test data
|
||||
rdfMultiprover := schema.NewRDFMultiprover(
|
||||
&schema.TurtleRDFParser{},
|
||||
inclusionProver,
|
||||
)
|
||||
|
||||
const currentFrame = uint64(1000)
|
||||
const oldJoinFrame = uint64(100) // Will be pruned
|
||||
|
||||
// Helper to create ONLY an allocation vertex (no prover vertex)
|
||||
// This simulates the case where the prover was deleted but the allocation remains
|
||||
createOrphanedAllocation := func(
|
||||
publicKey []byte,
|
||||
filter []byte,
|
||||
joinFrame uint64,
|
||||
) (proverAddress []byte, allocationAddress []byte, err error) {
|
||||
proverAddressBI, err := poseidon.HashBytes(publicKey)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
proverAddr := proverAddressBI.FillBytes(make([]byte, 32))
|
||||
|
||||
allocationAddressBI, err := poseidon.HashBytes(
|
||||
slices.Concat([]byte("PROVER_ALLOCATION"), publicKey, filter),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
allocAddr := allocationAddressBI.FillBytes(make([]byte, 32))
|
||||
|
||||
hgCRDT := hg.(*hgcrdt.HypergraphCRDT)
|
||||
txn, err := hgCRDT.NewTransaction(false)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Create ONLY the allocation vertex (no prover vertex)
|
||||
allocationTree := &tries.VectorCommitmentTree{}
|
||||
_ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
|
||||
"allocation:ProverAllocation", "Prover", proverAddr, allocationTree)
|
||||
_ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
|
||||
"allocation:ProverAllocation", "Status", []byte{0}, allocationTree) // Joining
|
||||
_ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
|
||||
"allocation:ProverAllocation", "ConfirmationFilter", filter, allocationTree)
|
||||
|
||||
frameNumberBytes := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(frameNumberBytes, joinFrame)
|
||||
_ = rdfMultiprover.Set(global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
|
||||
"allocation:ProverAllocation", "JoinFrameNumber", frameNumberBytes, allocationTree)
|
||||
|
||||
allocationVertex := hgcrdt.NewVertex(
|
||||
intrinsics.GLOBAL_INTRINSIC_ADDRESS,
|
||||
[32]byte(allocAddr),
|
||||
allocationTree.Commit(inclusionProver, false),
|
||||
big.NewInt(0),
|
||||
)
|
||||
if err := hg.AddVertex(txn, allocationVertex); err != nil {
|
||||
txn.Abort()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
var allocationVertexID [64]byte
|
||||
copy(allocationVertexID[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
|
||||
copy(allocationVertexID[32:], allocAddr)
|
||||
if err := hg.SetVertexData(txn, allocationVertexID, allocationTree); err != nil {
|
||||
txn.Abort()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if err := txn.Commit(); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return proverAddr, allocAddr, nil
|
||||
}
|
||||
|
||||
// Helper to check if vertex exists
|
||||
vertexExists := func(vertexID [64]byte) bool {
|
||||
_, err := hg.GetVertex(vertexID)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// Helper to check if vertex data exists
|
||||
vertexDataExists := func(vertexID [64]byte) bool {
|
||||
data, err := hg.GetVertexData(vertexID)
|
||||
return err == nil && data != nil
|
||||
}
|
||||
|
||||
// Helper to compute vertex ID from address
|
||||
getVertexID := func(address []byte) [64]byte {
|
||||
var id [64]byte
|
||||
copy(id[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
|
||||
copy(id[32:], address)
|
||||
return id
|
||||
}
|
||||
|
||||
// Create 5 orphaned allocations (no prover vertex exists)
|
||||
publicKeys := make([][]byte, 5)
|
||||
proverAddresses := make([][]byte, 5)
|
||||
allocationAddresses := make([][]byte, 5)
|
||||
filters := make([][]byte, 5)
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
publicKeys[i] = bytes.Repeat([]byte{byte(0x70 + i)}, 585)
|
||||
filters[i] = []byte(fmt.Sprintf("orphan_filter_%d", i))
|
||||
|
||||
proverAddr, allocAddr, err := createOrphanedAllocation(
|
||||
publicKeys[i],
|
||||
filters[i],
|
||||
oldJoinFrame,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
proverAddresses[i] = proverAddr
|
||||
allocationAddresses[i] = allocAddr
|
||||
t.Logf("Created orphaned allocation %d: prover=%s, allocation=%s",
|
||||
i, hex.EncodeToString(proverAddr), hex.EncodeToString(allocAddr))
|
||||
}
|
||||
|
||||
// Verify initial state: allocation vertices exist, prover vertices do NOT exist
|
||||
for i := 0; i < 5; i++ {
|
||||
proverVertexID := getVertexID(proverAddresses[i])
|
||||
allocVertexID := getVertexID(allocationAddresses[i])
|
||||
|
||||
assert.False(t, vertexExists(proverVertexID),
|
||||
"Prover %d vertex should NOT exist (orphaned allocation)", i)
|
||||
assert.False(t, vertexDataExists(proverVertexID),
|
||||
"Prover %d vertex data should NOT exist (orphaned allocation)", i)
|
||||
|
||||
assert.True(t, vertexExists(allocVertexID),
|
||||
"Allocation %d vertex should exist before prune", i)
|
||||
assert.True(t, vertexDataExists(allocVertexID),
|
||||
"Allocation %d vertex data should exist before prune", i)
|
||||
}
|
||||
|
||||
// Create registry - this will load allocations from vertex data iterator
|
||||
// The allocations will be loaded even though their prover vertices don't exist
|
||||
registry, err := NewProverRegistry(logger, hg)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify the allocations created ProverInfo entries in the cache
|
||||
// (with Address but no PublicKey since prover vertex doesn't exist)
|
||||
for i := 0; i < 5; i++ {
|
||||
info, err := registry.GetProverInfo(proverAddresses[i])
|
||||
require.NoError(t, err)
|
||||
if info != nil {
|
||||
t.Logf("Prover %d in cache: address=%s, publicKey len=%d, allocations=%d",
|
||||
i, hex.EncodeToString(info.Address), len(info.PublicKey), len(info.Allocations))
|
||||
// The prover info should have no public key since the prover vertex doesn't exist
|
||||
assert.Empty(t, info.PublicKey,
|
||||
"Prover %d should have no public key (prover vertex missing)", i)
|
||||
assert.Len(t, info.Allocations, 1,
|
||||
"Prover %d should have 1 allocation", i)
|
||||
// Verify VertexAddress is set on the allocation
|
||||
assert.Len(t, info.Allocations[0].VertexAddress, 32,
|
||||
"Allocation %d should have VertexAddress set", i)
|
||||
}
|
||||
}
|
||||
|
||||
// Run pruning
|
||||
err = registry.PruneOrphanJoins(currentFrame)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify post-prune state: all orphaned allocations should be pruned
|
||||
for i := 0; i < 5; i++ {
|
||||
allocVertexID := getVertexID(allocationAddresses[i])
|
||||
|
||||
assert.False(t, vertexExists(allocVertexID),
|
||||
"Allocation %d vertex should be DELETED after prune", i)
|
||||
assert.False(t, vertexDataExists(allocVertexID),
|
||||
"Allocation %d vertex data should be DELETED after prune", i)
|
||||
}
|
||||
|
||||
// Verify registry cache state: provers should be removed
|
||||
for i := 0; i < 5; i++ {
|
||||
info, err := registry.GetProverInfo(proverAddresses[i])
|
||||
require.NoError(t, err)
|
||||
assert.Nil(t, info,
|
||||
"Prover %d should be removed from registry cache after prune", i)
|
||||
}
|
||||
|
||||
// Verify through GetProvers that the provers are gone from all filters
|
||||
for i := 0; i < 5; i++ {
|
||||
provers, err := registry.GetProvers(filters[i])
|
||||
require.NoError(t, err)
|
||||
for _, p := range provers {
|
||||
assert.NotEqual(t, hex.EncodeToString(proverAddresses[i]), hex.EncodeToString(p.Address),
|
||||
"Prover %d should not appear in GetProvers for filter %s", i, string(filters[i]))
|
||||
}
|
||||
}
|
||||
|
||||
t.Logf("Orphaned allocation prune test completed successfully")
|
||||
t.Logf(" - 5 allocations with missing prover vertices: all pruned using VertexAddress")
|
||||
t.Logf(" - Registry cache cleaned up")
|
||||
}
|
||||
|
||||
260 node/execution/intrinsics/global/global_alt_shard_update.go Normal file
@@ -0,0 +1,260 @@
|
||||
package global
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"math/big"
|
||||
"slices"
|
||||
|
||||
"github.com/iden3/go-iden3-crypto/poseidon"
|
||||
"github.com/pkg/errors"
|
||||
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
|
||||
"source.quilibrium.com/quilibrium/monorepo/types/execution/intrinsics"
|
||||
"source.quilibrium.com/quilibrium/monorepo/types/execution/state"
|
||||
"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
|
||||
"source.quilibrium.com/quilibrium/monorepo/types/keys"
|
||||
)
|
||||
|
||||
// AltShardUpdate represents an update to an alternative shard's roots.
|
||||
// The shard address is derived from the poseidon hash of the BLS48-581 public key.
|
||||
// This allows external entities to maintain their own state trees with provable
|
||||
// ownership through signature verification.
|
||||
type AltShardUpdate struct {
|
||||
// The BLS48-581 public key that owns this shard
|
||||
// The shard address is poseidon(PublicKey)
|
||||
PublicKey []byte
|
||||
|
||||
// The frame number when this update was signed
|
||||
// Must be within 2 frames of the verification frame number
|
||||
FrameNumber uint64
|
||||
|
||||
// The root hash for vertex adds tree
|
||||
VertexAddsRoot []byte
|
||||
|
||||
// The root hash for vertex removes tree
|
||||
VertexRemovesRoot []byte
|
||||
|
||||
// The root hash for hyperedge adds tree
|
||||
HyperedgeAddsRoot []byte
|
||||
|
||||
// The root hash for hyperedge removes tree
|
||||
HyperedgeRemovesRoot []byte
|
||||
|
||||
// The BLS48-581 signature over (FrameNumber || VertexAddsRoot ||
|
||||
// VertexRemovesRoot || HyperedgeAddsRoot || HyperedgeRemovesRoot)
|
||||
Signature []byte
|
||||
|
||||
// Private dependencies
|
||||
hypergraph hypergraph.Hypergraph
|
||||
keyManager keys.KeyManager
|
||||
signer crypto.Signer
|
||||
}
|
||||
|
||||
// NewAltShardUpdate creates a new AltShardUpdate instance
|
||||
func NewAltShardUpdate(
|
||||
frameNumber uint64,
|
||||
vertexAddsRoot []byte,
|
||||
vertexRemovesRoot []byte,
|
||||
hyperedgeAddsRoot []byte,
|
||||
hyperedgeRemovesRoot []byte,
|
||||
hypergraph hypergraph.Hypergraph,
|
||||
keyManager keys.KeyManager,
|
||||
signer crypto.Signer,
|
||||
) (*AltShardUpdate, error) {
|
||||
return &AltShardUpdate{
|
||||
FrameNumber: frameNumber,
|
||||
VertexAddsRoot: vertexAddsRoot,
|
||||
VertexRemovesRoot: vertexRemovesRoot,
|
||||
HyperedgeAddsRoot: hyperedgeAddsRoot,
|
||||
HyperedgeRemovesRoot: hyperedgeRemovesRoot,
|
||||
hypergraph: hypergraph,
|
||||
keyManager: keyManager,
|
||||
signer: signer,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetCost returns the cost of this operation (zero for shard updates)
|
||||
func (a *AltShardUpdate) GetCost() (*big.Int, error) {
|
||||
return big.NewInt(0), nil
|
||||
}
|
||||
|
||||
// getSignedMessage constructs the message that is signed
|
||||
func (a *AltShardUpdate) getSignedMessage() []byte {
|
||||
frameBytes := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(frameBytes, a.FrameNumber)
|
||||
|
||||
return slices.Concat(
|
||||
frameBytes,
|
||||
a.VertexAddsRoot,
|
||||
a.VertexRemovesRoot,
|
||||
a.HyperedgeAddsRoot,
|
||||
a.HyperedgeRemovesRoot,
|
||||
)
|
||||
}
|
||||
|
||||
// getShardAddress derives the shard address from the public key
|
||||
func (a *AltShardUpdate) getShardAddress() ([]byte, error) {
|
||||
if len(a.PublicKey) == 0 {
|
||||
return nil, errors.New("public key is empty")
|
||||
}
|
||||
|
||||
addrBI, err := poseidon.HashBytes(a.PublicKey)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "hash public key")
|
||||
}
|
||||
|
||||
return addrBI.FillBytes(make([]byte, 32)), nil
|
||||
}
|
||||
|
||||
// Prove signs the update with the signer's BLS48-581 key
|
||||
func (a *AltShardUpdate) Prove(frameNumber uint64) error {
|
||||
if a.signer == nil {
|
||||
return errors.New("signer is nil")
|
||||
}
|
||||
|
||||
a.PublicKey = a.signer.Public().([]byte)
|
||||
|
||||
// Create domain for signature
|
||||
domainPreimage := slices.Concat(
|
||||
intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
|
||||
[]byte("ALT_SHARD_UPDATE"),
|
||||
)
|
||||
domain, err := poseidon.HashBytes(domainPreimage)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "prove")
|
||||
}
|
||||
|
||||
message := a.getSignedMessage()
|
||||
signature, err := a.signer.SignWithDomain(
|
||||
message,
|
||||
domain.FillBytes(make([]byte, 32)),
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "prove")
|
||||
}
|
||||
|
||||
a.Signature = signature
|
||||
return nil
|
||||
}
|
||||
|
||||
// Verify validates the signature and frame number constraints
|
||||
func (a *AltShardUpdate) Verify(frameNumber uint64) (bool, error) {
|
||||
if a.keyManager == nil {
|
||||
return false, errors.New("key manager is nil")
|
||||
}
|
||||
|
||||
// Validate public key length (BLS48-581 public key is 585 bytes)
|
||||
if len(a.PublicKey) != 585 {
|
||||
return false, errors.Errorf(
|
||||
"invalid public key length: expected 585, got %d",
|
||||
len(a.PublicKey),
|
||||
)
|
||||
}
|
||||
|
||||
// Validate signature length (BLS48-581 signature is 74 bytes)
|
||||
if len(a.Signature) != 74 {
|
||||
return false, errors.Errorf(
|
||||
"invalid signature length: expected 74, got %d",
|
||||
len(a.Signature),
|
||||
)
|
||||
}
|
||||
|
||||
// Validate root lengths (must be 64 or 74 bytes)
|
||||
isValidRootLen := func(length int) bool {
|
||||
return length == 64 || length == 74
|
||||
}
|
||||
if !isValidRootLen(len(a.VertexAddsRoot)) {
|
||||
return false, errors.Errorf(
|
||||
"vertex adds root must be 64 or 74 bytes, got %d",
|
||||
len(a.VertexAddsRoot),
|
||||
)
|
||||
}
|
||||
if !isValidRootLen(len(a.VertexRemovesRoot)) {
|
||||
return false, errors.Errorf(
|
||||
"vertex removes root must be 64 or 74 bytes, got %d",
|
||||
len(a.VertexRemovesRoot),
|
||||
)
|
||||
}
|
||||
if !isValidRootLen(len(a.HyperedgeAddsRoot)) {
|
||||
return false, errors.Errorf(
|
||||
"hyperedge adds root must be 64 or 74 bytes, got %d",
|
||||
len(a.HyperedgeAddsRoot),
|
||||
)
|
||||
}
|
||||
if !isValidRootLen(len(a.HyperedgeRemovesRoot)) {
|
||||
return false, errors.Errorf(
|
||||
"hyperedge removes root must be 64 or 74 bytes, got %d",
|
||||
len(a.HyperedgeRemovesRoot),
|
||||
)
|
||||
}
|
||||
|
||||
// Frame number must be within 2 frames of the verification frame
|
||||
// and not in the future
|
||||
if a.FrameNumber > frameNumber {
|
||||
return false, errors.New("frame number is in the future")
|
||||
}
|
||||
if frameNumber-a.FrameNumber > 2 {
|
||||
return false, errors.New("frame number is too old (more than 2 frames)")
|
||||
}
|
||||
|
||||
// Create domain for signature verification
|
||||
domainPreimage := slices.Concat(
|
||||
intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
|
||||
[]byte("ALT_SHARD_UPDATE"),
|
||||
)
|
||||
domain, err := poseidon.HashBytes(domainPreimage)
|
||||
if err != nil {
|
||||
return false, errors.Wrap(err, "verify")
|
||||
}
|
||||
|
||||
message := a.getSignedMessage()
|
||||
valid, err := a.keyManager.ValidateSignature(
|
||||
crypto.KeyTypeBLS48581G1,
|
||||
a.PublicKey,
|
||||
message,
|
||||
a.Signature,
|
||||
domain.FillBytes(make([]byte, 32)),
|
||||
)
|
||||
if err != nil {
|
||||
return false, errors.Wrap(err, "verify")
|
||||
}
|
||||
if !valid {
|
||||
return false, errors.New("invalid signature")
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// GetReadAddresses returns the addresses this operation reads from
|
||||
func (a *AltShardUpdate) GetReadAddresses(
|
||||
frameNumber uint64,
|
||||
) ([][]byte, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// GetWriteAddresses returns the addresses this operation writes to
|
||||
func (a *AltShardUpdate) GetWriteAddresses(
|
||||
frameNumber uint64,
|
||||
) ([][]byte, error) {
|
||||
shardAddress, err := a.getShardAddress()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "get write addresses")
|
||||
}
|
||||
|
||||
// We write to four trees under this shard address, all at the zero key
|
||||
// The full address is shardAddress (app) + 00...00 (data)
|
||||
zeroKey := bytes.Repeat([]byte{0x00}, 32)
|
||||
fullAddress := slices.Concat(shardAddress, zeroKey)
|
||||
|
||||
return [][]byte{fullAddress}, nil
|
||||
}
|
||||
|
||||
// Materialize applies the shard update to the state
|
||||
func (a *AltShardUpdate) Materialize(
|
||||
frameNumber uint64,
|
||||
state state.State,
|
||||
) (state.State, error) {
|
||||
return state, nil
|
||||
}
|
||||
|
||||
var _ intrinsics.IntrinsicOperation = (*AltShardUpdate)(nil)
|
||||
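To make the signing contract concrete: the update is signed over an 8-byte big-endian frame number followed by the four roots, under a Poseidon-derived domain separator, as the Prove/Verify pair above describes. A self-contained sketch (the global intrinsic address is passed in as a parameter rather than imported):

package sketch

import (
	"encoding/binary"
	"slices"

	"github.com/iden3/go-iden3-crypto/poseidon"
)

// signedMessage reproduces the byte layout AltShardUpdate signs:
// frame number (8 bytes, big-endian) || the four roots in order.
func signedMessage(frameNumber uint64, roots [4][]byte) []byte {
	frameBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(frameBytes, frameNumber)
	return slices.Concat(frameBytes, roots[0], roots[1], roots[2], roots[3])
}

// signatureDomain reproduces the domain separator: the Poseidon hash of the
// global intrinsic address concatenated with "ALT_SHARD_UPDATE".
func signatureDomain(globalIntrinsicAddress []byte) ([]byte, error) {
	d, err := poseidon.HashBytes(slices.Concat(
		globalIntrinsicAddress, []byte("ALT_SHARD_UPDATE"),
	))
	if err != nil {
		return nil, err
	}
	return d.FillBytes(make([]byte, 32)), nil
}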
@@ -177,27 +177,44 @@ func TestHypergraphSyncServer(t *testing.T) {
|
||||
servertxn.Commit()
|
||||
clienttxn.Commit()
|
||||
|
||||
// Seed an orphan vertex that only exists on the client so pruning can remove it.
|
||||
orphanData := make([]byte, 32)
|
||||
_, _ = rand.Read(orphanData)
|
||||
var orphanAddr [32]byte
|
||||
copy(orphanAddr[:], orphanData)
|
||||
orphanVertex := hgcrdt.NewVertex(
|
||||
vertices1[0].GetAppAddress(),
|
||||
orphanAddr,
|
||||
dataTree1.Commit(inclusionProver, false),
|
||||
dataTree1.GetSize(),
|
||||
)
|
||||
orphanShard := application.GetShardKey(orphanVertex)
|
||||
require.Equal(t, shardKey, orphanShard, "orphan vertex must share shard")
|
||||
// Seed many orphan vertices that only exist on the client so pruning can
|
||||
// remove them. We create enough orphans with varied addresses to trigger
|
||||
// tree restructuring (node merges) when they get deleted during sync.
|
||||
// This tests the fix for the FullPrefix bug in lazy_proof_tree.go Delete().
|
||||
numOrphans := 200
|
||||
orphanVertices := make([]application.Vertex, numOrphans)
|
||||
orphanIDs := make([][64]byte, numOrphans)
|
||||
|
||||
orphanTxn, err := clientHypergraphStore.NewTransaction(false)
|
||||
require.NoError(t, err)
|
||||
orphanID := orphanVertex.GetID()
|
||||
require.NoError(t, clientHypergraphStore.SaveVertexTree(orphanTxn, orphanID[:], dataTree1))
|
||||
require.NoError(t, crdts[1].AddVertex(orphanTxn, orphanVertex))
|
||||
|
||||
for i := 0; i < numOrphans; i++ {
|
||||
orphanData := make([]byte, 32)
|
||||
_, _ = rand.Read(orphanData)
|
||||
// Mix in the index to ensure varied distribution across tree branches
|
||||
binary.BigEndian.PutUint32(orphanData[28:], uint32(i))
|
||||
|
||||
var orphanAddr [32]byte
|
||||
copy(orphanAddr[:], orphanData)
|
||||
orphanVertices[i] = hgcrdt.NewVertex(
|
||||
vertices1[0].GetAppAddress(),
|
||||
orphanAddr,
|
||||
dataTree1.Commit(inclusionProver, false),
|
||||
dataTree1.GetSize(),
|
||||
)
|
||||
orphanShard := application.GetShardKey(orphanVertices[i])
|
||||
require.Equal(t, shardKey, orphanShard, "orphan vertex %d must share shard", i)
|
||||
|
||||
orphanIDs[i] = orphanVertices[i].GetID()
|
||||
require.NoError(t, clientHypergraphStore.SaveVertexTree(orphanTxn, orphanIDs[i][:], dataTree1))
|
||||
require.NoError(t, crdts[1].AddVertex(orphanTxn, orphanVertices[i]))
|
||||
}
|
||||
require.NoError(t, orphanTxn.Commit())
|
||||
|
||||
clientSet := crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey)
|
||||
require.True(t, clientSet.Has(orphanID), "client must start with orphan leaf")
|
||||
for i := 0; i < numOrphans; i++ {
|
||||
require.True(t, clientSet.Has(orphanIDs[i]), "client must start with orphan leaf %d", i)
|
||||
}
|
||||
logger.Info("saved")
|
||||
|
||||
for _, op := range operations1 {
|
||||
@@ -299,7 +316,11 @@ func TestHypergraphSyncServer(t *testing.T) {
|
||||
log.Fatalf("Client: failed to sync 1: %v", err)
|
||||
}
|
||||
str.CloseSend()
|
||||
require.False(t, clientSet.Has(orphanID), "orphan vertex should be pruned after sync")
|
||||
|
||||
// Verify all orphan vertices were pruned after sync
|
||||
for i := 0; i < numOrphans; i++ {
|
||||
require.False(t, clientSet.Has(orphanIDs[i]), "orphan vertex %d should be pruned after sync", i)
|
||||
}
|
||||
leaves := crypto.CompareLeaves(
|
||||
crdts[0].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree(),
|
||||
crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree(),
|
||||
@ -310,7 +331,7 @@ func TestHypergraphSyncServer(t *testing.T) {
|
||||
clientTree := crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree()
|
||||
coveredPrefixPath := clientTree.CoveredPrefix
|
||||
if len(coveredPrefixPath) == 0 {
|
||||
coveredPrefixPath = tries.GetFullPath(orphanID[:])[:0]
|
||||
coveredPrefixPath = tries.GetFullPath(orphanIDs[0][:])[:0]
|
||||
}
|
||||
allLeaves := tries.GetAllLeaves(
|
||||
clientTree.SetType,
|
||||
|
||||
@ -115,6 +115,9 @@ const (
|
||||
HYPERGRAPH_VERTEX_REMOVES_SHARD_COMMIT = 0xE1
|
||||
HYPERGRAPH_HYPEREDGE_ADDS_SHARD_COMMIT = 0xE2
|
||||
HYPERGRAPH_HYPEREDGE_REMOVES_SHARD_COMMIT = 0xE3
|
||||
HYPERGRAPH_ALT_SHARD_COMMIT = 0xE4
|
||||
HYPERGRAPH_ALT_SHARD_COMMIT_LATEST = 0xE5
|
||||
HYPERGRAPH_ALT_SHARD_ADDRESS_INDEX = 0xE6
|
||||
VERTEX_DATA = 0xF0
|
||||
VERTEX_TOMBSTONE = 0xF1
|
||||
HYPERGRAPH_COVERED_PREFIX = 0xFA
|
||||
|
||||
@ -401,6 +401,35 @@ func hypergraphCoveredPrefixKey() []byte {
|
||||
return key
|
||||
}
|
||||
|
||||
// hypergraphAltShardCommitKey returns the key for storing alt shard roots at a
// specific frame number. The value stored at this key contains all four roots,
// each length-prefixed (one length byte followed by a 64- or 74-byte root).
func hypergraphAltShardCommitKey(
frameNumber uint64,
shardAddress []byte,
) []byte {
key := []byte{HYPERGRAPH_SHARD, HYPERGRAPH_ALT_SHARD_COMMIT}
key = binary.BigEndian.AppendUint64(key, frameNumber)
key = append(key, shardAddress...)
return key
}

// hypergraphAltShardCommitLatestKey returns the key for storing the latest
// frame number for an alt shard. The value is an 8-byte big-endian frame number.
func hypergraphAltShardCommitLatestKey(shardAddress []byte) []byte {
key := []byte{HYPERGRAPH_SHARD, HYPERGRAPH_ALT_SHARD_COMMIT_LATEST}
key = append(key, shardAddress...)
return key
}

// hypergraphAltShardAddressIndexKey returns the key for marking that an alt
// shard address exists. Used for iterating all alt shard addresses.
func hypergraphAltShardAddressIndexKey(shardAddress []byte) []byte {
key := []byte{HYPERGRAPH_SHARD, HYPERGRAPH_ALT_SHARD_ADDRESS_INDEX}
key = append(key, shardAddress...)
return key
}
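
For orientation, a minimal sketch (not part of the commit) of how the three key spaces above lay out on disk, assuming HYPERGRAPH_SHARD is the same one-byte table prefix used by the other hypergraph keys and shardAddress is a 32-byte address:

// Illustrative layout only; 0xE4/0xE5/0xE6 are the constants added above.
//
//   commit key: HYPERGRAPH_SHARD | 0xE4 | frame number (8 bytes, big-endian) | shardAddress
//   latest key: HYPERGRAPH_SHARD | 0xE5 | shardAddress
//   index key:  HYPERGRAPH_SHARD | 0xE6 | shardAddress
//
// Placing the frame number before the address keeps all shards for a given
// frame contiguous, so a per-frame range scan can walk them in one iterator pass.
func altShardCommitKeyLen(shardAddress []byte) int {
	return 1 + 1 + 8 + len(shardAddress)
}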

func (p *PebbleHypergraphStore) copyShardData(
dst store.KVDB,
shardKey tries.ShardKey,
@ -1700,7 +1729,7 @@ func (p *PebbleHypergraphStore) GetRootCommits(
) (map[tries.ShardKey][][]byte, error) {
iter, err := p.db.NewIter(
hypergraphVertexAddsShardCommitKey(frameNumber, nil),
hypergraphHyperedgeAddsShardCommitKey(
hypergraphHyperedgeRemovesShardCommitKey(
frameNumber,
bytes.Repeat([]byte{0xff}, 65),
),
@ -2027,3 +2056,198 @@ func (p *PebbleHypergraphStore) InsertRawLeaf(

return nil
}

// SetAltShardCommit stores the four roots for an alt shard at a given frame
// number and updates the latest index if this is the newest frame.
func (p *PebbleHypergraphStore) SetAltShardCommit(
txn tries.TreeBackingStoreTransaction,
frameNumber uint64,
shardAddress []byte,
vertexAddsRoot []byte,
vertexRemovesRoot []byte,
hyperedgeAddsRoot []byte,
hyperedgeRemovesRoot []byte,
) error {
if txn == nil {
return errors.Wrap(
errors.New("requires transaction"),
"set alt shard commit",
)
}

// Validate roots are valid sizes (64 or 74 bytes)
for _, root := range [][]byte{
vertexAddsRoot, vertexRemovesRoot, hyperedgeAddsRoot, hyperedgeRemovesRoot,
} {
if len(root) != 64 && len(root) != 74 {
return errors.Wrap(
errors.New("roots must be 64 or 74 bytes"),
"set alt shard commit",
)
}
}

// Store as length-prefixed values: 1 byte length + data for each root
value := make([]byte, 0, 4+len(vertexAddsRoot)+len(vertexRemovesRoot)+
len(hyperedgeAddsRoot)+len(hyperedgeRemovesRoot))
value = append(value, byte(len(vertexAddsRoot)))
value = append(value, vertexAddsRoot...)
value = append(value, byte(len(vertexRemovesRoot)))
value = append(value, vertexRemovesRoot...)
value = append(value, byte(len(hyperedgeAddsRoot)))
value = append(value, hyperedgeAddsRoot...)
value = append(value, byte(len(hyperedgeRemovesRoot)))
value = append(value, hyperedgeRemovesRoot...)

// Store the commit at the frame-specific key
commitKey := hypergraphAltShardCommitKey(frameNumber, shardAddress)
if err := txn.Set(commitKey, value); err != nil {
return errors.Wrap(err, "set alt shard commit")
}

// Update the latest index if this frame is newer
latestKey := hypergraphAltShardCommitLatestKey(shardAddress)
existing, closer, err := p.db.Get(latestKey)
if err != nil && !errors.Is(err, pebble.ErrNotFound) {
return errors.Wrap(err, "set alt shard commit: get latest")
}

shouldUpdate := true
if err == nil {
defer closer.Close()
if len(existing) == 8 {
existingFrame := binary.BigEndian.Uint64(existing)
if existingFrame >= frameNumber {
shouldUpdate = false
}
}
}

if shouldUpdate {
frameBytes := make([]byte, 8)
binary.BigEndian.PutUint64(frameBytes, frameNumber)
if err := txn.Set(latestKey, frameBytes); err != nil {
return errors.Wrap(err, "set alt shard commit: update latest")
}
}

// Ensure the address is in the index for RangeAltShardAddresses
indexKey := hypergraphAltShardAddressIndexKey(shardAddress)
if err := txn.Set(indexKey, []byte{0x01}); err != nil {
return errors.Wrap(err, "set alt shard commit: update index")
}

return nil
}
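
To make the value layout concrete, a small sketch (not part of the commit): each root is written as one length byte followed by the root bytes, so four 64-byte roots occupy 4 × (1 + 64) = 260 bytes.

// encodedAltShardValueLen mirrors the length-prefixed layout written above
// (illustrative only). For four 64-byte roots it returns 260.
func encodedAltShardValueLen(roots ...[]byte) int {
	n := 0
	for _, r := range roots {
		n += 1 + len(r)
	}
	return n
}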

// GetLatestAltShardCommit retrieves the most recent roots for an alt shard.
func (p *PebbleHypergraphStore) GetLatestAltShardCommit(
shardAddress []byte,
) (
vertexAddsRoot []byte,
vertexRemovesRoot []byte,
hyperedgeAddsRoot []byte,
hyperedgeRemovesRoot []byte,
err error,
) {
// Get the latest frame number for this shard
latestKey := hypergraphAltShardCommitLatestKey(shardAddress)
frameBytes, closer, err := p.db.Get(latestKey)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, nil, nil, nil, errors.Wrap(
store.ErrNotFound,
"get latest alt shard commit",
)
}
return nil, nil, nil, nil, errors.Wrap(err, "get latest alt shard commit")
}
defer closer.Close()

if len(frameBytes) != 8 {
return nil, nil, nil, nil, errors.Wrap(
store.ErrInvalidData,
"get latest alt shard commit: invalid frame number",
)
}

frameNumber := binary.BigEndian.Uint64(frameBytes)

// Get the commit at that frame
commitKey := hypergraphAltShardCommitKey(frameNumber, shardAddress)
value, commitCloser, err := p.db.Get(commitKey)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, nil, nil, nil, errors.Wrap(
store.ErrNotFound,
"get latest alt shard commit: commit not found",
)
}
return nil, nil, nil, nil, errors.Wrap(err, "get latest alt shard commit")
}
defer commitCloser.Close()

// Parse length-prefixed format
offset := 0
parseRoot := func() ([]byte, error) {
if offset >= len(value) {
return nil, errors.New("unexpected end of data")
}
length := int(value[offset])
offset++
if offset+length > len(value) {
return nil, errors.New("root length exceeds data")
}
root := make([]byte, length)
copy(root, value[offset:offset+length])
offset += length
return root, nil
}

var parseErr error
vertexAddsRoot, parseErr = parseRoot()
if parseErr != nil {
return nil, nil, nil, nil, errors.Wrap(parseErr, "get latest alt shard commit")
}
vertexRemovesRoot, parseErr = parseRoot()
if parseErr != nil {
return nil, nil, nil, nil, errors.Wrap(parseErr, "get latest alt shard commit")
}
hyperedgeAddsRoot, parseErr = parseRoot()
if parseErr != nil {
return nil, nil, nil, nil, errors.Wrap(parseErr, "get latest alt shard commit")
}
hyperedgeRemovesRoot, parseErr = parseRoot()
if parseErr != nil {
return nil, nil, nil, nil, errors.Wrap(parseErr, "get latest alt shard commit")
}

return vertexAddsRoot, vertexRemovesRoot, hyperedgeAddsRoot, hyperedgeRemovesRoot, nil
}

// RangeAltShardAddresses returns all alt shard addresses that have stored
// commits.
func (p *PebbleHypergraphStore) RangeAltShardAddresses() ([][]byte, error) {
startKey := []byte{HYPERGRAPH_SHARD, HYPERGRAPH_ALT_SHARD_ADDRESS_INDEX}
endKey := []byte{HYPERGRAPH_SHARD, HYPERGRAPH_ALT_SHARD_ADDRESS_INDEX + 1}

iter, err := p.db.NewIter(startKey, endKey)
if err != nil {
return nil, errors.Wrap(err, "range alt shard addresses")
}
defer iter.Close()

var addresses [][]byte
prefixLen := len(startKey)

for iter.First(); iter.Valid(); iter.Next() {
key := iter.Key()
if len(key) > prefixLen {
addr := make([]byte, len(key)-prefixLen)
copy(addr, key[prefixLen:])
addresses = append(addresses, addr)
}
}

return addresses, nil
}
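
A hedged, test-style sketch (not part of the commit) of how the three new store methods compose; it borrows the setupTestHypergraphStore helper from the test file added below, and the 64-byte roots follow the size check in SetAltShardCommit.

func TestAltShardCommitRoundTrip_Sketch(t *testing.T) {
	hgStore := setupTestHypergraphStore(t)

	shardAddress := bytes.Repeat([]byte{0x07}, 32)
	va := bytes.Repeat([]byte{0x01}, 64)
	vr := bytes.Repeat([]byte{0x02}, 64)
	ha := bytes.Repeat([]byte{0x03}, 64)
	hr := bytes.Repeat([]byte{0x04}, 64)

	// Write the four roots for frame 100 in a single transaction.
	txn, err := hgStore.NewTransaction(false)
	require.NoError(t, err)
	require.NoError(t, hgStore.SetAltShardCommit(txn, 100, shardAddress, va, vr, ha, hr))
	require.NoError(t, txn.Commit())

	// The latest-commit lookup should return exactly what was stored.
	gotVA, gotVR, gotHA, gotHR, err := hgStore.GetLatestAltShardCommit(shardAddress)
	require.NoError(t, err)
	require.Equal(t, va, gotVA)
	require.Equal(t, vr, gotVR)
	require.Equal(t, ha, gotHA)
	require.Equal(t, hr, gotHR)

	// The address index should now include this shard.
	addrs, err := hgStore.RangeAltShardAddresses()
	require.NoError(t, err)
	require.Contains(t, addrs, shardAddress)
}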

125
node/store/hypergraph_test.go
Normal file
@ -0,0 +1,125 @@
package store

import (
"bytes"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/config"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
)

func setupTestHypergraphStore(t *testing.T) *PebbleHypergraphStore {
logger := zap.NewNop()
cfg := &config.DBConfig{
InMemoryDONOTUSE: true,
Path: ".test/hypergraph",
}
db := NewPebbleDB(logger, cfg, 0)
require.NotNil(t, db)
t.Cleanup(func() { db.Close() })
return NewPebbleHypergraphStore(cfg, db, logger, nil, nil)
}

func TestGetRootCommits_IncludesAllCommitTypes(t *testing.T) {
hgStore := setupTestHypergraphStore(t)

// Create a test shard address
shardAddress := bytes.Repeat([]byte{0x42}, 32)
frameNumber := uint64(100)

// Create test commits (64 bytes each)
vertexAddsCommit := bytes.Repeat([]byte{0xAA}, 64)
vertexRemovesCommit := bytes.Repeat([]byte{0xBB}, 64)
hyperedgeAddsCommit := bytes.Repeat([]byte{0xCC}, 64)
hyperedgeRemovesCommit := bytes.Repeat([]byte{0xDD}, 64)

// Start a transaction and write all four commit types
txn, err := hgStore.NewTransaction(false)
require.NoError(t, err)

err = hgStore.SetShardCommit(txn, frameNumber, "adds", "vertex", shardAddress, vertexAddsCommit)
require.NoError(t, err)

err = hgStore.SetShardCommit(txn, frameNumber, "removes", "vertex", shardAddress, vertexRemovesCommit)
require.NoError(t, err)

err = hgStore.SetShardCommit(txn, frameNumber, "adds", "hyperedge", shardAddress, hyperedgeAddsCommit)
require.NoError(t, err)

err = hgStore.SetShardCommit(txn, frameNumber, "removes", "hyperedge", shardAddress, hyperedgeRemovesCommit)
require.NoError(t, err)

err = txn.Commit()
require.NoError(t, err)

// Now retrieve all commits using GetRootCommits
commits, err := hgStore.GetRootCommits(frameNumber)
require.NoError(t, err)

// Find the shard key for our test address
var foundShardKey *tries.ShardKey
for sk := range commits {
if bytes.Equal(sk.L2[:], shardAddress) {
foundShardKey = &sk
break
}
}

require.NotNil(t, foundShardKey, "Should find the shard in commits")

shardCommits := commits[*foundShardKey]
require.Len(t, shardCommits, 4, "Should have 4 commit slots")

// Verify each commit type was retrieved
assert.Equal(t, vertexAddsCommit, shardCommits[0], "Vertex adds commit should match")
assert.Equal(t, vertexRemovesCommit, shardCommits[1], "Vertex removes commit should match")
assert.Equal(t, hyperedgeAddsCommit, shardCommits[2], "Hyperedge adds commit should match")
assert.Equal(t, hyperedgeRemovesCommit, shardCommits[3], "Hyperedge removes commit should match")
}

func TestGetRootCommits_HyperedgeRemovesOnly(t *testing.T) {
// This test specifically checks if hyperedge removes are retrieved
// when they are the only commit type for a shard
hgStore := setupTestHypergraphStore(t)

// Create a test shard address
shardAddress := bytes.Repeat([]byte{0x99}, 32)
frameNumber := uint64(200)

// Only write hyperedge removes commit
hyperedgeRemovesCommit := bytes.Repeat([]byte{0xEE}, 64)

txn, err := hgStore.NewTransaction(false)
require.NoError(t, err)

err = hgStore.SetShardCommit(txn, frameNumber, "removes", "hyperedge", shardAddress, hyperedgeRemovesCommit)
require.NoError(t, err)

err = txn.Commit()
require.NoError(t, err)

// Now retrieve all commits using GetRootCommits
commits, err := hgStore.GetRootCommits(frameNumber)
require.NoError(t, err)

// Find the shard key for our test address
var foundShardKey *tries.ShardKey
for sk := range commits {
if bytes.Equal(sk.L2[:], shardAddress) {
foundShardKey = &sk
break
}
}

// This assertion will fail if hyperedge removes are not included in the range scan
require.NotNil(t, foundShardKey, "Should find the shard with only hyperedge removes in commits")

shardCommits := commits[*foundShardKey]
require.Len(t, shardCommits, 4, "Should have 4 commit slots")

// The hyperedge removes should be at index 3
assert.Equal(t, hyperedgeRemovesCommit, shardCommits[3], "Hyperedge removes commit should match")
}

@ -62,6 +62,11 @@ var pebbleMigrations = []func(*pebble.Batch) error{
migration_2_1_0_158,
migration_2_1_0_159,
migration_2_1_0_17,
migration_2_1_0_171,
migration_2_1_0_172,
migration_2_1_0_172,
migration_2_1_0_173,
migration_2_1_0_18,
}

func NewPebbleDB(
@ -675,6 +680,22 @@ func migration_2_1_0_159(b *pebble.Batch) error {
}

func migration_2_1_0_17(b *pebble.Batch) error {
return nil
}

func migration_2_1_0_171(b *pebble.Batch) error {
return nil
}

func migration_2_1_0_172(b *pebble.Batch) error {
return nil
}

func migration_2_1_0_173(b *pebble.Batch) error {
return nil
}

func migration_2_1_0_18(b *pebble.Batch) error {
// Global shard key: L1={0,0,0}, L2=0xff*32
globalShardKey := tries.ShardKey{
L1: [3]byte{},

@ -62,6 +62,7 @@ const (
TraversalProofType uint32 = 0x0316
GlobalProposalType uint32 = 0x0317
AppShardProposalType uint32 = 0x0318
AltShardUpdateType uint32 = 0x0319
TimeoutStateType uint32 = 0x031C
TimeoutCertificateType uint32 = 0x031D

@ -1870,6 +1870,214 @@ func (p *ProverUpdate) FromCanonicalBytes(data []byte) error {
return nil
}

// AltShardUpdate serialization methods
func (a *AltShardUpdate) ToCanonicalBytes() ([]byte, error) {
buf := new(bytes.Buffer)

// Write type prefix
if err := binary.Write(buf, binary.BigEndian, AltShardUpdateType); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}

// Write public_key (length-prefixed)
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(a.PublicKey)),
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if _, err := buf.Write(a.PublicKey); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}

// Write frame_number
if err := binary.Write(buf, binary.BigEndian, a.FrameNumber); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}

// Write vertex_adds_root (length-prefixed)
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(a.VertexAddsRoot)),
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if _, err := buf.Write(a.VertexAddsRoot); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}

// Write vertex_removes_root (length-prefixed)
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(a.VertexRemovesRoot)),
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if _, err := buf.Write(a.VertexRemovesRoot); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}

// Write hyperedge_adds_root (length-prefixed)
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(a.HyperedgeAddsRoot)),
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if _, err := buf.Write(a.HyperedgeAddsRoot); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}

// Write hyperedge_removes_root (length-prefixed)
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(a.HyperedgeRemovesRoot)),
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if _, err := buf.Write(a.HyperedgeRemovesRoot); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}

// Write signature (length-prefixed)
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(a.Signature)),
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if _, err := buf.Write(a.Signature); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}

return buf.Bytes(), nil
}

func (a *AltShardUpdate) FromCanonicalBytes(data []byte) error {
buf := bytes.NewBuffer(data)

// Read and verify type prefix
var typePrefix uint32
if err := binary.Read(buf, binary.BigEndian, &typePrefix); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
if typePrefix != AltShardUpdateType {
return errors.Wrap(
errors.New("invalid type prefix"),
"from canonical bytes",
)
}

// Read public_key
var pubKeyLen uint32
if err := binary.Read(buf, binary.BigEndian, &pubKeyLen); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
if pubKeyLen > 600 {
return errors.Wrap(
errors.New("invalid public key length"),
"from canonical bytes",
)
}
a.PublicKey = make([]byte, pubKeyLen)
if _, err := buf.Read(a.PublicKey); err != nil {
return errors.Wrap(err, "from canonical bytes")
}

// Read frame_number
if err := binary.Read(buf, binary.BigEndian, &a.FrameNumber); err != nil {
return errors.Wrap(err, "from canonical bytes")
}

// Read vertex_adds_root
var vertexAddsLen uint32
if err := binary.Read(buf, binary.BigEndian, &vertexAddsLen); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
if vertexAddsLen > 80 {
return errors.Wrap(
errors.New("invalid vertex adds root length"),
"from canonical bytes",
)
}
a.VertexAddsRoot = make([]byte, vertexAddsLen)
if _, err := buf.Read(a.VertexAddsRoot); err != nil {
return errors.Wrap(err, "from canonical bytes")
}

// Read vertex_removes_root
var vertexRemovesLen uint32
if err := binary.Read(buf, binary.BigEndian, &vertexRemovesLen); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
if vertexRemovesLen > 80 {
return errors.Wrap(
errors.New("invalid vertex removes root length"),
"from canonical bytes",
)
}
a.VertexRemovesRoot = make([]byte, vertexRemovesLen)
if _, err := buf.Read(a.VertexRemovesRoot); err != nil {
return errors.Wrap(err, "from canonical bytes")
}

// Read hyperedge_adds_root
var hyperedgeAddsLen uint32
if err := binary.Read(buf, binary.BigEndian, &hyperedgeAddsLen); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
if hyperedgeAddsLen > 80 {
return errors.Wrap(
errors.New("invalid hyperedge adds root length"),
"from canonical bytes",
)
}
a.HyperedgeAddsRoot = make([]byte, hyperedgeAddsLen)
if _, err := buf.Read(a.HyperedgeAddsRoot); err != nil {
return errors.Wrap(err, "from canonical bytes")
}

// Read hyperedge_removes_root
var hyperedgeRemovesLen uint32
if err := binary.Read(buf, binary.BigEndian, &hyperedgeRemovesLen); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
if hyperedgeRemovesLen > 80 {
return errors.Wrap(
errors.New("invalid hyperedge removes root length"),
"from canonical bytes",
)
}
a.HyperedgeRemovesRoot = make([]byte, hyperedgeRemovesLen)
if _, err := buf.Read(a.HyperedgeRemovesRoot); err != nil {
return errors.Wrap(err, "from canonical bytes")
}

// Read signature
var sigLen uint32
if err := binary.Read(buf, binary.BigEndian, &sigLen); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
if sigLen > 80 {
return errors.Wrap(
errors.New("invalid signature length"),
"from canonical bytes",
)
}
a.Signature = make([]byte, sigLen)
if _, err := buf.Read(a.Signature); err != nil {
return errors.Wrap(err, "from canonical bytes")
}

return nil
}
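
A hedged round-trip sketch (not part of the commit) showing the intended symmetry of the two methods; the field sizes follow the proto comments (585-byte key, 64-byte roots, 74-byte signature), and the byte values are placeholders:

func ExampleAltShardUpdate_roundTrip() {
	original := &AltShardUpdate{
		PublicKey:            bytes.Repeat([]byte{0x01}, 585),
		FrameNumber:          42,
		VertexAddsRoot:       bytes.Repeat([]byte{0x02}, 64),
		VertexRemovesRoot:    bytes.Repeat([]byte{0x03}, 64),
		HyperedgeAddsRoot:    bytes.Repeat([]byte{0x04}, 64),
		HyperedgeRemovesRoot: bytes.Repeat([]byte{0x05}, 64),
		Signature:            bytes.Repeat([]byte{0x06}, 74),
	}

	// Serialize with the type prefix and length-prefixed fields.
	encoded, err := original.ToCanonicalBytes()
	if err != nil {
		panic(err)
	}

	// Deserialize back into a fresh struct.
	decoded := &AltShardUpdate{}
	if err := decoded.FromCanonicalBytes(encoded); err != nil {
		panic(err)
	}

	fmt.Println(decoded.FrameNumber == original.FrameNumber)
	// Output: true
}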

func (m *MessageRequest) ToCanonicalBytes() ([]byte, error) {
buf := new(bytes.Buffer)

@ -1933,6 +2141,8 @@ func (m *MessageRequest) ToCanonicalBytes() ([]byte, error) {
innerBytes, err = request.CodeFinalize.ToCanonicalBytes()
case *MessageRequest_Shard:
innerBytes, err = request.Shard.ToCanonicalBytes()
case *MessageRequest_AltShardUpdate:
innerBytes, err = request.AltShardUpdate.ToCanonicalBytes()
default:
return nil, errors.New("unknown request type")
}
@ -2189,6 +2399,15 @@ func (m *MessageRequest) FromCanonicalBytes(data []byte) error {
}
m.Request = &MessageRequest_Shard{Shard: frameHeader}

case AltShardUpdateType:
altShardUpdate := &AltShardUpdate{}
if err := altShardUpdate.FromCanonicalBytes(dataBytes); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
m.Request = &MessageRequest_AltShardUpdate{
AltShardUpdate: altShardUpdate,
}

default:
return errors.Errorf("unknown message type: 0x%08X", innerType)
}

File diff suppressed because it is too large
@ -78,6 +78,27 @@ message ProverReject {
repeated bytes filters = 4;
}

// AltShardUpdate allows external entities to maintain their own state trees
// with provable ownership through signature verification. The shard address
// is derived from the poseidon hash of the BLS48-581 public key.
message AltShardUpdate {
// The BLS48-581 public key that owns this shard (585 bytes)
bytes public_key = 1;
// The frame number when this update was signed (must be within 2 frames)
uint64 frame_number = 2;
// The root hash for vertex adds tree (64 or 74 bytes)
bytes vertex_adds_root = 3;
// The root hash for vertex removes tree (64 or 74 bytes)
bytes vertex_removes_root = 4;
// The root hash for hyperedge adds tree (64 or 74 bytes)
bytes hyperedge_adds_root = 5;
// The root hash for hyperedge removes tree (64 or 74 bytes)
bytes hyperedge_removes_root = 6;
// The BLS48-581 signature (74 bytes) over (FrameNumber || VertexAddsRoot ||
// VertexRemovesRoot || HyperedgeAddsRoot || HyperedgeRemovesRoot)
bytes signature = 7;
}
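
As a rough illustration of the signed payload described in the comment above (not part of the commit), a signer might assemble the bytes as follows; encoding frame_number as 8 big-endian bytes is an assumption, and the authoritative definition is whatever the node's verifier checks:

// altShardSigningPayload is a hypothetical helper mirroring the comment on
// signature: FrameNumber || VertexAddsRoot || VertexRemovesRoot ||
// HyperedgeAddsRoot || HyperedgeRemovesRoot. Field names match the generated
// Go struct used by ToCanonicalBytes above.
func altShardSigningPayload(u *AltShardUpdate) []byte {
	payload := binary.BigEndian.AppendUint64(nil, u.FrameNumber)
	payload = append(payload, u.VertexAddsRoot...)
	payload = append(payload, u.VertexRemovesRoot...)
	payload = append(payload, u.HyperedgeAddsRoot...)
	payload = append(payload, u.HyperedgeRemovesRoot...)
	return payload
}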

message MessageRequest {
oneof request {
quilibrium.node.global.pb.ProverJoin join = 1;
@ -105,6 +126,7 @@ message MessageRequest {
quilibrium.node.compute.pb.CodeExecute code_execute = 23;
quilibrium.node.compute.pb.CodeFinalize code_finalize = 24;
quilibrium.node.global.pb.FrameHeader shard = 25;
quilibrium.node.global.pb.AltShardUpdate alt_shard_update = 26;
}
int64 timestamp = 99;
}

@ -46,6 +46,9 @@ type ProverAllocationInfo struct {
LeaveRejectFrameNumber uint64
// Last frame number the prover had proved
LastActiveFrameNumber uint64
// The 32-byte vertex address of this allocation in the hypergraph
// (derived from poseidon hash of "PROVER_ALLOCATION" + PublicKey + Filter)
VertexAddress []byte
}
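
A loosely hedged sketch (not part of the commit) of the derivation the comment describes; poseidonHash stands in for the node's actual Poseidon implementation, and the 32-byte truncation is an assumption:

// proverAllocationVertexAddress illustrates the documented derivation:
// poseidon("PROVER_ALLOCATION" || PublicKey || Filter), truncated to 32 bytes.
func proverAllocationVertexAddress(
	publicKey []byte,
	filter []byte,
	poseidonHash func([]byte) []byte, // hypothetical stand-in
) []byte {
	preimage := append([]byte("PROVER_ALLOCATION"), publicKey...)
	preimage = append(preimage, filter...)
	return poseidonHash(preimage)[:32]
}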

// ProverInfo represents information about a prover

@ -113,4 +113,28 @@ type HypergraphStore interface {
) ([]byte, error)
GetRootCommits(frameNumber uint64) (map[tries.ShardKey][][]byte, error)
ApplySnapshot(dbPath string) error
// SetAltShardCommit stores the four roots for an alt shard at a given frame
// number and updates the latest index if this is the newest frame.
SetAltShardCommit(
txn tries.TreeBackingStoreTransaction,
frameNumber uint64,
shardAddress []byte,
vertexAddsRoot []byte,
vertexRemovesRoot []byte,
hyperedgeAddsRoot []byte,
hyperedgeRemovesRoot []byte,
) error
// GetLatestAltShardCommit retrieves the most recent roots for an alt shard.
GetLatestAltShardCommit(
shardAddress []byte,
) (
vertexAddsRoot []byte,
vertexRemovesRoot []byte,
hyperedgeAddsRoot []byte,
hyperedgeRemovesRoot []byte,
err error,
)
// RangeAltShardAddresses returns all alt shard addresses that have stored
// commits.
RangeAltShardAddresses() ([][]byte, error)
}

@ -2045,11 +2045,25 @@ func (t *LazyVectorCommitmentTree) Delete(
mergedPrefix = append(mergedPrefix, lastChildIndex)
mergedPrefix = append(mergedPrefix, childBranch.Prefix...)

// Delete the child node from its old location before updating
err := t.Store.DeleteNode(
txn,
t.SetType,
t.PhaseType,
t.ShardKey,
generateKeyFromPath(childBranch.FullPrefix),
childBranch.FullPrefix,
)
if err != nil {
log.Panic("failed to delete old child path", zap.Error(err))
}

childBranch.Prefix = mergedPrefix
childBranch.FullPrefix = n.FullPrefix // Update to parent's position
childBranch.Commitment = nil

// Delete this node from storage
err := t.Store.DeleteNode(
err = t.Store.DeleteNode(
txn,
t.SetType,
t.PhaseType,
@ -2061,7 +2075,7 @@ func (t *LazyVectorCommitmentTree) Delete(
log.Panic("failed to delete path", zap.Error(err))
}

// Insert the merged child at this path
// Insert the merged child at the parent's path
err = t.Store.InsertNode(
txn,
t.SetType,
@ -2131,7 +2145,7 @@ func (t *LazyVectorCommitmentTree) Delete(

func SerializeTree(tree *LazyVectorCommitmentTree) ([]byte, error) {
tree.treeMx.Lock()
defer tree.treeMx.RLock()
defer tree.treeMx.Unlock()
var buf bytes.Buffer
if err := serializeNode(&buf, tree.Root); err != nil {
return nil, fmt.Errorf("failed to serialize tree: %w", err)