package global

import (
	"context"
	"slices"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
	typesconsensus "source.quilibrium.com/quilibrium/monorepo/types/consensus"
	"source.quilibrium.com/quilibrium/monorepo/types/store"
	"source.quilibrium.com/quilibrium/monorepo/types/worker"
)
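
// The tests in this file pin down the behavior expected of
// reconcileWorkerAllocations when reconciling locally registered workers
// against the allocations reported for this prover:
//   - active allocations keep their filter and the worker stays allocated
//   - joining allocations keep their filter but are not marked allocated yet
//   - rejected allocations have their filter, allocation flag, and pending
//     frame cleared, and do not consume a free worker
//   - a pending proposal that never appears in the registry (including the
//     nil prover info case) is cleared once the proposal timeout elapses
//     (10 frames in these tests)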

// mockWorkerManager is a simple mock for testing reconcileWorkerAllocations
type mockWorkerManager struct {
	workers map[uint]*store.WorkerInfo
}

func newMockWorkerManager() *mockWorkerManager {
	return &mockWorkerManager{
		workers: make(map[uint]*store.WorkerInfo),
	}
}

func (m *mockWorkerManager) Start(ctx context.Context) error { return nil }
func (m *mockWorkerManager) Stop() error { return nil }
func (m *mockWorkerManager) AllocateWorker(coreId uint, filter []byte) error {
	if w, ok := m.workers[coreId]; ok {
		w.Filter = slices.Clone(filter)
		w.Allocated = true
	}
	return nil
}
func (m *mockWorkerManager) DeallocateWorker(coreId uint) error {
	if w, ok := m.workers[coreId]; ok {
		w.Filter = nil
		w.Allocated = false
	}
	return nil
}
func (m *mockWorkerManager) CheckWorkersConnected() ([]uint, error) { return nil, nil }
func (m *mockWorkerManager) GetWorkerIdByFilter(filter []byte) (uint, error) {
	for _, w := range m.workers {
		if string(w.Filter) == string(filter) {
			return w.CoreId, nil
		}
	}
	return 0, nil
}
func (m *mockWorkerManager) GetFilterByWorkerId(coreId uint) ([]byte, error) {
	if w, ok := m.workers[coreId]; ok {
		return w.Filter, nil
	}
	return nil, nil
}
func (m *mockWorkerManager) RegisterWorker(info *store.WorkerInfo) error {
	m.workers[info.CoreId] = info
	return nil
}
func (m *mockWorkerManager) ProposeAllocations(coreIds []uint, filters [][]byte) error {
	return nil
}
func (m *mockWorkerManager) DecideAllocations(reject [][]byte, confirm [][]byte) error {
	return nil
}
func (m *mockWorkerManager) RangeWorkers() ([]*store.WorkerInfo, error) {
	result := make([]*store.WorkerInfo, 0, len(m.workers))
	for _, w := range m.workers {
		result = append(result, w)
	}
	return result, nil
}
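
// Compile-time check that mockWorkerManager satisfies worker.WorkerManager.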
var _ worker.WorkerManager = (*mockWorkerManager)(nil)

func TestReconcileWorkerAllocations_RejectedAllocationClearsFilter(t *testing.T) {
	logger := zap.NewNop()
	wm := newMockWorkerManager()

	// Create a worker with an assigned filter (simulating a pending join)
	filter1 := []byte("shard-filter-1")
	worker1 := &store.WorkerInfo{
		CoreId:             1,
		Filter:             slices.Clone(filter1),
		Allocated:          false,
		PendingFilterFrame: 100, // join was proposed at frame 100
	}
	require.NoError(t, wm.RegisterWorker(worker1))

	// Create the engine with just the worker manager
	engine := &GlobalConsensusEngine{
		logger:        logger,
		workerManager: wm,
	}

	// Case 1: Allocation is rejected - filter should be cleared
	selfWithRejected := &typesconsensus.ProverInfo{
		Address: []byte("prover-address"),
		Allocations: []typesconsensus.ProverAllocationInfo{
			{
				Status:             typesconsensus.ProverStatusRejected,
				ConfirmationFilter: filter1,
				JoinFrameNumber:    100,
			},
		},
	}

	// Run reconciliation at frame 200 (past the join frame but within grace period)
	engine.reconcileWorkerAllocations(200, selfWithRejected)

	// Verify the worker's filter was cleared because the allocation is rejected
	workers, err := wm.RangeWorkers()
	require.NoError(t, err)
	require.Len(t, workers, 1)
	assert.Nil(t, workers[0].Filter, "rejected allocation should cause filter to be cleared")
	assert.False(t, workers[0].Allocated, "rejected allocation should not be allocated")
	assert.Equal(t, uint64(0), workers[0].PendingFilterFrame, "pending frame should be cleared")
}

func TestReconcileWorkerAllocations_ActiveAllocationKeepsFilter(t *testing.T) {
	logger := zap.NewNop()
	wm := newMockWorkerManager()

	// Create a worker with an assigned filter
	filter1 := []byte("shard-filter-1")
	worker1 := &store.WorkerInfo{
		CoreId:             1,
		Filter:             slices.Clone(filter1),
		Allocated:          true,
		PendingFilterFrame: 0,
	}
	require.NoError(t, wm.RegisterWorker(worker1))

	engine := &GlobalConsensusEngine{
		logger:        logger,
		workerManager: wm,
	}

	// Case 2: Allocation is active - filter should be kept
	selfWithActive := &typesconsensus.ProverInfo{
		Address: []byte("prover-address"),
		Allocations: []typesconsensus.ProverAllocationInfo{
			{
				Status:             typesconsensus.ProverStatusActive,
				ConfirmationFilter: filter1,
				JoinFrameNumber:    100,
			},
		},
	}

	engine.reconcileWorkerAllocations(200, selfWithActive)

	workers, err := wm.RangeWorkers()
	require.NoError(t, err)
	require.Len(t, workers, 1)
	assert.Equal(t, filter1, workers[0].Filter, "active allocation should keep filter")
	assert.True(t, workers[0].Allocated, "active allocation should be allocated")
}

func TestReconcileWorkerAllocations_JoiningAllocationKeepsFilter(t *testing.T) {
	logger := zap.NewNop()
	wm := newMockWorkerManager()

	// Create a worker with an assigned filter
	filter1 := []byte("shard-filter-1")
	worker1 := &store.WorkerInfo{
		CoreId:             1,
		Filter:             slices.Clone(filter1),
		Allocated:          false,
		PendingFilterFrame: 100,
	}
	require.NoError(t, wm.RegisterWorker(worker1))

	engine := &GlobalConsensusEngine{
		logger:        logger,
		workerManager: wm,
	}

	// Case 3: Allocation is joining - filter should be kept
	selfWithJoining := &typesconsensus.ProverInfo{
		Address: []byte("prover-address"),
		Allocations: []typesconsensus.ProverAllocationInfo{
			{
				Status:             typesconsensus.ProverStatusJoining,
				ConfirmationFilter: filter1,
				JoinFrameNumber:    100,
			},
		},
	}

	engine.reconcileWorkerAllocations(200, selfWithJoining)

	workers, err := wm.RangeWorkers()
	require.NoError(t, err)
	require.Len(t, workers, 1)
	assert.Equal(t, filter1, workers[0].Filter, "joining allocation should keep filter")
	assert.False(t, workers[0].Allocated, "joining allocation should not be allocated yet")
	assert.Equal(t, uint64(100), workers[0].PendingFilterFrame, "pending frame should be join frame")
}

func TestReconcileWorkerAllocations_MultipleWorkersWithMixedStates(t *testing.T) {
	logger := zap.NewNop()
	wm := newMockWorkerManager()

	// Create workers with different filters
	filter1 := []byte("shard-filter-1")
	filter2 := []byte("shard-filter-2")
	filter3 := []byte("shard-filter-3")

	worker1 := &store.WorkerInfo{
		CoreId:             1,
		Filter:             slices.Clone(filter1),
		Allocated:          true,
		PendingFilterFrame: 0,
	}
	worker2 := &store.WorkerInfo{
		CoreId:             2,
		Filter:             slices.Clone(filter2),
		Allocated:          false,
		PendingFilterFrame: 100,
	}
	worker3 := &store.WorkerInfo{
		CoreId:             3,
		Filter:             slices.Clone(filter3),
		Allocated:          false,
		PendingFilterFrame: 100,
	}
	require.NoError(t, wm.RegisterWorker(worker1))
	require.NoError(t, wm.RegisterWorker(worker2))
	require.NoError(t, wm.RegisterWorker(worker3))

	engine := &GlobalConsensusEngine{
		logger:        logger,
		workerManager: wm,
	}

	// Mixed states: one active, one joining, one rejected
	self := &typesconsensus.ProverInfo{
		Address: []byte("prover-address"),
		Allocations: []typesconsensus.ProverAllocationInfo{
			{
				Status:             typesconsensus.ProverStatusActive,
				ConfirmationFilter: filter1,
				JoinFrameNumber:    50,
			},
			{
				Status:             typesconsensus.ProverStatusJoining,
				ConfirmationFilter: filter2,
				JoinFrameNumber:    100,
			},
			{
				Status:             typesconsensus.ProverStatusRejected,
				ConfirmationFilter: filter3,
				JoinFrameNumber:    100,
			},
		},
	}

	engine.reconcileWorkerAllocations(200, self)

	workers, err := wm.RangeWorkers()
	require.NoError(t, err)
	require.Len(t, workers, 3)

	// Find each worker by core ID
	workerMap := make(map[uint]*store.WorkerInfo)
	for _, w := range workers {
		workerMap[w.CoreId] = w
	}

	// Worker 1: active allocation - should keep filter and be allocated
	w1 := workerMap[1]
	assert.Equal(t, filter1, w1.Filter, "active worker should keep filter")
	assert.True(t, w1.Allocated, "active worker should be allocated")

	// Worker 2: joining allocation - should keep filter but not be allocated
	w2 := workerMap[2]
	assert.Equal(t, filter2, w2.Filter, "joining worker should keep filter")
	assert.False(t, w2.Allocated, "joining worker should not be allocated")

	// Worker 3: rejected allocation - should have filter cleared
	w3 := workerMap[3]
	assert.Nil(t, w3.Filter, "rejected worker should have filter cleared")
	assert.False(t, w3.Allocated, "rejected worker should not be allocated")
}

func TestReconcileWorkerAllocations_RejectedWithNoFreeWorker(t *testing.T) {
	logger := zap.NewNop()
	wm := newMockWorkerManager()

	// Create a worker with no filter initially
	worker1 := &store.WorkerInfo{
		CoreId:             1,
		Filter:             nil,
		Allocated:          false,
		PendingFilterFrame: 0,
	}
	require.NoError(t, wm.RegisterWorker(worker1))

	engine := &GlobalConsensusEngine{
		logger:        logger,
		workerManager: wm,
	}

	// A rejected allocation shouldn't try to assign a worker
	filter1 := []byte("shard-filter-1")
	self := &typesconsensus.ProverInfo{
		Address: []byte("prover-address"),
		Allocations: []typesconsensus.ProverAllocationInfo{
			{
				Status:             typesconsensus.ProverStatusRejected,
				ConfirmationFilter: filter1,
				JoinFrameNumber:    100,
			},
		},
	}

	engine.reconcileWorkerAllocations(200, self)

	workers, err := wm.RangeWorkers()
	require.NoError(t, err)
	require.Len(t, workers, 1)

	// The free worker should remain free - rejected allocation should not consume it
	assert.Nil(t, workers[0].Filter, "free worker should remain free when only rejected allocations exist")
	assert.False(t, workers[0].Allocated, "free worker should not be allocated")
}
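
// The two timeout tests below assume the unconfirmed-proposal timeout used by
// reconcileWorkerAllocations is 10 frames: frame 105 (5 frames after the
// proposal at frame 100) is still inside the window, frame 111 is past it.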
func TestReconcileWorkerAllocations_UnconfirmedProposalClearsAfterTimeout(t *testing.T) {
	logger := zap.NewNop()
	wm := newMockWorkerManager()

	// Create a worker with a filter set from a join proposal that never landed
	filter1 := []byte("shard-filter-1")
	worker1 := &store.WorkerInfo{
		CoreId:             1,
		Filter:             slices.Clone(filter1),
		Allocated:          false,
		PendingFilterFrame: 100, // proposal was made at frame 100
	}
	require.NoError(t, wm.RegisterWorker(worker1))

	engine := &GlobalConsensusEngine{
		logger:        logger,
		workerManager: wm,
	}

	// Prover has no allocations at all - the proposal never landed in the registry
	self := &typesconsensus.ProverInfo{
		Address:     []byte("prover-address"),
		Allocations: []typesconsensus.ProverAllocationInfo{},
	}

	// At frame 105 (5 frames after the proposal), the filter should NOT be cleared yet
	engine.reconcileWorkerAllocations(105, self)

	workers, err := wm.RangeWorkers()
	require.NoError(t, err)
	require.Len(t, workers, 1)
	assert.Equal(t, filter1, workers[0].Filter, "filter should be kept within timeout window")
	assert.Equal(t, uint64(100), workers[0].PendingFilterFrame, "pending frame should be preserved")

	// At frame 111 (11 frames after the proposal, past the 10-frame timeout), the filter SHOULD be cleared
	engine.reconcileWorkerAllocations(111, self)

	workers, err = wm.RangeWorkers()
	require.NoError(t, err)
	require.Len(t, workers, 1)
	assert.Nil(t, workers[0].Filter, "filter should be cleared after proposal timeout")
	assert.False(t, workers[0].Allocated, "worker should not be allocated")
	assert.Equal(t, uint64(0), workers[0].PendingFilterFrame, "pending frame should be cleared")
}

func TestReconcileWorkerAllocations_UnconfirmedProposalWithNilSelf(t *testing.T) {
	logger := zap.NewNop()
	wm := newMockWorkerManager()

	// Create a worker with a filter set from a join proposal
	filter1 := []byte("shard-filter-1")
	worker1 := &store.WorkerInfo{
		CoreId:             1,
		Filter:             slices.Clone(filter1),
		Allocated:          false,
		PendingFilterFrame: 100,
	}
	require.NoError(t, wm.RegisterWorker(worker1))

	engine := &GlobalConsensusEngine{
		logger:        logger,
		workerManager: wm,
	}

	// Even with nil self (no prover info yet), the filter should be cleared once
	// the timeout has elapsed. This handles the case where we proposed a join but
	// haven't synced our prover info yet.

	// At frame 105, still within the timeout - should keep the filter
	engine.reconcileWorkerAllocations(105, nil)

	workers, err := wm.RangeWorkers()
	require.NoError(t, err)
	require.Len(t, workers, 1)
	assert.Equal(t, filter1, workers[0].Filter, "filter should be kept within timeout window even with nil self")

	// At frame 111, past the timeout - should clear the filter
	engine.reconcileWorkerAllocations(111, nil)

	workers, err = wm.RangeWorkers()
	require.NoError(t, err)
	require.Len(t, workers, 1)
	assert.Nil(t, workers[0].Filter, "filter should be cleared after timeout even with nil self")
}