mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 18:37:26 +08:00
* experiment: reject bad peer info messages * v2.1.0.18 preview * add tagged sync * Add missing hypergraph changes * small tweaks to sync * allow local sync, use it for provers with workers * missing file * resolve build error * resolve sync issue, remove raw sync * resolve deletion promotion bug * resolve sync abstraction leak from tree deletion changes * rearrange prover sync * remove pruning from sync * restore removed sync flag * fix: sync, event stream deadlock, heuristic scoring of better shards * resolve hanging shutdown + pubsub proxy issue * further bugfixes: sync (restore old leaf sync), pubsub shutdown, merge events * fix: clean up rust ffi, background coverage events, and sync tweaks * fix: linking issue for channel, connectivity test aggression, sync regression, join tests * fix: disjoint sync, improper application of filter * resolve sync/reel/validation deadlock * adjust sync to handle no leaf edge cases, multi-path segment traversal * use simpler sync * faster, simpler sync with some debug extras * migration to recalculate * don't use batch * square up the roots * fix nil pointer * fix: seniority calculation, sync race condition, migration * make sync dumber * fix: tree deletion issue * fix: missing seniority merge request canonical serialization * address issues from previous commit test * stale workers should be cleared * remove missing gap check * rearrange collect, reduce sync logging noise * fix: the disjoint leaf/branch sync case * nuclear option on sync failures * v2.1.0.18, finalized
1311 lines
44 KiB
Go
1311 lines
44 KiB
Go
//go:build integrationtest
|
|
// +build integrationtest
|
|
|
|
package global
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"math/big"
|
|
"slices"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/iden3/go-iden3-crypto/poseidon"
|
|
"github.com/libp2p/go-libp2p"
|
|
pcrypto "github.com/libp2p/go-libp2p/core/crypto"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/multiformats/go-multiaddr"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/mock"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/protobuf/types/known/wrapperspb"
|
|
"source.quilibrium.com/quilibrium/monorepo/bls48581"
|
|
"source.quilibrium.com/quilibrium/monorepo/bulletproofs"
|
|
"source.quilibrium.com/quilibrium/monorepo/channel"
|
|
"source.quilibrium.com/quilibrium/monorepo/config"
|
|
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
|
|
hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph"
|
|
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
|
|
"source.quilibrium.com/quilibrium/monorepo/node/compiler"
|
|
"source.quilibrium.com/quilibrium/monorepo/node/consensus/difficulty"
|
|
"source.quilibrium.com/quilibrium/monorepo/node/consensus/events"
|
|
"source.quilibrium.com/quilibrium/monorepo/node/consensus/fees"
|
|
"source.quilibrium.com/quilibrium/monorepo/node/consensus/provers"
|
|
"source.quilibrium.com/quilibrium/monorepo/node/consensus/registration"
|
|
"source.quilibrium.com/quilibrium/monorepo/node/consensus/reward"
|
|
consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time"
|
|
"source.quilibrium.com/quilibrium/monorepo/node/consensus/validator"
|
|
"source.quilibrium.com/quilibrium/monorepo/node/keys"
|
|
qp2p "source.quilibrium.com/quilibrium/monorepo/node/p2p"
|
|
"source.quilibrium.com/quilibrium/monorepo/node/store"
|
|
"source.quilibrium.com/quilibrium/monorepo/node/tests"
|
|
"source.quilibrium.com/quilibrium/monorepo/protobufs"
|
|
tconsensus "source.quilibrium.com/quilibrium/monorepo/types/consensus"
|
|
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
|
|
"source.quilibrium.com/quilibrium/monorepo/types/execution/intrinsics"
|
|
thypergraph "source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
|
|
tkeys "source.quilibrium.com/quilibrium/monorepo/types/keys"
|
|
"source.quilibrium.com/quilibrium/monorepo/types/p2p"
|
|
qcrypto "source.quilibrium.com/quilibrium/monorepo/types/tries"
|
|
"source.quilibrium.com/quilibrium/monorepo/vdf"
|
|
"source.quilibrium.com/quilibrium/monorepo/verenc"
|
|
)
|
|
|
|
// mockIntegrationPubSub is a pubsub mock for integration testing
|
|
type mockIntegrationPubSub struct {
|
|
mock.Mock
|
|
mu sync.RWMutex
|
|
subscribers map[string][]func(message *pb.Message) error
|
|
validators map[string]func(peerID peer.ID, message *pb.Message) p2p.ValidationResult
|
|
peerID []byte
|
|
peerCount int
|
|
networkPeers map[string]*mockIntegrationPubSub
|
|
frames []*protobufs.GlobalFrame
|
|
onPublish func(bitmask []byte, data []byte)
|
|
deliveryComplete chan struct{} // Signal when all deliveries are done
|
|
msgProcessor func(*pb.Message) // Custom message processor for tracking
|
|
underlyingHost host.Host
|
|
underlyingBlossomSub *qp2p.BlossomSub
|
|
}
|
|
|
|
// Close implements p2p.PubSub.
|
|
func (m *mockIntegrationPubSub) Close() error {
|
|
panic("unimplemented")
|
|
}
|
|
|
|
// SetShutdownContext implements p2p.PubSub.
|
|
func (m *mockIntegrationPubSub) SetShutdownContext(ctx context.Context) {
|
|
// Forward to underlying blossomsub if available
|
|
if m.underlyingBlossomSub != nil {
|
|
m.underlyingBlossomSub.SetShutdownContext(ctx)
|
|
}
|
|
}
|
|
|
|
// GetOwnMultiaddrs implements p2p.PubSub.
|
|
func (m *mockIntegrationPubSub) GetOwnMultiaddrs() []multiaddr.Multiaddr {
|
|
ma, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/8336")
|
|
return []multiaddr.Multiaddr{ma}
|
|
}
|
|
|
|
func newMockIntegrationPubSub(config *config.Config, logger *zap.Logger, peerID []byte, host host.Host, privKey pcrypto.PrivKey, bootstrapHosts []host.Host) *mockIntegrationPubSub {
|
|
blossomSub := qp2p.NewBlossomSubWithHost(config.P2P, config.Engine, logger, 1, true, host, privKey, bootstrapHosts)
|
|
return &mockIntegrationPubSub{
|
|
subscribers: make(map[string][]func(message *pb.Message) error),
|
|
validators: make(map[string]func(peerID peer.ID, message *pb.Message) p2p.ValidationResult),
|
|
peerID: peerID,
|
|
peerCount: 0, // Start with 0 to trigger genesis
|
|
networkPeers: make(map[string]*mockIntegrationPubSub),
|
|
frames: make([]*protobufs.GlobalFrame, 0),
|
|
deliveryComplete: make(chan struct{}),
|
|
underlyingHost: host,
|
|
underlyingBlossomSub: blossomSub,
|
|
}
|
|
}
|
|
|
|
func (m *mockIntegrationPubSub) Subscribe(bitmask []byte, handler func(message *pb.Message) error) error {
|
|
m.mu.Lock()
|
|
key := string(bitmask)
|
|
m.subscribers[key] = append(m.subscribers[key], handler)
|
|
m.mu.Unlock()
|
|
|
|
return m.underlyingBlossomSub.Subscribe(bitmask, handler)
|
|
}
|
|
|
|
func (m *mockIntegrationPubSub) PublishToBitmask(bitmask []byte, data []byte) error {
|
|
if m.onPublish != nil {
|
|
m.onPublish(bitmask, data)
|
|
}
|
|
|
|
// Check if data is long enough to contain type prefix
|
|
if len(data) >= 4 {
|
|
// Read type prefix from first 4 bytes
|
|
typePrefix := binary.BigEndian.Uint32(data[:4])
|
|
|
|
// Check if it's a GlobalFrame
|
|
if typePrefix == protobufs.GlobalProposalType {
|
|
frame := &protobufs.GlobalProposal{}
|
|
if err := frame.FromCanonicalBytes(data); err == nil {
|
|
m.mu.Lock()
|
|
m.frames = append(m.frames, frame.State)
|
|
m.mu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Count total handlers to track
|
|
m.mu.RLock()
|
|
handlers := m.subscribers[string(bitmask)]
|
|
totalHandlers := len(handlers) // self handlers
|
|
for _, peer := range m.networkPeers {
|
|
peer.mu.RLock()
|
|
totalHandlers += len(peer.subscribers[string(bitmask)])
|
|
peer.mu.RUnlock()
|
|
}
|
|
m.mu.RUnlock()
|
|
|
|
return m.underlyingBlossomSub.PublishToBitmask(bitmask, data)
|
|
}
|
|
|
|
func (m *mockIntegrationPubSub) deliverMessage(bitmask []byte, message *pb.Message, wg *sync.WaitGroup) {
|
|
// Check validator first
|
|
m.mu.RLock()
|
|
validator := m.validators[string(bitmask)]
|
|
m.mu.RUnlock()
|
|
|
|
if validator != nil {
|
|
result := validator(peer.ID(message.From), message)
|
|
if result != p2p.ValidationResultAccept {
|
|
// Message rejected by validator, still need to decrement wait group
|
|
m.mu.RLock()
|
|
handlerCount := len(m.subscribers[string(bitmask)])
|
|
m.mu.RUnlock()
|
|
for i := 0; i < handlerCount; i++ {
|
|
wg.Done()
|
|
}
|
|
return
|
|
}
|
|
}
|
|
|
|
m.mu.RLock()
|
|
handlers := m.subscribers[string(bitmask)]
|
|
m.mu.RUnlock()
|
|
|
|
// Create wrapped handler that decrements wait group
|
|
wrappedHandler := func(h func(*pb.Message) error, msg *pb.Message) {
|
|
defer wg.Done()
|
|
if m.msgProcessor != nil {
|
|
m.msgProcessor(msg)
|
|
}
|
|
h(msg)
|
|
}
|
|
|
|
// Deliver asynchronously
|
|
for _, handler := range handlers {
|
|
go wrappedHandler(handler, message)
|
|
}
|
|
}
|
|
|
|
func (m *mockIntegrationPubSub) RegisterValidator(bitmask []byte, validator func(peerID peer.ID, message *pb.Message) p2p.ValidationResult, sync bool) error {
|
|
m.mu.Lock()
|
|
m.validators[string(bitmask)] = validator
|
|
m.mu.Unlock()
|
|
return m.underlyingBlossomSub.RegisterValidator(bitmask, validator, sync)
|
|
}
|
|
|
|
func (m *mockIntegrationPubSub) UnregisterValidator(bitmask []byte) error {
|
|
m.mu.Lock()
|
|
delete(m.validators, string(bitmask))
|
|
m.mu.Unlock()
|
|
return m.underlyingBlossomSub.UnregisterValidator(bitmask)
|
|
}
|
|
|
|
func (m *mockIntegrationPubSub) Unsubscribe(bitmask []byte, raw bool) {
|
|
m.mu.Lock()
|
|
delete(m.subscribers, string(bitmask))
|
|
m.mu.Unlock()
|
|
m.underlyingBlossomSub.Unsubscribe(bitmask, raw)
|
|
}
|
|
|
|
func (m *mockIntegrationPubSub) GetPeerID() []byte {
|
|
return m.peerID
|
|
}
|
|
|
|
func (m *mockIntegrationPubSub) GetPeerstoreCount() int {
|
|
return m.peerCount
|
|
}
|
|
|
|
func (m *mockIntegrationPubSub) GetNetworkInfo() *protobufs.NetworkInfoResponse {
|
|
return &protobufs.NetworkInfoResponse{
|
|
NetworkInfo: []*protobufs.NetworkInfo{},
|
|
}
|
|
}
|
|
|
|
// Stub implementations for other interface methods
|
|
func (m *mockIntegrationPubSub) Publish(address []byte, data []byte) error { return nil }
|
|
func (m *mockIntegrationPubSub) GetNetworkPeersCount() int { return m.peerCount }
|
|
func (m *mockIntegrationPubSub) GetRandomPeer(bitmask []byte) ([]byte, error) { return nil, nil }
|
|
func (m *mockIntegrationPubSub) GetMultiaddrOfPeerStream(ctx context.Context, peerId []byte) <-chan multiaddr.Multiaddr {
|
|
return nil
|
|
}
|
|
func (m *mockIntegrationPubSub) GetMultiaddrOfPeer(peerId []byte) string { return "" }
|
|
func (m *mockIntegrationPubSub) StartDirectChannelListener(key []byte, purpose string, server *grpc.Server) error {
|
|
return nil
|
|
}
|
|
func (m *mockIntegrationPubSub) GetDirectChannel(ctx context.Context, peerId []byte, purpose string) (*grpc.ClientConn, error) {
|
|
return nil, nil
|
|
}
|
|
func (m *mockIntegrationPubSub) SignMessage(msg []byte) ([]byte, error) { return nil, nil }
|
|
func (m *mockIntegrationPubSub) GetPublicKey() []byte { return nil }
|
|
func (m *mockIntegrationPubSub) GetPeerScore(peerId []byte) int64 { return 0 }
|
|
func (m *mockIntegrationPubSub) SetPeerScore(peerId []byte, score int64) {}
|
|
func (m *mockIntegrationPubSub) AddPeerScore(peerId []byte, scoreDelta int64) {}
|
|
func (m *mockIntegrationPubSub) Reconnect(peerId []byte) error { return nil }
|
|
func (m *mockIntegrationPubSub) Bootstrap(ctx context.Context) error { return nil }
|
|
func (m *mockIntegrationPubSub) DiscoverPeers(ctx context.Context) error { return nil }
|
|
func (m *mockIntegrationPubSub) GetNetwork() uint { return 0 }
|
|
func (m *mockIntegrationPubSub) IsPeerConnected(peerId []byte) bool { return false }
|
|
func (m *mockIntegrationPubSub) Reachability() *wrapperspb.BoolValue { return nil }
|
|
|
|
// Helper functions
|
|
|
|
// calculateProverAddress calculates the prover address from public key using poseidon hash
|
|
func calculateProverAddress(publicKey []byte) []byte {
|
|
hash, err := poseidon.HashBytes(publicKey)
|
|
if err != nil {
|
|
panic(err) // Should not happen in tests
|
|
}
|
|
return hash.FillBytes(make([]byte, 32))
|
|
}
|
|
|
|
// registerProverInHypergraph registers a prover without filter (for global consensus)
|
|
func registerProverInHypergraph(t *testing.T, hg thypergraph.Hypergraph, publicKey []byte, address []byte) {
|
|
// Create the full address: GLOBAL_INTRINSIC_ADDRESS + prover address
|
|
fullAddress := [64]byte{}
|
|
copy(fullAddress[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
|
|
copy(fullAddress[32:], address)
|
|
|
|
// Create a VectorCommitmentTree with the prover data
|
|
tree := &qcrypto.VectorCommitmentTree{}
|
|
|
|
// Index 0: Public key
|
|
err := tree.Insert([]byte{0}, publicKey, nil, big.NewInt(0))
|
|
if err != nil {
|
|
t.Fatalf("Failed to insert public key: %v", err)
|
|
}
|
|
|
|
// Index 1<<2 (4): Status (1 byte) - 1 = active
|
|
err = tree.Insert([]byte{1 << 2}, []byte{1}, nil, big.NewInt(0))
|
|
if err != nil {
|
|
t.Fatalf("Failed to insert status: %v", err)
|
|
}
|
|
|
|
err = tree.Insert([]byte{3 << 2}, []byte{0, 0, 0, 0, 0, 0, 3, 232}, nil, big.NewInt(0)) // seniority = 1000
|
|
require.NoError(t, err)
|
|
|
|
// Type Index:
|
|
typeBI, _ := poseidon.HashBytes(
|
|
slices.Concat(bytes.Repeat([]byte{0xff}, 32), []byte("prover:Prover")),
|
|
)
|
|
tree.Insert(bytes.Repeat([]byte{0xff}, 32), typeBI.FillBytes(make([]byte, 32)), nil, big.NewInt(32))
|
|
|
|
// Create allocation
|
|
allocationAddressBI, err := poseidon.HashBytes(slices.Concat([]byte("PROVER_ALLOCATION"), publicKey, []byte{}))
|
|
require.NoError(t, err)
|
|
allocationAddress := allocationAddressBI.FillBytes(make([]byte, 32))
|
|
|
|
allocationTree := &qcrypto.VectorCommitmentTree{}
|
|
// Store allocation data
|
|
err = allocationTree.Insert([]byte{0 << 2}, fullAddress[32:], nil, big.NewInt(0))
|
|
require.NoError(t, err)
|
|
err = allocationTree.Insert([]byte{2 << 2}, []byte{}, nil, big.NewInt(0))
|
|
require.NoError(t, err)
|
|
err = allocationTree.Insert([]byte{1 << 2}, []byte{1}, nil, big.NewInt(0)) // active
|
|
require.NoError(t, err)
|
|
joinFrameBytes := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(joinFrameBytes, 0)
|
|
err = allocationTree.Insert([]byte{4 << 2}, joinFrameBytes, nil, big.NewInt(0))
|
|
require.NoError(t, err)
|
|
allocationTypeBI, _ := poseidon.HashBytes(
|
|
slices.Concat(bytes.Repeat([]byte{0xff}, 32), []byte("allocation:ProverAllocation")),
|
|
)
|
|
allocationTree.Insert(bytes.Repeat([]byte{0xff}, 32), allocationTypeBI.FillBytes(make([]byte, 32)), nil, big.NewInt(32))
|
|
|
|
// Add the prover to the hypergraph
|
|
inclusionProver := bls48581.NewKZGInclusionProver(zap.L())
|
|
commitment := tree.Commit(inclusionProver, false)
|
|
if len(commitment) != 74 && len(commitment) != 64 {
|
|
t.Fatalf("Invalid commitment length: %d", len(commitment))
|
|
}
|
|
allocCommitment := allocationTree.Commit(inclusionProver, false)
|
|
if len(allocCommitment) != 74 && len(allocCommitment) != 64 {
|
|
t.Fatalf("Invalid commitment length: %d", len(allocCommitment))
|
|
}
|
|
|
|
// Add vertex to hypergraph
|
|
txn, _ := hg.NewTransaction(false)
|
|
err = hg.AddVertex(txn, hgcrdt.NewVertex([32]byte(fullAddress[:32]), [32]byte(fullAddress[32:]), commitment, big.NewInt(0)))
|
|
if err != nil {
|
|
t.Fatalf("Failed to add prover vertex to hypergraph: %v", err)
|
|
}
|
|
err = hg.AddVertex(txn, hgcrdt.NewVertex([32]byte(fullAddress[:32]), [32]byte(allocationAddress[:]), allocCommitment, big.NewInt(0)))
|
|
if err != nil {
|
|
t.Fatalf("Failed to add prover vertex to hypergraph: %v", err)
|
|
}
|
|
|
|
hg.SetVertexData(txn, fullAddress, tree)
|
|
hg.SetVertexData(txn, [64]byte(slices.Concat(fullAddress[:32], allocationAddress)), allocationTree)
|
|
txn.Commit()
|
|
|
|
// Commit the hypergraph
|
|
hg.Commit(0)
|
|
|
|
t.Logf(" Registered global prover with address: %x (public key length: %d)", address, len(publicKey))
|
|
}
|
|
|
|
// Test helper to create a fully wired engine for integration tests
|
|
func createIntegrationTestGlobalConsensusEngine(
|
|
t *testing.T,
|
|
peerID []byte,
|
|
network uint8,
|
|
h host.Host,
|
|
privKey pcrypto.PrivKey,
|
|
bootstrapHosts []host.Host,
|
|
) (
|
|
*GlobalConsensusEngine,
|
|
*mockIntegrationPubSub,
|
|
*consensustime.GlobalTimeReel,
|
|
func(),
|
|
) {
|
|
return createIntegrationTestGlobalConsensusEngineWithHypergraph(t, peerID, nil, network, h, privKey, bootstrapHosts)
|
|
}
|
|
|
|
// createIntegrationTestGlobalConsensusEngineWithHypergraph creates an engine with optional shared hypergraph
|
|
func createIntegrationTestGlobalConsensusEngineWithHypergraph(
|
|
t *testing.T,
|
|
peerID []byte,
|
|
sharedHypergraph thypergraph.Hypergraph,
|
|
network uint8,
|
|
h host.Host,
|
|
privKey pcrypto.PrivKey,
|
|
bootstrapHosts []host.Host,
|
|
) (
|
|
*GlobalConsensusEngine,
|
|
*mockIntegrationPubSub,
|
|
*consensustime.GlobalTimeReel,
|
|
func(),
|
|
) {
|
|
return createIntegrationTestGlobalConsensusEngineWithHypergraphAndKey(t, peerID, sharedHypergraph, nil, network, h, privKey, bootstrapHosts)
|
|
}
|
|
|
|
// createIntegrationTestGlobalConsensusEngineWithHypergraphAndKey creates an engine with optional shared hypergraph and pre-generated key
|
|
func createIntegrationTestGlobalConsensusEngineWithHypergraphAndKey(
|
|
t *testing.T,
|
|
peerID []byte,
|
|
sharedHypergraph thypergraph.Hypergraph,
|
|
preGeneratedKey *tkeys.Key,
|
|
network uint8,
|
|
h host.Host,
|
|
privKey pcrypto.PrivKey,
|
|
bootstrapHosts []host.Host,
|
|
) (
|
|
*GlobalConsensusEngine,
|
|
*mockIntegrationPubSub,
|
|
*consensustime.GlobalTimeReel,
|
|
func(),
|
|
) {
|
|
logcfg := zap.NewDevelopmentConfig()
|
|
if preGeneratedKey != nil {
|
|
adBI, _ := poseidon.HashBytes(preGeneratedKey.PublicKey)
|
|
addr := adBI.FillBytes(make([]byte, 32))
|
|
logcfg.EncoderConfig.TimeKey = "M"
|
|
logcfg.EncoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
|
|
enc.AppendString(fmt.Sprintf("node | %s", hex.EncodeToString(addr)[:10]))
|
|
}
|
|
}
|
|
logger, _ := logcfg.Build()
|
|
|
|
// Create unique in-memory key manager for each node
|
|
bc := &bls48581.Bls48581KeyConstructor{}
|
|
dc := &bulletproofs.Decaf448KeyConstructor{}
|
|
keyManager := keys.NewInMemoryKeyManager(bc, dc)
|
|
|
|
// Create alert signer and put it in the config
|
|
alertSigner, _, _ := keyManager.CreateSigningKey("alert-key", crypto.KeyTypeEd448)
|
|
pub := alertSigner.Public().([]byte)
|
|
alertHex := hex.EncodeToString(pub)
|
|
|
|
// Set up peer key
|
|
peerkey, _, _ := keyManager.CreateSigningKey("q-peer-key", crypto.KeyTypeEd448)
|
|
peerpriv := peerkey.Private()
|
|
peerHex := hex.EncodeToString(peerpriv)
|
|
p2pcfg := config.P2PConfig{}.WithDefaults()
|
|
|
|
p2pcfg.Network = network
|
|
p2pcfg.PeerPrivKey = peerHex
|
|
p2pcfg.StreamListenMultiaddr = "/ip4/0.0.0.0/tcp/0"
|
|
cfg := &config.Config{
|
|
Engine: &config.EngineConfig{
|
|
Difficulty: 100,
|
|
ProvingKeyId: "q-prover-key", // Always use the required key ID
|
|
AlertKey: alertHex,
|
|
ArchiveMode: true,
|
|
},
|
|
P2P: &p2pcfg,
|
|
}
|
|
|
|
// Create the required "q-prover-key"
|
|
var proverKey crypto.Signer
|
|
var err error
|
|
|
|
if preGeneratedKey != nil {
|
|
// Use the pre-generated key from the multi-node test
|
|
preGeneratedKey.Id = "q-prover-key"
|
|
err = keyManager.PutRawKey(preGeneratedKey)
|
|
require.NoError(t, err)
|
|
|
|
proverKey, err = keyManager.GetSigningKey("q-prover-key")
|
|
require.NoError(t, err)
|
|
} else {
|
|
// Single node test - just create the key normally
|
|
proverKey, _, err = keyManager.CreateSigningKey("q-prover-key", crypto.KeyTypeBLS48581G1)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// Create stores
|
|
pebbleDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/global"}, 0)
|
|
|
|
// Create inclusion prover and verifiable encryptor
|
|
inclusionProver := bls48581.NewKZGInclusionProver(logger)
|
|
verifiableEncryptor := verenc.NewMPCitHVerifiableEncryptor(1)
|
|
|
|
// Create or use shared hypergraph
|
|
var hg thypergraph.Hypergraph
|
|
if sharedHypergraph != nil {
|
|
hg = sharedHypergraph
|
|
} else {
|
|
hypergraphStore := store.NewPebbleHypergraphStore(&config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/global"}, pebbleDB, logger, verifiableEncryptor, inclusionProver)
|
|
hg = hgcrdt.NewHypergraph(logger, hypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 1)
|
|
}
|
|
|
|
// Create key store
|
|
keyStore := store.NewPebbleKeyStore(pebbleDB, logger)
|
|
|
|
// Create clock store
|
|
clockStore := store.NewPebbleClockStore(pebbleDB, logger)
|
|
|
|
// Create concrete components
|
|
frameProver := vdf.NewWesolowskiFrameProver(logger)
|
|
signerRegistry, err := registration.NewCachedSignerRegistry(keyStore, keyManager, bc, bulletproofs.NewBulletproofProver(), logger)
|
|
require.NoError(t, err)
|
|
|
|
// Create prover registry with hypergraph
|
|
proverRegistry, err := provers.NewProverRegistry(logger, hg)
|
|
require.NoError(t, err)
|
|
|
|
// Register multiple provers in hypergraph (no filter for global)
|
|
// We need at least a few provers for the consensus to work
|
|
proverKeys := []crypto.Signer{proverKey}
|
|
|
|
// Only register provers if we're creating a new hypergraph
|
|
if sharedHypergraph == nil {
|
|
// Register all provers
|
|
for i, key := range proverKeys {
|
|
proverAddress := calculateProverAddress(key.Public().([]byte))
|
|
registerProverInHypergraph(t, hg, key.Public().([]byte), proverAddress)
|
|
t.Logf("Registered prover %d with address: %x", i, proverAddress)
|
|
}
|
|
}
|
|
|
|
// Refresh the prover registry
|
|
proverRegistry.Refresh()
|
|
|
|
// Create fee manager
|
|
dynamicFeeManager := fees.NewDynamicFeeManager(logger, inclusionProver)
|
|
|
|
// Create validators and adjusters
|
|
appFrameValidator := validator.NewBLSAppFrameValidator(proverRegistry, bc, frameProver, logger)
|
|
frameValidator := validator.NewBLSGlobalFrameValidator(proverRegistry, bc, frameProver, logger)
|
|
difficultyAdjuster := difficulty.NewAsertDifficultyAdjuster(0, time.Now().UnixMilli(), 80000)
|
|
rewardIssuance := reward.NewOptRewardIssuance()
|
|
|
|
// Create pubsub
|
|
pubsub := newMockIntegrationPubSub(cfg, logger, peerID, h, privKey, bootstrapHosts)
|
|
|
|
// Create time reel
|
|
globalTimeReel, err := consensustime.NewGlobalTimeReel(logger, proverRegistry, clockStore, network, true)
|
|
require.NoError(t, err)
|
|
|
|
// Create event distributor
|
|
eventDistributor := events.NewGlobalEventDistributor(
|
|
globalTimeReel.GetEventCh(),
|
|
)
|
|
|
|
// Create engine
|
|
engine, err := NewGlobalConsensusEngine(
|
|
logger,
|
|
cfg,
|
|
1000, // frameTimeMillis
|
|
pubsub,
|
|
hg,
|
|
keyManager,
|
|
keyStore,
|
|
frameProver,
|
|
inclusionProver,
|
|
signerRegistry,
|
|
proverRegistry,
|
|
dynamicFeeManager,
|
|
appFrameValidator,
|
|
frameValidator,
|
|
difficultyAdjuster,
|
|
rewardIssuance,
|
|
eventDistributor,
|
|
globalTimeReel,
|
|
clockStore,
|
|
nil, // inboxStore
|
|
nil, // hypergraphStore
|
|
store.NewPebbleShardsStore(pebbleDB, logger),
|
|
store.NewPebbleConsensusStore(pebbleDB, logger),
|
|
store.NewPebbleWorkerStore(pebbleDB, logger),
|
|
channel.NewDoubleRatchetEncryptedChannel(), // encryptedChannel
|
|
&bulletproofs.Decaf448BulletproofProver{}, // bulletproofProver
|
|
&verenc.MPCitHVerifiableEncryptor{}, // verEnc
|
|
&bulletproofs.Decaf448KeyConstructor{}, // decafConstructor
|
|
compiler.NewBedlamCompiler(),
|
|
bc,
|
|
qp2p.NewInMemoryPeerInfoManager(logger),
|
|
)
|
|
require.NoError(t, err)
|
|
|
|
cleanup := func() {
|
|
engine.Stop(false)
|
|
}
|
|
|
|
return engine, pubsub, globalTimeReel, cleanup
|
|
}
|
|
|
|
func TestGlobalConsensusEngine_Integration_BasicFrameProgression(t *testing.T) {
|
|
// Generate hosts for testing
|
|
_, m, cleanupHosts := tests.GenerateSimnetHosts(t, 1, []libp2p.Option{})
|
|
defer cleanupHosts()
|
|
|
|
engine, pubsub, _, _ := createIntegrationTestGlobalConsensusEngine(t, []byte(m.Nodes[0].ID()), 99, m.Nodes[0], m.Keys[0], m.Nodes)
|
|
|
|
// Track published frames
|
|
publishedFrames := make([]*protobufs.GlobalFrame, 0)
|
|
var mu sync.Mutex
|
|
pubsub.onPublish = func(bitmask []byte, data []byte) {
|
|
// Check if data is long enough to contain type prefix
|
|
if len(data) >= 4 {
|
|
// Read type prefix from first 4 bytes
|
|
typePrefix := binary.BigEndian.Uint32(data[:4])
|
|
|
|
// Check if it's a GlobalFrame
|
|
if typePrefix == protobufs.GlobalProposalType {
|
|
frame := &protobufs.GlobalProposal{}
|
|
if err := frame.FromCanonicalBytes(data); err == nil {
|
|
mu.Lock()
|
|
publishedFrames = append(publishedFrames, frame.State)
|
|
mu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Start the engine
|
|
ctx, cancel, errChan := lifecycle.WithSignallerAndCancel(context.Background())
|
|
err := engine.Start(ctx)
|
|
require.NoError(t, err)
|
|
|
|
// Check for startup errors
|
|
select {
|
|
case err := <-errChan:
|
|
require.NoError(t, err)
|
|
case <-time.After(100 * time.Millisecond):
|
|
// No error is good
|
|
}
|
|
|
|
// Wait for state transitions
|
|
time.Sleep(20 * time.Second)
|
|
|
|
// Verify engine is in an active state
|
|
state := engine.GetState()
|
|
t.Logf("Current engine state: %v", state)
|
|
assert.NotEqual(t, tconsensus.EngineStateStopped, state)
|
|
assert.NotEqual(t, tconsensus.EngineStateStarting, state)
|
|
|
|
// Wait for frame processing
|
|
time.Sleep(10 * time.Second)
|
|
|
|
// Check if frames were published
|
|
mu.Lock()
|
|
frameCount := len(publishedFrames)
|
|
mu.Unlock()
|
|
|
|
t.Logf("Published %d frames", frameCount)
|
|
|
|
// Stop the engine
|
|
cancel()
|
|
<-engine.Stop(false)
|
|
}
|
|
|
|
func TestGlobalConsensusEngine_Integration_MultiNodeConsensus(t *testing.T) {
|
|
// Generate hosts for all 6 nodes first
|
|
_, m, cleanupHosts := tests.GenerateSimnetHosts(t, 6, []libp2p.Option{})
|
|
defer cleanupHosts()
|
|
|
|
// Create shared components first
|
|
logger, _ := zap.NewDevelopment()
|
|
|
|
// Create shared hypergraph that all nodes will use
|
|
pebbleDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/global_shared"}, 0)
|
|
inclusionProver := bls48581.NewKZGInclusionProver(logger)
|
|
verifiableEncryptor := verenc.NewMPCitHVerifiableEncryptor(1)
|
|
hypergraphStores := make([]*store.PebbleHypergraphStore, 6)
|
|
hypergraphs := make([]*hgcrdt.HypergraphCRDT, 6)
|
|
|
|
// Create a temporary key manager to generate keys for hypergraph registration
|
|
bc := &bls48581.Bls48581KeyConstructor{}
|
|
dc := &bulletproofs.Decaf448KeyConstructor{}
|
|
|
|
// Create and store raw keys for all nodes
|
|
nodeRawKeys := make([]*tkeys.Key, 6)
|
|
|
|
// Create and register 6 provers (one for each node)
|
|
for i := 0; i < 6; i++ {
|
|
hypergraphStores[i] = store.NewPebbleHypergraphStore(&config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/global_shared"}, pebbleDB, logger, verifiableEncryptor, inclusionProver)
|
|
hypergraphs[i] = hgcrdt.NewHypergraph(logger, hypergraphStores[i], inclusionProver, []int{}, &tests.Nopthenticator{}, 1)
|
|
}
|
|
for i := 0; i < 6; i++ {
|
|
tempKeyManager := keys.NewInMemoryKeyManager(bc, dc)
|
|
|
|
proverKey, _, err := tempKeyManager.CreateSigningKey("q-prover-key", crypto.KeyTypeBLS48581G1)
|
|
require.NoError(t, err)
|
|
|
|
// Get the raw key for later use
|
|
rawKey, err := tempKeyManager.GetRawKey("q-prover-key")
|
|
require.NoError(t, err)
|
|
nodeRawKeys[i] = rawKey
|
|
|
|
proverAddress := calculateProverAddress(proverKey.Public().([]byte))
|
|
registerProverInHypergraph(t, hypergraphs[0], proverKey.Public().([]byte), proverAddress)
|
|
registerProverInHypergraph(t, hypergraphs[1], proverKey.Public().([]byte), proverAddress)
|
|
registerProverInHypergraph(t, hypergraphs[2], proverKey.Public().([]byte), proverAddress)
|
|
registerProverInHypergraph(t, hypergraphs[3], proverKey.Public().([]byte), proverAddress)
|
|
registerProverInHypergraph(t, hypergraphs[4], proverKey.Public().([]byte), proverAddress)
|
|
registerProverInHypergraph(t, hypergraphs[5], proverKey.Public().([]byte), proverAddress)
|
|
t.Logf("Registered prover %d with address: %x", i, proverAddress)
|
|
}
|
|
|
|
// Commit the hypergraph
|
|
for i := 0; i < 6; i++ {
|
|
hypergraphs[i].Commit(0)
|
|
}
|
|
|
|
// Create six engines that can communicate (minimum required for consensus)
|
|
engines := make([]*GlobalConsensusEngine, 6)
|
|
pubsubs := make([]*mockIntegrationPubSub, 6)
|
|
|
|
for i := 0; i < 6; i++ {
|
|
peerID := []byte(m.Nodes[i].ID())
|
|
engine, pubsub, _, _ := createIntegrationTestGlobalConsensusEngineWithHypergraphAndKey(t, peerID, hypergraphs[i], nodeRawKeys[i], 1, m.Nodes[i], m.Keys[i], m.Nodes)
|
|
engines[i] = engine
|
|
pubsubs[i] = pubsub
|
|
}
|
|
|
|
// Connect all pubsubs to each other
|
|
for i := 0; i < 6; i++ {
|
|
for j := 0; j < 6; j++ {
|
|
if i != j {
|
|
tests.ConnectSimnetHosts(t, m.Nodes[i], m.Nodes[j])
|
|
pubsubs[i].networkPeers[fmt.Sprintf("peer%d", j)] = pubsubs[j]
|
|
}
|
|
}
|
|
}
|
|
|
|
// Track frames and messages from all nodes
|
|
allFrames := make([][]*protobufs.GlobalFrame, 6)
|
|
proposalCount := make([]int, 6)
|
|
voteCount := make([]int, 6)
|
|
livenessCount := make([]int, 6)
|
|
var mu sync.Mutex
|
|
|
|
for i := 0; i < 6; i++ {
|
|
nodeIdx := i
|
|
pubsubs[i].onPublish = func(bitmask []byte, data []byte) {
|
|
// Check if data is long enough to contain type prefix
|
|
if len(data) >= 4 {
|
|
// Read type prefix from first 4 bytes
|
|
typePrefix := binary.BigEndian.Uint32(data[:4])
|
|
|
|
// Check if it's a GlobalFrame
|
|
if typePrefix == protobufs.GlobalProposalType {
|
|
frame := &protobufs.GlobalProposal{}
|
|
if err := frame.FromCanonicalBytes(data); err == nil {
|
|
mu.Lock()
|
|
allFrames[nodeIdx] = append(allFrames[nodeIdx], frame.State)
|
|
mu.Unlock()
|
|
t.Logf("Node %d published frame %d", nodeIdx+1, frame.State.Header.FrameNumber)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Track message processing
|
|
pubsubs[i].Subscribe([]byte{0x00}, func(msg *pb.Message) error {
|
|
// Check if data is long enough to contain type prefix
|
|
if len(msg.Data) >= 4 {
|
|
// Read type prefix from first 4 bytes
|
|
typePrefix := binary.BigEndian.Uint32(msg.Data[:4])
|
|
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
switch typePrefix {
|
|
case protobufs.GlobalProposalType:
|
|
proposalCount[nodeIdx]++
|
|
case protobufs.ProposalVoteType:
|
|
voteCount[nodeIdx]++
|
|
case protobufs.ProverLivenessCheckType:
|
|
livenessCount[nodeIdx]++
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
cancels := []func(){}
|
|
// Start all engines
|
|
for i := 0; i < 6; i++ {
|
|
ctx, cancel, errChan := lifecycle.WithSignallerAndCancel(context.Background())
|
|
err := engines[i].Start(ctx)
|
|
require.NoError(t, err)
|
|
cancels = append(cancels, cancel)
|
|
|
|
// Check for startup errors
|
|
select {
|
|
case err := <-errChan:
|
|
require.NoError(t, err)
|
|
case <-time.After(100 * time.Millisecond):
|
|
// No error is good
|
|
}
|
|
}
|
|
|
|
// Let the engines run and reach initial sync
|
|
time.Sleep(5 * time.Second)
|
|
|
|
// Monitor state transitions and ensure all proposals are seen
|
|
proposalsSeen := make([]bool, 6)
|
|
timeout := time.After(30 * time.Second)
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// Wait for all nodes to see all proposals
|
|
loop:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
t.Fatal("Timeout waiting for all nodes to see proposals")
|
|
case <-ticker.C:
|
|
// Check if all nodes have seen at least 5 proposals (from other nodes)
|
|
allSeen := true
|
|
mu.Lock()
|
|
for i := 0; i < 6; i++ {
|
|
if proposalCount[i] < 5 {
|
|
allSeen = false
|
|
} else if !proposalsSeen[i] {
|
|
proposalsSeen[i] = true
|
|
t.Logf("Node %d has seen %d proposals", i+1, proposalCount[i])
|
|
}
|
|
}
|
|
mu.Unlock()
|
|
|
|
if allSeen {
|
|
// Wait for message deliveries to complete
|
|
time.Sleep(1 * time.Second)
|
|
t.Log("All nodes have seen all proposals, proceeding")
|
|
break loop
|
|
}
|
|
|
|
// Log current state
|
|
for i := 0; i < 6; i++ {
|
|
state := engines[i].GetState()
|
|
mu.Lock()
|
|
t.Logf("Engine %d state: %v, proposals: %d, votes: %d, liveness: %d",
|
|
i+1, state, proposalCount[i], voteCount[i], livenessCount[i])
|
|
mu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Give time for voting and finalization
|
|
time.Sleep(10 * time.Second)
|
|
|
|
// Check states after consensus
|
|
for i := 0; i < 6; i++ {
|
|
state := engines[i].GetState()
|
|
mu.Lock()
|
|
t.Logf("Final Engine %d state: %v, proposals: %d, votes: %d",
|
|
i+1, state, proposalCount[i], voteCount[i])
|
|
mu.Unlock()
|
|
}
|
|
|
|
// Check if any frames were published
|
|
mu.Lock()
|
|
totalFrames := 0
|
|
for i := 0; i < 6; i++ {
|
|
totalFrames += len(allFrames[i])
|
|
}
|
|
mu.Unlock()
|
|
|
|
t.Logf("Total frames published across all nodes: %d", totalFrames)
|
|
|
|
// Stop all engines
|
|
for i := 0; i < 6; i++ {
|
|
cancels[i]()
|
|
<-engines[i].Stop(false)
|
|
}
|
|
}
|
|
|
|
func TestGlobalConsensusEngine_Integration_ShardCoverage(t *testing.T) {
|
|
// This test needs to run long enough to hit the condition required
|
|
if testing.Short() {
|
|
t.Skip("Skipping shard coverage scenario test in short mode")
|
|
}
|
|
|
|
// Generate hosts for testing
|
|
_, m, cleanupHosts := tests.GenerateSimnetHosts(t, 1, []libp2p.Option{})
|
|
defer cleanupHosts()
|
|
|
|
pebbleDB := store.NewPebbleDB(zap.L(), &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/global_shared"}, 0)
|
|
|
|
inclusionProver := bls48581.NewKZGInclusionProver(zap.L())
|
|
hypergraphStore := store.NewPebbleHypergraphStore(&config.DBConfig{
|
|
InMemoryDONOTUSE: true,
|
|
Path: ".test/global",
|
|
}, pebbleDB, zap.L(), &verenc.MPCitHVerifiableEncryptor{}, inclusionProver)
|
|
hg := hgcrdt.NewHypergraph(zap.NewNop(), hypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 1)
|
|
for i := range 6 {
|
|
k := make([]byte, 585)
|
|
k[1] = byte(i)
|
|
abi, _ := poseidon.HashBytes(k)
|
|
registerProverInHypergraphWithFilter(t, hg, k, abi.FillBytes(make([]byte, 32)), []byte{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8})
|
|
}
|
|
engine, _, _, _ := createIntegrationTestGlobalConsensusEngineWithHypergraphAndKey(t, []byte(m.Nodes[0].ID()), hg, nil, 1, m.Nodes[0], m.Keys[0], m.Nodes)
|
|
|
|
// simulate a one byte vertex so the shard has space being used
|
|
txn, _ := hg.NewTransaction(false)
|
|
c := make([]byte, 74)
|
|
c[0] = 0x02
|
|
hg.AddVertex(txn, hgcrdt.NewVertex([32]byte{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8}, [32]byte{}, c, big.NewInt(1)))
|
|
txn.Commit()
|
|
|
|
// Track emitted events
|
|
var publishedEvents []tconsensus.ControlEvent
|
|
var mu sync.Mutex
|
|
|
|
// Replace event distributor to capture events
|
|
eventDistributor := events.NewGlobalEventDistributor(nil)
|
|
|
|
// Subscribe to capture events
|
|
eventCh := eventDistributor.Subscribe("test")
|
|
go func() {
|
|
for event := range eventCh {
|
|
mu.Lock()
|
|
publishedEvents = append(publishedEvents, event)
|
|
mu.Unlock()
|
|
t.Logf("Event published: %d", event.Type)
|
|
}
|
|
}()
|
|
|
|
engine.eventDistributor = eventDistributor
|
|
|
|
// Start the event distributor
|
|
ctx, cancel, _ := lifecycle.WithSignallerAndCancel(context.Background())
|
|
err := engine.Start(ctx)
|
|
require.NoError(t, err)
|
|
|
|
// Run shard coverage check
|
|
err = engine.checkShardCoverage(1)
|
|
require.NoError(t, err)
|
|
|
|
// Wait for event processing and possible new app shard head
|
|
time.Sleep(1800 * time.Second)
|
|
mu.Lock()
|
|
found := false
|
|
newHeadAfter := false
|
|
for _, e := range publishedEvents {
|
|
if e.Type == tconsensus.ControlEventCoverageHalt {
|
|
found = true
|
|
}
|
|
if found && e.Type == tconsensus.ControlEventAppNewHead {
|
|
newHeadAfter = true
|
|
}
|
|
}
|
|
mu.Unlock()
|
|
|
|
require.True(t, found)
|
|
require.False(t, newHeadAfter)
|
|
|
|
// Stop the event distributor
|
|
cancel()
|
|
<-engine.Stop(false)
|
|
}
|
|
|
|
// TestGlobalConsensusEngine_Integration_NoProversStaysInVerifying tests that engines
|
|
// remain in the verifying state when no provers are registered in the hypergraph
|
|
func TestGlobalConsensusEngine_Integration_NoProversStaysInVerifying(t *testing.T) {
|
|
t.Log("Testing global consensus engines with no registered provers")
|
|
|
|
// Create six nodes
|
|
numNodes := 6
|
|
|
|
// Generate hosts for all nodes first
|
|
_, m, cleanupHosts := tests.GenerateSimnetHosts(t, numNodes, []libp2p.Option{})
|
|
defer cleanupHosts()
|
|
|
|
// Create shared components
|
|
logger, _ := zap.NewDevelopment()
|
|
|
|
engines := make([]*GlobalConsensusEngine, numNodes)
|
|
pubsubs := make([]*mockIntegrationPubSub, numNodes)
|
|
cancels := make([]func(), numNodes)
|
|
|
|
// Create shared hypergraph with NO provers registered
|
|
inclusionProver := bls48581.NewKZGInclusionProver(logger)
|
|
verifiableEncryptor := verenc.NewMPCitHVerifiableEncryptor(1)
|
|
|
|
// Create separate hypergraph and prover registry for each node to ensure isolation
|
|
for i := 0; i < numNodes; i++ {
|
|
nodeID := i + 1
|
|
peerID := []byte(m.Nodes[i].ID())
|
|
|
|
t.Logf("Creating node %d with peer ID: %x", nodeID, peerID)
|
|
|
|
// Create unique components for each node
|
|
pebbleDB := store.NewPebbleDB(logger, &config.DBConfig{
|
|
InMemoryDONOTUSE: true,
|
|
Path: fmt.Sprintf(".test/global_no_provers_%d", nodeID),
|
|
}, 0)
|
|
|
|
hypergraphStore := store.NewPebbleHypergraphStore(&config.DBConfig{
|
|
InMemoryDONOTUSE: true,
|
|
Path: fmt.Sprintf(".test/global_no_provers_%d", nodeID),
|
|
}, pebbleDB, logger, verifiableEncryptor, inclusionProver)
|
|
hg := hgcrdt.NewHypergraph(logger, hypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 1)
|
|
|
|
// Create prover registry - but don't register any provers
|
|
proverRegistry, err := provers.NewProverRegistry(logger, hg)
|
|
require.NoError(t, err)
|
|
|
|
// Create key manager with prover key (but not registered in hypergraph)
|
|
bc := &bls48581.Bls48581KeyConstructor{}
|
|
dc := &bulletproofs.Decaf448KeyConstructor{}
|
|
keyManager := keys.NewInMemoryKeyManager(bc, dc)
|
|
_, _, err = keyManager.CreateSigningKey("q-prover-key", crypto.KeyTypeBLS48581G1)
|
|
require.NoError(t, err)
|
|
|
|
// Create other components
|
|
keyStore := store.NewPebbleKeyStore(pebbleDB, logger)
|
|
clockStore := store.NewPebbleClockStore(pebbleDB, logger)
|
|
frameProver := vdf.NewWesolowskiFrameProver(logger)
|
|
signerRegistry, err := registration.NewCachedSignerRegistry(keyStore, keyManager, bc, bulletproofs.NewBulletproofProver(), logger)
|
|
require.NoError(t, err)
|
|
|
|
// Create global time reel
|
|
globalTimeReel, err := consensustime.NewGlobalTimeReel(logger, proverRegistry, clockStore, 1, true)
|
|
require.NoError(t, err)
|
|
|
|
eventDistributor := events.NewGlobalEventDistributor(
|
|
globalTimeReel.GetEventCh(),
|
|
)
|
|
|
|
dynamicFeeManager := fees.NewDynamicFeeManager(logger, inclusionProver)
|
|
appFrameValidator := validator.NewBLSAppFrameValidator(proverRegistry, bc, frameProver, logger)
|
|
frameValidator := validator.NewBLSGlobalFrameValidator(proverRegistry, bc, frameProver, logger)
|
|
difficultyAdjuster := difficulty.NewAsertDifficultyAdjuster(0, time.Now().UnixMilli(), 10)
|
|
rewardIssuance := reward.NewOptRewardIssuance()
|
|
|
|
// Set up peer key
|
|
peerkey, _, _ := keyManager.CreateSigningKey("q-peer-key", crypto.KeyTypeEd448)
|
|
peerpriv := peerkey.Private()
|
|
peerHex := hex.EncodeToString(peerpriv)
|
|
|
|
p2pcfg := config.P2PConfig{}.WithDefaults()
|
|
p2pcfg.Network = 2
|
|
p2pcfg.PeerPrivKey = peerHex
|
|
p2pcfg.StreamListenMultiaddr = "/ip4/0.0.0.0/tcp/0"
|
|
cfg := &config.Config{
|
|
Engine: &config.EngineConfig{
|
|
Difficulty: 10,
|
|
ProvingKeyId: "q-prover-key",
|
|
GenesisSeed: strings.Repeat("00", 585),
|
|
},
|
|
P2P: &p2pcfg,
|
|
}
|
|
|
|
// Create pubsub with host and key
|
|
pubsubs[i] = newMockIntegrationPubSub(cfg, logger, peerID, m.Nodes[i], m.Keys[i], m.Nodes)
|
|
pubsubs[i].peerCount = 10 // Set high peer count
|
|
// Create engine
|
|
engine, err := NewGlobalConsensusEngine(
|
|
logger,
|
|
cfg,
|
|
1000, // frameTimeMillis
|
|
pubsubs[i],
|
|
hg,
|
|
keyManager,
|
|
keyStore,
|
|
frameProver,
|
|
inclusionProver,
|
|
signerRegistry,
|
|
proverRegistry,
|
|
dynamicFeeManager,
|
|
appFrameValidator,
|
|
frameValidator,
|
|
difficultyAdjuster,
|
|
rewardIssuance,
|
|
eventDistributor,
|
|
globalTimeReel,
|
|
clockStore,
|
|
nil, // inboxStore
|
|
nil, // hypergraphStore
|
|
store.NewPebbleShardsStore(pebbleDB, logger),
|
|
store.NewPebbleConsensusStore(pebbleDB, logger),
|
|
store.NewPebbleWorkerStore(pebbleDB, logger),
|
|
channel.NewDoubleRatchetEncryptedChannel(),
|
|
&bulletproofs.Decaf448BulletproofProver{}, // bulletproofProver
|
|
&verenc.MPCitHVerifiableEncryptor{}, // verEnc
|
|
&bulletproofs.Decaf448KeyConstructor{}, // decafConstructor
|
|
compiler.NewBedlamCompiler(),
|
|
bc, // blsConstructor
|
|
qp2p.NewInMemoryPeerInfoManager(logger),
|
|
)
|
|
require.NoError(t, err)
|
|
|
|
engines[i] = engine
|
|
}
|
|
|
|
// Wire up all pubsubs to each other
|
|
for i := 0; i < numNodes; i++ {
|
|
for j := 0; j < numNodes; j++ {
|
|
if i != j {
|
|
pubsubs[i].networkPeers[fmt.Sprintf("peer%d", j)] = pubsubs[j]
|
|
}
|
|
}
|
|
}
|
|
|
|
// Start all engines
|
|
for i := 0; i < numNodes; i++ {
|
|
ctx, cancel, errChan := lifecycle.WithSignallerAndCancel(context.Background())
|
|
err := engines[i].Start(ctx)
|
|
require.NoError(t, err)
|
|
cancels[i] = cancel
|
|
|
|
select {
|
|
case err := <-errChan:
|
|
require.NoError(t, err)
|
|
case <-time.After(100 * time.Millisecond):
|
|
// No error is good
|
|
}
|
|
}
|
|
|
|
// Let engines run for a while
|
|
t.Log("Letting engines run for 10 seconds...")
|
|
time.Sleep(10 * time.Second)
|
|
|
|
// Check that all engines are still in loading state
|
|
for i := 0; i < numNodes; i++ {
|
|
state := engines[i].GetState()
|
|
t.Logf("Node %d state: %v", i+1, state)
|
|
|
|
// Should be in loading state since no provers are registered
|
|
assert.Equal(t, tconsensus.EngineStateVerifying, state,
|
|
"Node %d should be in verifying state when no provers are registered", i+1)
|
|
}
|
|
|
|
// Stop all engines
|
|
for i := 0; i < numNodes; i++ {
|
|
cancels[i]()
|
|
<-engines[i].Stop(false)
|
|
}
|
|
|
|
t.Log("Test completed - all nodes remained in verifying state as expected")
|
|
}
|
|
|
|
// TestGlobalConsensusEngine_Integration_AlertStopsProgression tests that engines
|
|
// halt when an alert broadcast occurs
|
|
func TestGlobalConsensusEngine_Integration_AlertStopsProgression(t *testing.T) {
|
|
// Generate hosts for testing
|
|
_, m, cleanupHosts := tests.GenerateSimnetHosts(t, 1, []libp2p.Option{})
|
|
defer cleanupHosts()
|
|
|
|
engine, pubsub, _, _ := createIntegrationTestGlobalConsensusEngine(t, []byte(m.Nodes[0].ID()), 99, m.Nodes[0], m.Keys[0], m.Nodes)
|
|
|
|
// Track published frames
|
|
publishedFrames := make([]*protobufs.GlobalFrame, 0)
|
|
afterAlertFrames := make([]*protobufs.GlobalFrame, 0)
|
|
afterAlert := false
|
|
|
|
var mu sync.Mutex
|
|
pubsub.onPublish = func(bitmask []byte, data []byte) {
|
|
// Check if data is long enough to contain type prefix
|
|
if len(data) >= 4 {
|
|
// Read type prefix from first 4 bytes
|
|
typePrefix := binary.BigEndian.Uint32(data[:4])
|
|
|
|
// Check if it's a GlobalFrame
|
|
if typePrefix == protobufs.GlobalProposalType {
|
|
frame := &protobufs.GlobalProposal{}
|
|
if err := frame.FromCanonicalBytes(data); err == nil {
|
|
mu.Lock()
|
|
if afterAlert {
|
|
afterAlertFrames = append(afterAlertFrames, frame.State)
|
|
} else {
|
|
publishedFrames = append(publishedFrames, frame.State)
|
|
}
|
|
mu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Start the engine
|
|
ctx, cancel, errChan := lifecycle.WithSignallerAndCancel(context.Background())
|
|
err := engine.Start(ctx)
|
|
require.NoError(t, err)
|
|
|
|
// Check for startup errors
|
|
select {
|
|
case err := <-errChan:
|
|
require.NoError(t, err)
|
|
case <-time.After(100 * time.Millisecond):
|
|
// No error is good
|
|
}
|
|
|
|
// Wait for state transitions
|
|
time.Sleep(10 * time.Second)
|
|
|
|
// Verify engine is in an active state
|
|
state := engine.GetState()
|
|
t.Logf("Current engine state: %v", state)
|
|
assert.NotEqual(t, tconsensus.EngineStateStopped, state)
|
|
assert.NotEqual(t, tconsensus.EngineStateStarting, state)
|
|
|
|
// Wait for frame processing
|
|
time.Sleep(10 * time.Second)
|
|
|
|
alertKey, _ := engine.keyManager.GetSigningKey("alert-key")
|
|
sig, _ := alertKey.SignWithDomain([]byte("It's time to stop!"), []byte("GLOBAL_ALERT"))
|
|
alertMessage := &protobufs.GlobalAlert{
|
|
Message: "It's time to stop!",
|
|
Signature: sig,
|
|
}
|
|
|
|
alertMessageBytes, _ := alertMessage.ToCanonicalBytes()
|
|
|
|
// Send alert
|
|
engine.globalAlertMessageQueue <- &pb.Message{
|
|
From: []byte{0x00},
|
|
Data: alertMessageBytes,
|
|
}
|
|
|
|
// Wait for event bus to catch up
|
|
time.Sleep(1 * time.Second)
|
|
|
|
mu.Lock()
|
|
afterAlert = true
|
|
mu.Unlock()
|
|
|
|
// Wait for any new messages to flow after
|
|
time.Sleep(10 * time.Second)
|
|
|
|
// Check if frames were published
|
|
mu.Lock()
|
|
frameCount := len(publishedFrames)
|
|
afterAlertCount := len(afterAlertFrames)
|
|
mu.Unlock()
|
|
|
|
t.Logf("Published %d frames before alert", frameCount)
|
|
t.Logf("Published %d frames after alert", afterAlertCount)
|
|
|
|
require.Equal(t, 0, afterAlertCount)
|
|
|
|
// Stop the engine
|
|
cancel()
|
|
<-engine.Stop(false)
|
|
}
|
|
|
|
// registerProverInHypergraphWithFilter registers a prover with a specific filter (shard address)
|
|
func registerProverInHypergraphWithFilter(t *testing.T, hg thypergraph.Hypergraph, publicKey []byte, address []byte, filter []byte) {
|
|
// Create the full address: GLOBAL_INTRINSIC_ADDRESS + prover address
|
|
fullAddress := [64]byte{}
|
|
copy(fullAddress[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
|
|
copy(fullAddress[32:], address)
|
|
|
|
// Create a VectorCommitmentTree with the prover data
|
|
tree := &qcrypto.VectorCommitmentTree{}
|
|
|
|
// Index 0: Public key
|
|
err := tree.Insert([]byte{0}, publicKey, nil, big.NewInt(0))
|
|
if err != nil {
|
|
t.Fatalf("Failed to insert public key: %v", err)
|
|
}
|
|
|
|
// Index 1<<2 (4): Status (1 byte) - 1 = active
|
|
err = tree.Insert([]byte{1 << 2}, []byte{1}, nil, big.NewInt(0))
|
|
if err != nil {
|
|
t.Fatalf("Failed to insert status: %v", err)
|
|
}
|
|
|
|
err = tree.Insert([]byte{3 << 2}, []byte{0, 0, 0, 0, 0, 0, 3, 232}, nil, big.NewInt(0)) // seniority = 1000
|
|
require.NoError(t, err)
|
|
|
|
// Type Index:
|
|
typeBI, _ := poseidon.HashBytes(
|
|
slices.Concat(bytes.Repeat([]byte{0xff}, 32), []byte("prover:Prover")),
|
|
)
|
|
tree.Insert(bytes.Repeat([]byte{0xff}, 32), typeBI.FillBytes(make([]byte, 32)), nil, big.NewInt(32))
|
|
|
|
// Create allocation
|
|
allocationAddressBI, err := poseidon.HashBytes(slices.Concat([]byte("PROVER_ALLOCATION"), publicKey, []byte{}))
|
|
require.NoError(t, err)
|
|
allocationAddress := allocationAddressBI.FillBytes(make([]byte, 32))
|
|
|
|
allocationTree := &qcrypto.VectorCommitmentTree{}
|
|
// Store allocation data
|
|
err = allocationTree.Insert([]byte{0 << 2}, fullAddress[32:], nil, big.NewInt(0))
|
|
require.NoError(t, err)
|
|
err = allocationTree.Insert([]byte{2 << 2}, filter, nil, big.NewInt(0)) // confirm filter
|
|
require.NoError(t, err)
|
|
err = allocationTree.Insert([]byte{1 << 2}, []byte{1}, nil, big.NewInt(0)) // active
|
|
require.NoError(t, err)
|
|
joinFrameBytes := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(joinFrameBytes, 0)
|
|
err = allocationTree.Insert([]byte{4 << 2}, joinFrameBytes, nil, big.NewInt(0))
|
|
require.NoError(t, err)
|
|
allocationTypeBI, _ := poseidon.HashBytes(
|
|
slices.Concat(bytes.Repeat([]byte{0xff}, 32), []byte("allocation:ProverAllocation")),
|
|
)
|
|
allocationTree.Insert(bytes.Repeat([]byte{0xff}, 32), allocationTypeBI.FillBytes(make([]byte, 32)), nil, big.NewInt(32))
|
|
|
|
// Add the prover to the hypergraph
|
|
inclusionProver := bls48581.NewKZGInclusionProver(zap.L())
|
|
commitment := tree.Commit(inclusionProver, false)
|
|
if len(commitment) != 74 && len(commitment) != 64 {
|
|
t.Fatalf("Invalid commitment length: %d", len(commitment))
|
|
}
|
|
allocCommitment := allocationTree.Commit(inclusionProver, false)
|
|
if len(allocCommitment) != 74 && len(allocCommitment) != 64 {
|
|
t.Fatalf("Invalid commitment length: %d", len(allocCommitment))
|
|
}
|
|
|
|
// Add vertex to hypergraph
|
|
txn, _ := hg.NewTransaction(false)
|
|
err = hg.AddVertex(txn, hgcrdt.NewVertex([32]byte(fullAddress[:32]), [32]byte(fullAddress[32:]), commitment, big.NewInt(0)))
|
|
if err != nil {
|
|
t.Fatalf("Failed to add prover vertex to hypergraph: %v", err)
|
|
}
|
|
err = hg.AddVertex(txn, hgcrdt.NewVertex([32]byte(fullAddress[:32]), [32]byte(allocationAddress[:]), allocCommitment, big.NewInt(0)))
|
|
if err != nil {
|
|
t.Fatalf("Failed to add prover vertex to hypergraph: %v", err)
|
|
}
|
|
|
|
hg.SetVertexData(txn, fullAddress, tree)
|
|
hg.SetVertexData(txn, [64]byte(slices.Concat(fullAddress[:32], allocationAddress)), allocationTree)
|
|
txn.Commit()
|
|
|
|
// Commit the hypergraph
|
|
hg.Commit(0)
|
|
|
|
t.Logf(" Registered prover with address: %x, filter: %x (public key length: %d)", address, filter, len(publicKey))
|
|
}
|