ceremonyclient/node/worker/manager_test.go
Cassandra Heart 53f7c2b5c9
v2.1.0.2 (#442)
* v2.1.0.2

* restore tweaks to simlibp2p

* fix: nil ref on size calc

* fix: panic should induce shutdown from event_distributor

* fix: friendlier initialization that requires less manual kickstarting for test/devnets

* fix: fewer available shards than provers should choose shard length

* fix: update stored worker registry, improve logging for debug mode

* fix: shut the fuck up, peer log

* qol: log value should be snake cased

* fix: non-archive snap sync issues

* fix: separate X448/Decaf448 signed keys, add onion key to registry

* fix: overflow arithmetic on frame number comparison

* fix: worker registration should be idempotent if inputs are same, otherwise permit updated records

* fix: remove global prover state from size calculation

* fix: divide by zero case

* fix: eager prover

* fix: broadcast listener default

* qol: diagnostic data for peer authenticator

* fix: master/worker connectivity issue in sparse networks

tight coupling of the peer and its workers can interfere when the mesh is sparse, so each worker is given a pseudoidentity for mesh membership while messages are still published with the proper peer key
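
A minimal sketch of the idea, not the node's actual wiring (the variable names and print statements are illustrative): each worker derives its own throwaway libp2p identity for joining the mesh, while outbound messages are still signed with the node's configured peer key so they are attributed to the peer.

```go
// Illustrative sketch only; the real node wires this through its config and pubsub layers.
package main

import (
	"crypto/rand"
	"fmt"

	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	// The node's proper peer key (normally loaded from p2p config).
	peerPriv, _, _ := crypto.GenerateEd448Key(rand.Reader)

	// Each worker gets a pseudoidentity so it can participate in the mesh
	// independently of the peer, which helps when the mesh is sparse.
	_, workerPub, _ := crypto.GenerateEd448Key(rand.Reader)
	workerID, _ := peer.IDFromPublicKey(workerPub)
	fmt.Println("worker joins the mesh as", workerID)

	// Messages are still signed with the proper peer key, so consumers
	// attribute them to the node rather than the pseudoidentity.
	msg := []byte("announcement")
	sig, _ := peerPriv.Sign(msg)
	fmt.Printf("published %d-byte message with a %d-byte peer-key signature\n", len(msg), len(sig))
}
```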

* fix: reorder steps of join creation

* fix: join verify frame source + ensure domain is properly padded (unnecessary but good for consistency)

* fix: add delegate to protobuf <-> reified join conversion

* fix: preempt prover from planning with no workers

* fix: use the unallocated workers to generate a proof

* qol: underflow causes join fail in first ten frames on test/devnets

* qol: small logging tweaks for easier log correlation in debug mode

* qol: use fisher-yates shuffle to ensure prover allocations are evenly distributed when scores are equal

* qol: separate decisional logic on post-enrollment confirmation into consensus engine, proposer, and worker manager where relevant, refactor out scoring

* reuse shard descriptors for both join planning and confirm/reject decisions

* fix: add missing interface method and amend test blossomsub to use new peer id basis

* fix: only check allocations if they exist

* fix: pomw mint proof data needs to be hierarchically under global intrinsic domain

* staging temporary state under diagnostics

* fix: first phase of distributed lock refactoring

* fix: compute intrinsic locking

* fix: hypergraph intrinsic locking

* fix: token intrinsic locking

* fix: update execution engines to support new locking model

* fix: adjust tests with new execution shape

* fix: weave in lock/unlock semantics to liveness provider

* fix lock fallthrough, add missing allocation update

* qol: additional logging for diagnostics, also testnet/devnet handling for confirmations

* fix: establish grace period on halt scenario to permit recovery

* fix: support test/devnet defaults for coverage scenarios

* fix: nil ref on consensus halts for non-archive nodes

* fix: remove unnecessary prefix from prover ref

* add test coverage for fork choice behaviors and replay – once passing, blocker (2) is resolved

* fix: no fork replay on repeat for non-archive nodes, snap now behaves correctly

* rollup of pre-liveness check lock interactions

* ahead of tests, get the protobuf/metrics-related changes out so teams can prepare

* add test coverage for distributed lock behaviors – once passing, blocker (3) is resolved

* fix: blocker (3)

* Dev docs improvements (#445)

* Make install deps script more robust

* Improve testing instructions

* Worker node should stop upon OS SIGINT/SIGTERM signal (#447)

* move pebble close to Stop()

* move deferred Stop() to Start()

* add core id to worker stop log message

* create done os signal channel and stop worker upon message to it
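
A hedged sketch of the shutdown pattern described above; the worker type and Stop signature here are hypothetical stand-ins for the node's own types. The worker traps SIGINT/SIGTERM on a done channel and stops once a signal arrives.

```go
// Illustrative sketch of signal-driven worker shutdown; not the node's actual worker type.
package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"
)

type worker struct{ coreID uint }

func (w *worker) Stop() { log.Printf("stopping worker, core id %d", w.coreID) }

func main() {
	w := &worker{coreID: 1}

	// Buffered channel that receives OS termination signals.
	done := make(chan os.Signal, 1)
	signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)

	// Block until SIGINT/SIGTERM arrives, then stop the worker.
	sig := <-done
	log.Printf("received %s, shutting down", sig)
	w.Stop()
}
```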

---------

Co-authored-by: Cassandra Heart <7929478+CassOnMars@users.noreply.github.com>

---------

Co-authored-by: Daz <daz_the_corgi@proton.me>
Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
2025-10-23 01:03:06 -05:00


package worker
import (
"context"
"crypto/rand"
"encoding/hex"
"errors"
"io"
"testing"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/multiformats/go-multiaddr"
mn "github.com/multiformats/go-multiaddr/net"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/grpc"
"source.quilibrium.com/quilibrium/monorepo/config"
qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/store"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/channel"
typesStore "source.quilibrium.com/quilibrium/monorepo/types/store"
)
// mockWorkerStore is a mock implementation of WorkerStore for testing
type mockWorkerStore struct {
workers map[uint]*typesStore.WorkerInfo
workersByFilter map[string]*typesStore.WorkerInfo
transactions []*mockTransaction
}
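// mockTransaction records each Set/Delete issued through it, along with whether
// it was committed or aborted, so tests can inspect what the manager attempted.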
type mockTransaction struct {
operations []operation
committed bool
aborted bool
}
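// operation is a single recorded mutation ("set" or "delete") captured by a mockTransaction.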
type operation struct {
op string // "set" or "delete"
key []byte
value []byte
}
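// newMockWorkerStore returns an empty in-memory store with both the core-id and filter indexes initialized.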
func newMockWorkerStore() *mockWorkerStore {
return &mockWorkerStore{
workers: make(map[uint]*typesStore.WorkerInfo),
workersByFilter: make(map[string]*typesStore.WorkerInfo),
}
}
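// NewTransaction hands out a fresh mockTransaction and records it in the transactions slice; the indexed flag is ignored by the mock.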
func (m *mockWorkerStore) NewTransaction(indexed bool) (typesStore.Transaction, error) {
txn := &mockTransaction{}
m.transactions = append(m.transactions, txn)
return txn, nil
}
func (m *mockWorkerStore) GetWorker(coreId uint) (*typesStore.WorkerInfo, error) {
worker, exists := m.workers[coreId]
if !exists {
return nil, store.ErrNotFound
}
workerCopy := *worker
return &workerCopy, nil
}
func (m *mockWorkerStore) GetWorkerByFilter(filter []byte) (*typesStore.WorkerInfo, error) {
if len(filter) == 0 {
return nil, errors.New("filter cannot be empty")
}
worker, exists := m.workersByFilter[string(filter)]
if !exists {
return nil, store.ErrNotFound
}
return worker, nil
}
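// PutWorker stores the worker under its core id and keeps the filter index in sync,
// removing the stale filter entry when a worker's filter changes.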
func (m *mockWorkerStore) PutWorker(txn typesStore.Transaction, worker *typesStore.WorkerInfo) error {
// Check if worker already exists to clean up old filter index
if existingWorker, exists := m.workers[worker.CoreId]; exists {
if len(existingWorker.Filter) > 0 &&
string(existingWorker.Filter) != string(worker.Filter) {
delete(m.workersByFilter, string(existingWorker.Filter))
}
}
m.workers[worker.CoreId] = worker
if len(worker.Filter) > 0 {
m.workersByFilter[string(worker.Filter)] = worker
}
return nil
}
func (m *mockWorkerStore) DeleteWorker(txn typesStore.Transaction, coreId uint) error {
worker, exists := m.workers[coreId]
if !exists {
return store.ErrNotFound
}
delete(m.workers, coreId)
if len(worker.Filter) > 0 {
delete(m.workersByFilter, string(worker.Filter))
}
return nil
}
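// RangeWorkers returns all stored workers in unspecified map-iteration order.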
func (m *mockWorkerStore) RangeWorkers() ([]*typesStore.WorkerInfo, error) {
var workers []*typesStore.WorkerInfo
for _, worker := range m.workers {
workers = append(workers, worker)
}
return workers, nil
}
// Mock transaction implementation
func (t *mockTransaction) Set(key, value []byte) error {
t.operations = append(t.operations, operation{op: "set", key: key, value: value})
return nil
}
func (t *mockTransaction) Delete(key []byte) error {
t.operations = append(t.operations, operation{op: "delete", key: key})
return nil
}
func (t *mockTransaction) Get(key []byte) ([]byte, io.Closer, error) {
return nil, io.NopCloser(nil), nil
}
func (t *mockTransaction) Commit() error {
t.committed = true
return nil
}
func (t *mockTransaction) NewIter(lowerBound []byte, upperBound []byte) (typesStore.Iterator, error) {
return nil, nil
}
func (t *mockTransaction) DeleteRange(lowerBound []byte, upperBound []byte) error {
return nil
}
func (t *mockTransaction) Abort() error {
t.aborted = true
return nil
}
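// TestWorkerManager_StartStop verifies that Start and Stop each succeed once and reject a second invocation.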
func TestWorkerManager_StartStop(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
// Test starting the manager
ctx := context.Background()
err := manager.Start(ctx)
require.NoError(t, err)
// Test starting again should fail
err = manager.Start(ctx)
assert.Error(t, err)
assert.Contains(t, err.Error(), "already started")
// Test stopping the manager
err = manager.Stop()
require.NoError(t, err)
// Test stopping again should fail
err = manager.Stop()
assert.Error(t, err)
assert.Contains(t, err.Error(), "not started")
}
func TestWorkerManager_RegisterWorker(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
// Start the manager
ctx := context.Background()
err := manager.Start(ctx)
require.NoError(t, err)
defer manager.Stop()
// Register a worker
workerInfo := &typesStore.WorkerInfo{
CoreId: 1,
ListenMultiaddr: "/ip4/127.0.0.1/tcp/8080",
StreamListenMultiaddr: "/ip4/127.0.0.1/tcp/8081",
Filter: []byte("test-filter"),
TotalStorage: 1000000,
Automatic: true,
}
err = manager.RegisterWorker(workerInfo)
require.NoError(t, err)
// Verify worker was stored
storedWorker, err := store.GetWorker(1)
require.NoError(t, err)
assert.Equal(t, workerInfo.CoreId, storedWorker.CoreId)
assert.Equal(t, workerInfo.ListenMultiaddr, storedWorker.ListenMultiaddr)
assert.Equal(t, workerInfo.Filter, storedWorker.Filter)
}
func TestWorkerManager_RegisterWorkerNotStarted(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
// Try to register without starting
workerInfo := &typesStore.WorkerInfo{
CoreId: 1,
ListenMultiaddr: "/ip4/127.0.0.1/tcp/8080",
StreamListenMultiaddr: "/ip4/127.0.0.1/tcp/8081",
Filter: []byte("test-filter"),
TotalStorage: 1000000,
Automatic: true,
}
err := manager.RegisterWorker(workerInfo)
assert.Error(t, err)
assert.Contains(t, err.Error(), "not started")
}
func TestWorkerManager_AllocateDeallocateWorker(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
engcfg := (config.EngineConfig{}).WithDefaults()
engcfg.DataWorkerCount = 2
engcfg.DataWorkerP2PMultiaddrs = []string{"/ip4/0.0.0.0/tcp/8000", "/ip4/0.0.0.0/tcp/8000"}
engcfg.DataWorkerStreamMultiaddrs = []string{"/ip4/0.0.0.0/tcp/60002", "/ip4/0.0.0.0/tcp/60002"}
p2pcfg := (config.P2PConfig{}).WithDefaults()
priv, _, err := crypto.GenerateEd448Key(rand.Reader)
require.NoError(t, err)
k, _ := priv.Raw()
p2pcfg.PeerPrivKey = hex.EncodeToString(k)
manager := NewWorkerManager(store, logger, &config.Config{Engine: &engcfg, P2P: &p2pcfg}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
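// Stand up a TLS-authenticated DataIPCService endpoint on the configured worker stream
// address (tcp/60002) so the manager has a live gRPC server to dial; the per-service and
// per-method peer policies are the same tables used by the other tests in this file.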
auth := p2p.NewPeerAuthenticator(
zap.L(),
&p2pcfg,
nil,
nil,
nil,
nil,
nil,
map[string]channel.AllowedPeerPolicyType{
"quilibrium.node.application.pb.HypergraphComparisonService": channel.AnyProverPeer,
"quilibrium.node.node.pb.DataIPCService": channel.OnlySelfPeer,
"quilibrium.node.global.pb.GlobalService": channel.OnlyGlobalProverPeer,
"quilibrium.node.global.pb.AppShardService": channel.OnlyShardProverPeer,
"quilibrium.node.global.pb.OnionService": channel.AnyPeer,
"quilibrium.node.global.pb.KeyRegistryService": channel.OnlySelfPeer,
},
map[string]channel.AllowedPeerPolicyType{
"/quilibrium.node.application.pb.HypergraphComparisonService/HyperStream": channel.OnlyShardProverPeer,
"/quilibrium.node.global.pb.MixnetService/GetTag": channel.AnyPeer,
"/quilibrium.node.global.pb.MixnetService/PutTag": channel.AnyPeer,
"/quilibrium.node.global.pb.MixnetService/PutMessage": channel.AnyPeer,
"/quilibrium.node.global.pb.MixnetService/RoundStream": channel.OnlyGlobalProverPeer,
"/quilibrium.node.global.pb.DispatchService/PutInboxMessage": channel.OnlySelfPeer,
"/quilibrium.node.global.pb.DispatchService/GetInboxMessages": channel.OnlySelfPeer,
"/quilibrium.node.global.pb.DispatchService/PutHub": channel.OnlySelfPeer,
"/quilibrium.node.global.pb.DispatchService/GetHub": channel.OnlySelfPeer,
"/quilibrium.node.global.pb.DispatchService/Sync": channel.AnyProverPeer,
"/quilibrium.node.ferretproxy.pb.FerretProxy/AliceProxy": channel.OnlySelfPeer,
"/quilibrium.node.ferretproxy.pb.FerretProxy/BobProxy": channel.AnyPeer,
},
)
tlsCreds, err := auth.CreateServerTLSCredentials()
require.NoError(t, err)
server := qgrpc.NewServer(
grpc.Creds(tlsCreds),
grpc.MaxRecvMsgSize(10*1024*1024),
grpc.MaxSendMsgSize(10*1024*1024),
)
mg, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/60002")
require.NoError(t, err)
lis, err := mn.Listen(mg)
require.NoError(t, err)
go func() {
protobufs.RegisterDataIPCServiceServer(server, &mockIPC{})
if err := server.Serve(mn.NetListener(lis)); err != nil {
zap.L().Info("terminating server", zap.Error(err))
}
}()
defer server.Stop()
// Start the manager
ctx := context.Background()
err = manager.Start(ctx)
require.NoError(t, err)
defer manager.Stop()
// Register a worker first
workerInfo := &typesStore.WorkerInfo{
CoreId: 1,
ListenMultiaddr: "/ip4/127.0.0.1/tcp/8080",
StreamListenMultiaddr: "/ip4/127.0.0.1/tcp/8081",
Filter: []byte("initial-filter"),
TotalStorage: 1000000,
Automatic: false,
}
err = manager.RegisterWorker(workerInfo)
require.NoError(t, err)
// Allocate the worker with a new filter
newFilter := []byte("new-filter")
err = manager.AllocateWorker(1, newFilter)
require.NoError(t, err)
// Verify filter was updated
filter, err := manager.GetFilterByWorkerId(1)
require.NoError(t, err)
assert.Equal(t, newFilter, filter)
// Verify we can get worker by new filter
workerId, err := manager.GetWorkerIdByFilter(newFilter)
require.NoError(t, err)
assert.Equal(t, uint(1), workerId)
// Test allocating again should fail
err = manager.AllocateWorker(1, nil)
assert.Error(t, err)
assert.Contains(t, err.Error(), "already allocated")
// Deallocate the worker
err = manager.DeallocateWorker(1)
require.NoError(t, err)
// Test deallocating again should fail
err = manager.DeallocateWorker(1)
assert.Error(t, err)
assert.Contains(t, err.Error(), "not allocated")
}
func TestWorkerManager_AllocateNonExistentWorker(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
// Start the manager
ctx := context.Background()
err := manager.Start(ctx)
require.NoError(t, err)
defer manager.Stop()
// Try to allocate non-existent worker
err = manager.AllocateWorker(999, []byte("filter"))
assert.Error(t, err)
assert.Contains(t, err.Error(), "not found")
}
func TestWorkerManager_GetWorkerIdByFilter(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
// Start the manager
ctx := context.Background()
err := manager.Start(ctx)
require.NoError(t, err)
defer manager.Stop()
// Register a worker
filter := []byte("unique-filter")
workerInfo := &typesStore.WorkerInfo{
CoreId: 42,
ListenMultiaddr: "/ip4/127.0.0.1/tcp/8080",
StreamListenMultiaddr: "/ip4/127.0.0.1/tcp/8081",
Filter: filter,
TotalStorage: 1000000,
Automatic: true,
}
err = manager.RegisterWorker(workerInfo)
require.NoError(t, err)
// Get worker ID by filter
workerId, err := manager.GetWorkerIdByFilter(filter)
require.NoError(t, err)
assert.Equal(t, uint(42), workerId)
// Test non-existent filter
_, err = manager.GetWorkerIdByFilter([]byte("non-existent"))
assert.Error(t, err)
assert.Contains(t, err.Error(), "no worker found")
}
func TestWorkerManager_GetFilterByWorkerId(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
// Start the manager
ctx := context.Background()
err := manager.Start(ctx)
require.NoError(t, err)
defer manager.Stop()
// Register a worker
filter := []byte("worker-filter")
workerInfo := &typesStore.WorkerInfo{
CoreId: 7,
ListenMultiaddr: "/ip4/127.0.0.1/tcp/8080",
StreamListenMultiaddr: "/ip4/127.0.0.1/tcp/8081",
Filter: filter,
TotalStorage: 1000000,
Automatic: false,
}
err = manager.RegisterWorker(workerInfo)
require.NoError(t, err)
// Get filter by worker ID
retrievedFilter, err := manager.GetFilterByWorkerId(7)
require.NoError(t, err)
assert.Equal(t, filter, retrievedFilter)
// Test non-existent worker
_, err = manager.GetFilterByWorkerId(999)
assert.Error(t, err)
assert.Contains(t, err.Error(), "not found")
}
func TestWorkerManager_LoadWorkersOnStart(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
// Pre-populate store with workers
worker1 := &typesStore.WorkerInfo{
CoreId: 1,
ListenMultiaddr: "/ip4/127.0.0.1/tcp/8080",
StreamListenMultiaddr: "/ip4/127.0.0.1/tcp/8081",
Filter: []byte("filter1"),
TotalStorage: 1000000,
Automatic: true,
}
worker2 := &typesStore.WorkerInfo{
CoreId: 2,
ListenMultiaddr: "/ip4/127.0.0.1/tcp/8082",
StreamListenMultiaddr: "/ip4/127.0.0.1/tcp/8083",
Filter: []byte("filter2"),
TotalStorage: 2000000,
Automatic: false,
}
store.workers[1] = worker1
store.workersByFilter[string(worker1.Filter)] = worker1
store.workers[2] = worker2
store.workersByFilter[string(worker2.Filter)] = worker2
// Create manager and start it
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
ctx := context.Background()
err := manager.Start(ctx)
require.NoError(t, err)
defer manager.Stop()
// Verify workers were loaded into cache
workerId1, err := manager.GetWorkerIdByFilter([]byte("filter1"))
require.NoError(t, err)
assert.Equal(t, uint(1), workerId1)
workerId2, err := manager.GetWorkerIdByFilter([]byte("filter2"))
require.NoError(t, err)
assert.Equal(t, uint(2), workerId2)
filter1, err := manager.GetFilterByWorkerId(1)
require.NoError(t, err)
assert.Equal(t, []byte("filter1"), filter1)
filter2, err := manager.GetFilterByWorkerId(2)
require.NoError(t, err)
assert.Equal(t, []byte("filter2"), filter2)
}
func TestWorkerManager_ConcurrentOperations(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
// Start the manager
ctx := context.Background()
err := manager.Start(ctx)
require.NoError(t, err)
defer manager.Stop()
// Register multiple workers concurrently
numWorkers := 10
done := make(chan error, numWorkers)
for i := 0; i < numWorkers; i++ {
go func(id int) {
workerInfo := &typesStore.WorkerInfo{
CoreId: uint(id),
ListenMultiaddr: "/ip4/127.0.0.1/tcp/8080",
StreamListenMultiaddr: "/ip4/127.0.0.1/tcp/8081",
Filter: []byte(string(rune('a' + id))),
TotalStorage: uint(1000000 * (id + 1)),
Automatic: id%2 == 0,
}
done <- manager.RegisterWorker(workerInfo)
}(i)
}
// Collect results
for i := 0; i < numWorkers; i++ {
err := <-done
assert.NoError(t, err)
}
// Verify all workers were registered
workers, err := store.RangeWorkers()
require.NoError(t, err)
assert.Len(t, workers, numWorkers)
}
func TestWorkerManager_EmptyFilter(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
engcfg := (config.EngineConfig{}).WithDefaults()
engcfg.DataWorkerCount = 1
engcfg.DataWorkerP2PMultiaddrs = []string{"/ip4/0.0.0.0/tcp/8000"}
engcfg.DataWorkerStreamMultiaddrs = []string{"/ip4/0.0.0.0/tcp/60000"}
p2pcfg := (config.P2PConfig{}).WithDefaults()
priv, _, err := crypto.GenerateEd448Key(rand.Reader)
require.NoError(t, err)
k, _ := priv.Raw()
p2pcfg.PeerPrivKey = hex.EncodeToString(k)
manager := NewWorkerManager(store, logger, &config.Config{Engine: &engcfg, P2P: &p2pcfg}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
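// Same authenticated DataIPCService endpoint setup as in the allocate/deallocate test,
// listening on this test's stream address (tcp/60000).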
auth := p2p.NewPeerAuthenticator(
zap.L(),
&p2pcfg,
nil,
nil,
nil,
nil,
nil,
map[string]channel.AllowedPeerPolicyType{
"quilibrium.node.application.pb.HypergraphComparisonService": channel.AnyProverPeer,
"quilibrium.node.node.pb.DataIPCService": channel.OnlySelfPeer,
"quilibrium.node.global.pb.GlobalService": channel.OnlyGlobalProverPeer,
"quilibrium.node.global.pb.AppShardService": channel.OnlyShardProverPeer,
"quilibrium.node.global.pb.OnionService": channel.AnyPeer,
"quilibrium.node.global.pb.KeyRegistryService": channel.OnlySelfPeer,
},
map[string]channel.AllowedPeerPolicyType{
"/quilibrium.node.application.pb.HypergraphComparisonService/HyperStream": channel.OnlyShardProverPeer,
"/quilibrium.node.global.pb.MixnetService/GetTag": channel.AnyPeer,
"/quilibrium.node.global.pb.MixnetService/PutTag": channel.AnyPeer,
"/quilibrium.node.global.pb.MixnetService/PutMessage": channel.AnyPeer,
"/quilibrium.node.global.pb.MixnetService/RoundStream": channel.OnlyGlobalProverPeer,
"/quilibrium.node.global.pb.DispatchService/PutInboxMessage": channel.OnlySelfPeer,
"/quilibrium.node.global.pb.DispatchService/GetInboxMessages": channel.OnlySelfPeer,
"/quilibrium.node.global.pb.DispatchService/PutHub": channel.OnlySelfPeer,
"/quilibrium.node.global.pb.DispatchService/GetHub": channel.OnlySelfPeer,
"/quilibrium.node.global.pb.DispatchService/Sync": channel.AnyProverPeer,
"/quilibrium.node.ferretproxy.pb.FerretProxy/AliceProxy": channel.OnlySelfPeer,
"/quilibrium.node.ferretproxy.pb.FerretProxy/BobProxy": channel.AnyPeer,
},
)
tlsCreds, err := auth.CreateServerTLSCredentials()
require.NoError(t, err)
server := qgrpc.NewServer(
grpc.Creds(tlsCreds),
grpc.MaxRecvMsgSize(10*1024*1024),
grpc.MaxSendMsgSize(10*1024*1024),
)
mg, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/60000")
require.NoError(t, err)
lis, err := mn.Listen(mg)
require.NoError(t, err)
go func() {
protobufs.RegisterDataIPCServiceServer(server, &mockIPC{})
if err := server.Serve(mn.NetListener(lis)); err != nil {
zap.L().Info("terminating server", zap.Error(err))
}
}()
defer server.Stop()
// Start the manager
ctx := context.Background()
err = manager.Start(ctx)
require.NoError(t, err)
defer manager.Stop()
// Register a worker with empty filter
workerInfo := &typesStore.WorkerInfo{
CoreId: 1,
ListenMultiaddr: "/ip4/127.0.0.1/tcp/8080",
StreamListenMultiaddr: "/ip4/127.0.0.1/tcp/8081",
Filter: []byte{}, // Empty filter
TotalStorage: 1000000,
Automatic: true,
Allocated: false,
}
err = manager.RegisterWorker(workerInfo)
require.NoError(t, err)
// Verify worker was stored
storedWorker, err := store.GetWorker(1)
require.NoError(t, err)
assert.Equal(t, workerInfo.CoreId, storedWorker.CoreId)
assert.Empty(t, storedWorker.Filter)
// Try to get worker by empty filter - should fail appropriately
_, err = manager.GetWorkerIdByFilter([]byte{})
assert.Error(t, err)
// Can still get filter (empty) by worker ID
filter, err := manager.GetFilterByWorkerId(1)
require.NoError(t, err)
assert.Empty(t, filter)
// Allocate with a filter
newFilter := []byte("new-filter")
err = manager.AllocateWorker(1, newFilter)
require.NoError(t, err)
// Now we should be able to get worker by the new filter
workerId, err := manager.GetWorkerIdByFilter(newFilter)
require.NoError(t, err)
assert.Equal(t, uint(1), workerId)
// Deallocate the worker
err = manager.DeallocateWorker(1)
require.NoError(t, err)
// Filter should still be set
filter, err = manager.GetFilterByWorkerId(1)
require.NoError(t, err)
assert.Equal(t, newFilter, filter)
// Can still get by filter after deallocation
workerId, err = manager.GetWorkerIdByFilter(newFilter)
require.NoError(t, err)
assert.Equal(t, uint(1), workerId)
}
func TestWorkerManager_FilterUpdate(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
engcfg := (config.EngineConfig{}).WithDefaults()
engcfg.DataWorkerCount = 2
engcfg.DataWorkerP2PMultiaddrs = []string{"/ip4/0.0.0.0/tcp/8000", "/ip4/0.0.0.0/tcp/8000"}
engcfg.DataWorkerStreamMultiaddrs = []string{"/ip4/0.0.0.0/tcp/60001", "/ip4/0.0.0.0/tcp/60001"}
p2pcfg := (config.P2PConfig{}).WithDefaults()
priv, _, err := crypto.GenerateEd448Key(rand.Reader)
require.NoError(t, err)
k, _ := priv.Raw()
p2pcfg.PeerPrivKey = hex.EncodeToString(k)
manager := NewWorkerManager(store, logger, &config.Config{Engine: &engcfg, P2P: &p2pcfg}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
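// Same authenticated DataIPCService endpoint setup as above, listening on this test's
// stream address (tcp/60001).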
auth := p2p.NewPeerAuthenticator(
zap.L(),
&p2pcfg,
nil,
nil,
nil,
nil,
nil,
map[string]channel.AllowedPeerPolicyType{
"quilibrium.node.application.pb.HypergraphComparisonService": channel.AnyProverPeer,
"quilibrium.node.node.pb.DataIPCService": channel.OnlySelfPeer,
"quilibrium.node.global.pb.GlobalService": channel.OnlyGlobalProverPeer,
"quilibrium.node.global.pb.AppShardService": channel.OnlyShardProverPeer,
"quilibrium.node.global.pb.OnionService": channel.AnyPeer,
"quilibrium.node.global.pb.KeyRegistryService": channel.OnlySelfPeer,
},
map[string]channel.AllowedPeerPolicyType{
"/quilibrium.node.application.pb.HypergraphComparisonService/HyperStream": channel.OnlyShardProverPeer,
"/quilibrium.node.global.pb.MixnetService/GetTag": channel.AnyPeer,
"/quilibrium.node.global.pb.MixnetService/PutTag": channel.AnyPeer,
"/quilibrium.node.global.pb.MixnetService/PutMessage": channel.AnyPeer,
"/quilibrium.node.global.pb.MixnetService/RoundStream": channel.OnlyGlobalProverPeer,
"/quilibrium.node.global.pb.DispatchService/PutInboxMessage": channel.OnlySelfPeer,
"/quilibrium.node.global.pb.DispatchService/GetInboxMessages": channel.OnlySelfPeer,
"/quilibrium.node.global.pb.DispatchService/PutHub": channel.OnlySelfPeer,
"/quilibrium.node.global.pb.DispatchService/GetHub": channel.OnlySelfPeer,
"/quilibrium.node.global.pb.DispatchService/Sync": channel.AnyProverPeer,
"/quilibrium.node.ferretproxy.pb.FerretProxy/AliceProxy": channel.OnlySelfPeer,
"/quilibrium.node.ferretproxy.pb.FerretProxy/BobProxy": channel.AnyPeer,
},
)
tlsCreds, err := auth.CreateServerTLSCredentials()
require.NoError(t, err)
server := qgrpc.NewServer(
grpc.Creds(tlsCreds),
grpc.MaxRecvMsgSize(10*1024*1024),
grpc.MaxSendMsgSize(10*1024*1024),
)
mg, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/60001")
require.NoError(t, err)
lis, err := mn.Listen(mg)
require.NoError(t, err)
go func() {
protobufs.RegisterDataIPCServiceServer(server, &mockIPC{})
if err := server.Serve(mn.NetListener(lis)); err != nil {
zap.L().Info("terminating server", zap.Error(err))
}
}()
defer server.Stop()
// Start the manager
ctx := context.Background()
err = manager.Start(ctx)
require.NoError(t, err)
defer manager.Stop()
// Register a worker with a filter
oldFilter := []byte("old-filter")
workerInfo := &typesStore.WorkerInfo{
CoreId: 2,
ListenMultiaddr: "/ip4/127.0.0.1/tcp/8080",
StreamListenMultiaddr: "/ip4/127.0.0.1/tcp/8081",
Filter: oldFilter,
TotalStorage: 1000000,
Automatic: false,
Allocated: false,
}
err = manager.RegisterWorker(workerInfo)
require.NoError(t, err)
// Verify we can get by old filter
workerId, err := manager.GetWorkerIdByFilter(oldFilter)
require.NoError(t, err)
assert.Equal(t, uint(2), workerId)
// Allocate with a new filter
newFilter := []byte("new-filter")
err = manager.AllocateWorker(2, newFilter)
require.NoError(t, err)
// Old filter should no longer work
_, err = manager.GetWorkerIdByFilter(oldFilter)
assert.Error(t, err, "Looking up old filter should fail after update")
// New filter should work
workerId, err = manager.GetWorkerIdByFilter(newFilter)
require.NoError(t, err)
assert.Equal(t, uint(2), workerId)
// Allocate with empty filter (should keep existing filter)
err = manager.AllocateWorker(2, []byte{})
assert.Error(t, err) // Should fail because already allocated
// Deallocate first
err = manager.DeallocateWorker(2)
require.NoError(t, err)
// Now allocate with empty filter - should keep existing filter
err = manager.AllocateWorker(2, []byte{})
require.NoError(t, err)
// Should still have the new filter
filter, err := manager.GetFilterByWorkerId(2)
require.NoError(t, err)
assert.Equal(t, newFilter, filter)
}
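// mockIPC is a stub DataIPCServiceServer used as the worker-side endpoint; only Respawn is implemented.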
type mockIPC struct {
protobufs.DataIPCServiceServer
}
// Respawn implements protobufs.DataIPCServiceServer.
func (m *mockIPC) Respawn(context.Context, *protobufs.RespawnRequest) (*protobufs.RespawnResponse, error) {
return &protobufs.RespawnResponse{}, nil
}