v2.1.0.20 (#516)

* .20 testing

* Read in the debug flag by env variable (#514)

* v2.1.0.19

* enhanced error logging, fix seniority marker join blocker, fix sync message size limit defaults

* resolve signature failure

* additional error logging for merge-related signatures

* fix: one-shot sync message size, app shard TC signature size, collector/hotstuff race condition, expired joins blocking new joins due to pruning disable

* remove compat with old 2.0.0 blossomsub

* fix: resolve abandoned prover joins

* reload prover registry

* fix stale worker proposal edge case

* add full sanity check on join before submitting to identify bug

* resolve non-fallthrough condition that should be fallthrough

* fix: resolve rare SIGFPE, fix orphan expired joins blocking workers from reallocating

* add reconnect fallback if no peers are found with variable reconnect time (#511)

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>

* update base peer count to 1 (#513)

* fix: expired prover join frames, starting port ranges, proposer getting stuck, and seniority on joins

* fix: panic on shutdown, libp2p discovery picking inaccessible peers, coverage event check not in shutdown logic, amend app shard worker behavior to mirror global for prover root reconciliation

* fix: shutdown scenario quirks, reload hanging

* fix: do not bailout early on shutdown of coverage check

* fix: force registry refresh on worker waiting for registration

* add more logging to wait for prover

* fix: worker manager refreshes the filter on allocation, snapshots blocking close on shutdown

* tweak: force shutdown after five seconds for app worker

* fix: don't loop when shutting down

* fix: slight reordering, also added named workers to trace hanging shutdowns

* use deterministic key for peer id of workers to stop flagging workers as sybil attacks (an illustrative derivation sketch follows this list)

* fix: remove pubsub stop from app consensus engine as it shouldn't manage pubsub lifecycle, integrate shutdown context to PerformSync to prevent stuck syncs from halting respawn

* fix: blossomsub pubsub interface does not properly track subscription status

* fix: subscribe order to avoid nil panic

* switch from dnsaddr to dns4

* add missing quic-v1

* additional logging to isolate respawn quirks

* fix: dnsaddr -> dns4 for blossomsub

* allow debug env var to be read
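A minimal, illustrative sketch of the deterministic worker peer ID idea from the sybil-flagging item above: derive each worker's libp2p key from stable inputs (a per-node seed plus the worker index) instead of generating a fresh key on every start, so a restarted worker keeps the same peer ID. The seed-mixing scheme and names below are assumptions for illustration only, not the repository's actual derivation.

package main

import (
	"crypto/ed25519"
	"crypto/sha256"
	"encoding/binary"
	"fmt"

	p2pcrypto "github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/peer"
)

// deterministicWorkerPeerID mixes a per-node seed with the worker index and
// turns the result into an ed25519 libp2p identity. Same inputs, same peer ID.
func deterministicWorkerPeerID(nodeSeed []byte, workerIndex uint32) (peer.ID, error) {
	var idx [4]byte
	binary.BigEndian.PutUint32(idx[:], workerIndex)
	seed := sha256.Sum256(append(append([]byte{}, nodeSeed...), idx[:]...))

	priv := ed25519.NewKeyFromSeed(seed[:])
	sk, err := p2pcrypto.UnmarshalEd25519PrivateKey(priv)
	if err != nil {
		return "", err
	}
	return peer.IDFromPublicKey(sk.GetPublic())
}

func main() {
	id, err := deterministicWorkerPeerID([]byte("example-node-seed"), 3)
	if err != nil {
		panic(err)
	}
	fmt.Println("worker 3 peer id:", id) // identical on every restart
}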

---------

Co-authored-by: Cassandra Heart <cassandra@quilibrium.com>
Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>
Co-authored-by: Cassandra Heart <7929478+CassOnMars@users.noreply.github.com>

* fix newPebbleDB constructor config param (#517)

* fix: high CPU overhead in initial worker behaviors/ongoing sync

* faster docker builds with better caching

* qol: add extra data to node info, and query metrics from command line

* leave proposals for overcrowded shards

* hub-and-spoke global message broadcasts (a condensed fan-out sketch follows this list)

* small tweaks to cli output for join frames
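A condensed sketch of the hub-and-spoke pattern referenced above and implemented in the diffs below (broadcastGlobalMessage / StreamGlobalMessages / streamGlobalMessagesFromMaster): the master keeps a set of subscriber channels, fans every global pubsub message out with non-blocking sends, and workers consume the stream instead of subscribing to the global bitmasks themselves. Types here are simplified stand-ins, not the actual protobuf messages.

package main

import (
	"fmt"
	"sync"
)

// globalMsg is a simplified stand-in for StreamGlobalMessagesResponse.
type globalMsg struct {
	Data    []byte
	Bitmask []byte
}

type hub struct {
	mu   sync.RWMutex
	subs map[chan *globalMsg]struct{}
}

func newHub() *hub { return &hub{subs: make(map[chan *globalMsg]struct{})} }

// subscribe registers a buffered per-worker channel, mirroring
// addGlobalMessageSubscriber in the global engine.
func (h *hub) subscribe() chan *globalMsg {
	ch := make(chan *globalMsg, 256)
	h.mu.Lock()
	h.subs[ch] = struct{}{}
	h.mu.Unlock()
	return ch
}

func (h *hub) unsubscribe(ch chan *globalMsg) {
	h.mu.Lock()
	delete(h.subs, ch)
	h.mu.Unlock()
}

// broadcast mirrors broadcastGlobalMessage: non-blocking sends so a slow
// worker stream never stalls the pubsub handler; its messages are dropped.
func (h *hub) broadcast(data, bitmask []byte) {
	msg := &globalMsg{Data: data, Bitmask: bitmask}
	h.mu.RLock()
	defer h.mu.RUnlock()
	for ch := range h.subs {
		select {
		case ch <- msg:
		default:
		}
	}
}

func main() {
	h := newHub()
	worker := h.subscribe()
	h.broadcast([]byte("frame"), []byte{0x00})
	msg := <-worker
	fmt.Println("worker received", len(msg.Data), "bytes")
	h.unsubscribe(worker)
}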

---------

Co-authored-by: winged-pegasus <55340199+winged-pegasus@users.noreply.github.com>
Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>
Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
Cassandra Heart 2026-03-04 01:37:04 -06:00 committed by GitHub
parent ce4f77b140
commit 1b2660b7df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
48 changed files with 4348 additions and 2052 deletions

View File

@ -43,7 +43,7 @@ func FormatVersion(version []byte) string {
}
func GetPatchNumber() byte {
return 0x13
return 0x14
}
func GetRCNumber() byte {

View File

@ -134,10 +134,36 @@ RUN go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest
WORKDIR /opt/ceremonyclient
# Copy source needed for generation (excluding node/client/sidecar handled in builders)
COPY --exclude=node \
--exclude=client \
--exclude=sidecar . .
# Rust workspace files (rarely change)
COPY Cargo.toml Cargo.lock ./
# Rust crate sources (needed by cargo build in generate.sh)
COPY crates crates
# Gen-* module directories (Go wrappers + generate.sh scripts)
COPY channel channel
COPY vdf vdf
COPY ferret ferret
COPY bls48581 bls48581
COPY rpm rpm
COPY verenc verenc
COPY bulletproofs bulletproofs
# go.mod/go.sum stubs for replace directive targets across all gen-* modules.
# Only module metadata is needed for go mod download; full Go source is deferred
# to build-context so that source changes don't bust gen-* stage caches.
COPY nekryptology/go.mod nekryptology/go.sum nekryptology/
COPY protobufs/go.mod protobufs/go.sum protobufs/
COPY consensus/go.mod consensus/go.sum consensus/
COPY types/go.mod types/go.sum types/
COPY utils/go.mod utils/go.sum utils/
COPY config/go.mod config/go.sum config/
COPY lifecycle/go.mod lifecycle/go.sum lifecycle/
COPY go-multiaddr/go.mod go-multiaddr/go.sum go-multiaddr/
COPY go-multiaddr-dns/go.mod go-multiaddr-dns/go.sum go-multiaddr-dns/
COPY go-libp2p/go.mod go-libp2p/go.sum go-libp2p/
COPY go-libp2p-kad-dht/go.mod go-libp2p-kad-dht/go.sum go-libp2p-kad-dht/
COPY go-libp2p-blossomsub/go.mod go-libp2p-blossomsub/go.sum go-libp2p-blossomsub/
# -----------------------------------------------------------------------------
# Parallel Generation Stages
@ -184,7 +210,14 @@ RUN ./generate.sh
# -----------------------------------------------------------------------------
FROM common-context AS build-context
# Copy generated artifacts back
# Copy full source tree for Go builds. Changes here only bust build-context
# cache, not the expensive gen-* stages above.
COPY --exclude=node \
--exclude=client \
--exclude=sidecar \
--exclude=qns-api . .
# Copy generated artifacts back (overwrites source dirs with generated versions)
COPY --from=gen-channel /opt/ceremonyclient/channel /opt/ceremonyclient/channel
COPY --from=gen-channel /opt/ceremonyclient/target/release/libchannel.a /opt/ceremonyclient/target/release/libchannel.a
@ -233,6 +266,27 @@ ARG BINARIES_DIR=/opt/ceremonyclient/target/release
RUN GOOS=${TARGETOS} GOARCH=${TARGETARCH} ./build.sh -o qclient
RUN cp qclient /usr/bin
# -----------------------------------------------------------------------------
# Stage: build-qns-api
# -----------------------------------------------------------------------------
FROM build-context AS build-qns-api
ARG TARGETOS
ARG TARGETARCH
COPY ./node /opt/ceremonyclient/node
WORKDIR /opt/ceremonyclient/node
RUN go mod download
COPY ./qns-api /opt/ceremonyclient/qns-api
WORKDIR /opt/ceremonyclient/qns-api
RUN go mod download
ARG BINARIES_DIR=/opt/ceremonyclient/target/release
ENV ROOT_DIR=/opt/ceremonyclient
ENV CEREMONYCLIENT_DIR=/opt/ceremonyclient
RUN GOOS=${TARGETOS} GOARCH=${TARGETARCH} ./build.sh -o qns-api ./cmd/api
RUN cp qns-api /usr/bin
# -----------------------------------------------------------------------------
# Stage: node-only
# -----------------------------------------------------------------------------
@ -279,6 +333,14 @@ FROM qclient-unix AS qclient-linux
FROM qclient-unix AS qclient-darwin
FROM qclient-${TARGETOS} AS qclient
FROM scratch AS qns-api-unix
COPY --from=build-qns-api /usr/bin/qns-api /qns-api
ENTRYPOINT [ "/qns-api" ]
FROM qns-api-unix AS qns-api-linux
FROM qns-api-unix AS qns-api-darwin
FROM qns-api-${TARGETOS} AS qns-api
# -----------------------------------------------------------------------------
# Stage: final (Default combined image)
# -----------------------------------------------------------------------------

View File

@ -69,7 +69,12 @@ RUN /opt/rustup-init.sh -y --profile minimal
# Install uniffi-bindgen-go
RUN cargo install uniffi-bindgen-go --git https://github.com/NordSecurity/uniffi-bindgen-go --tag v0.4.0+v0.28.3
FROM base-avx512 AS build-avx512
# -----------------------------------------------------------------------------
# Stage: gen-avx512
# Purpose: Build EMP and generate Rust bindings. Only Rust/gen sources are
# copied here so that Go source changes don't bust the cache.
# -----------------------------------------------------------------------------
FROM base-avx512 AS gen-avx512
ENV GOEXPERIMENT=arenas
ENV QUILIBRIUM_SIGNATURE_CHECK=false
@ -80,12 +85,41 @@ RUN go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest
WORKDIR /opt/ceremonyclient
# Copy everything except node and client so as to avoid
# invalidating the cache at this point on client or node rebuilds
# Rust workspace files (rarely change)
COPY Cargo.toml Cargo.lock ./
COPY --exclude=node \
--exclude=client \
--exclude=sidecar . .
# Rust crate sources (needed by cargo build in generate.sh)
COPY crates crates
# EMP toolkit sources
COPY install-emp.sh .
COPY emp-tool emp-tool
COPY emp-ot emp-ot
# Gen-* module directories (Go wrappers + generate.sh scripts)
COPY channel channel
COPY vdf vdf
COPY ferret ferret
COPY bls48581 bls48581
COPY rpm rpm
COPY verenc verenc
COPY bulletproofs bulletproofs
# go.mod/go.sum stubs for replace directive targets across all gen-* modules.
# Only module metadata is needed for go mod download; full Go source is deferred
# to build-avx512 so that source changes don't bust gen stage caches.
COPY nekryptology/go.mod nekryptology/go.sum nekryptology/
COPY protobufs/go.mod protobufs/go.sum protobufs/
COPY consensus/go.mod consensus/go.sum consensus/
COPY types/go.mod types/go.sum types/
COPY utils/go.mod utils/go.sum utils/
COPY config/go.mod config/go.sum config/
COPY lifecycle/go.mod lifecycle/go.sum lifecycle/
COPY go-multiaddr/go.mod go-multiaddr/go.sum go-multiaddr/
COPY go-multiaddr-dns/go.mod go-multiaddr-dns/go.sum go-multiaddr-dns/
COPY go-libp2p/go.mod go-libp2p/go.sum go-libp2p/
COPY go-libp2p-kad-dht/go.mod go-libp2p-kad-dht/go.sum go-libp2p-kad-dht/
COPY go-libp2p-blossomsub/go.mod go-libp2p-blossomsub/go.sum go-libp2p-blossomsub/
RUN bash install-emp.sh
@ -144,6 +178,20 @@ RUN go mod download
RUN ./generate.sh
# -----------------------------------------------------------------------------
# Stage: build-avx512
# Purpose: Add full source tree for Go builds. Changes here only bust
# build cache, not the expensive gen stage above.
# -----------------------------------------------------------------------------
FROM gen-avx512 AS build-avx512
WORKDIR /opt/ceremonyclient
# Copy full source tree for Go builds
COPY --exclude=node \
--exclude=client \
--exclude=sidecar . .
FROM build-avx512 AS build-node-avx512
# Build and install the node

View File

@ -247,9 +247,9 @@ func NewBlossomSubRouter(h host.Host, params BlossomSubParams, network uint8) *B
feature = func(f BlossomSubFeature, proto protocol.ID) bool {
switch f {
case BlossomSubFeatureMesh:
return proto == protos[0] || proto == protos[1]
return proto == protos[0]
case BlossomSubFeaturePX:
return proto == protos[0] || proto == protos[1]
return proto == protos[0]
case BlossomSubFeatureIdontwant:
return proto == protos[0]
default:

View File

@ -64,6 +64,10 @@ import (
up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p"
)
// globalSyncCooldownFrames is the number of frames to wait between
// performBlockingGlobalHypersync calls. At ~10s/frame this is ~50 seconds.
const globalSyncCooldownFrames = 5
// AppConsensusEngine uses the generic state machine for consensus
type AppConsensusEngine struct {
*lifecycle.ComponentManager
@ -118,6 +122,7 @@ type AppConsensusEngine struct {
appSpilloverMu sync.Mutex
lastProposalRank uint64
lastProposalRankMu sync.RWMutex
commitBarrier sync.Mutex
collectedMessages []*protobufs.Message
collectedMessagesMu sync.RWMutex
provingMessages []*protobufs.Message
@ -152,9 +157,10 @@ type AppConsensusEngine struct {
coverageMinProvers uint64
coverageHaltThreshold uint64
coverageHaltGrace uint64
globalProverRootVerifiedFrame atomic.Uint64
globalProverRootSynced atomic.Bool
globalProverSyncInProgress atomic.Bool
globalProverRootVerifiedFrame atomic.Uint64
globalProverRootSynced atomic.Bool
globalProverSyncInProgress atomic.Bool
globalSyncCooldownUntilFrame atomic.Uint64
// Genesis initialization
genesisInitialized atomic.Bool
@ -877,6 +883,14 @@ func NewAppConsensusEngine(
engine.processPeerInfoMessageQueue(ctx)
}))
componentBuilder.AddWorker(namedWorker("globalMessageStream", func(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
ready()
engine.streamGlobalMessagesFromMaster(ctx)
}))
componentBuilder.AddWorker(namedWorker("dispatchMsgQueue", func(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
@ -938,26 +952,6 @@ func NewAppConsensusEngine(
return nil, err
}
err = engine.subscribeToGlobalFrameMessages()
if err != nil {
return nil, err
}
err = engine.subscribeToGlobalProverMessages()
if err != nil {
return nil, err
}
err = engine.subscribeToGlobalAlertMessages()
if err != nil {
return nil, err
}
err = engine.subscribeToPeerInfoMessages()
if err != nil {
return nil, err
}
err = engine.subscribeToDispatchMessages()
if err != nil {
return nil, err
@ -982,14 +976,6 @@ func (e *AppConsensusEngine) Stop(force bool) <-chan error {
e.pubsub.UnregisterValidator(e.getProverMessageBitmask())
e.pubsub.Unsubscribe(e.getFrameMessageBitmask(), false)
e.pubsub.UnregisterValidator(e.getFrameMessageBitmask())
e.pubsub.Unsubscribe(e.getGlobalFrameMessageBitmask(), false)
e.pubsub.UnregisterValidator(e.getGlobalFrameMessageBitmask())
e.pubsub.Unsubscribe(e.getGlobalProverMessageBitmask(), false)
e.pubsub.UnregisterValidator(e.getGlobalProverMessageBitmask())
e.pubsub.Unsubscribe(e.getGlobalAlertMessageBitmask(), false)
e.pubsub.UnregisterValidator(e.getGlobalAlertMessageBitmask())
e.pubsub.Unsubscribe(e.getGlobalPeerInfoMessageBitmask(), false)
e.pubsub.UnregisterValidator(e.getGlobalPeerInfoMessageBitmask())
e.pubsub.Unsubscribe(e.getDispatchMessageBitmask(), false)
e.pubsub.UnregisterValidator(e.getDispatchMessageBitmask())
@ -1045,7 +1031,16 @@ func (e *AppConsensusEngine) handleGlobalProverRoot(
)
e.globalProverRootSynced.Store(false)
e.globalProverRootVerifiedFrame.Store(0)
if frameNumber < e.globalSyncCooldownUntilFrame.Load() {
e.logger.Debug(
"global prover root error, skipping sync (cooldown active)",
zap.Uint64("frame_number", frameNumber),
zap.Uint64("cooldown_until", e.globalSyncCooldownUntilFrame.Load()),
)
return
}
e.performBlockingGlobalHypersync(frame.Header.Prover, expectedProverRoot)
e.globalSyncCooldownUntilFrame.Store(frameNumber + globalSyncCooldownFrames)
return
}
@ -1062,7 +1057,16 @@ func (e *AppConsensusEngine) handleGlobalProverRoot(
)
e.globalProverRootSynced.Store(false)
e.globalProverRootVerifiedFrame.Store(0)
if frameNumber < e.globalSyncCooldownUntilFrame.Load() {
e.logger.Debug(
"global prover root mismatch, skipping sync (cooldown active)",
zap.Uint64("frame_number", frameNumber),
zap.Uint64("cooldown_until", e.globalSyncCooldownUntilFrame.Load()),
)
return
}
e.performBlockingGlobalHypersync(frame.Header.Prover, expectedProverRoot)
e.globalSyncCooldownUntilFrame.Store(frameNumber + globalSyncCooldownFrames)
// Re-compute local root after sync to verify convergence, matching
// the global engine's post-sync verification pattern.
@ -1080,6 +1084,7 @@ func (e *AppConsensusEngine) handleGlobalProverRoot(
)
e.globalProverRootSynced.Store(true)
e.globalProverRootVerifiedFrame.Store(frameNumber)
e.globalSyncCooldownUntilFrame.Store(0)
if err := e.proverRegistry.Refresh(); err != nil {
e.logger.Warn("failed to refresh prover registry", zap.Error(err))
}
@ -1101,6 +1106,7 @@ func (e *AppConsensusEngine) handleGlobalProverRoot(
e.globalProverRootSynced.Store(true)
e.globalProverRootVerifiedFrame.Store(frameNumber)
e.globalSyncCooldownUntilFrame.Store(0)
if err := e.proverRegistry.Refresh(); err != nil {
e.logger.Warn("failed to refresh prover registry", zap.Error(err))
@ -1476,8 +1482,11 @@ func (e *AppConsensusEngine) materialize(
zap.Any("current_changeset_count", len(state.Changeset())),
)
if err := state.Commit(); err != nil {
return errors.Wrap(err, "materialize")
e.commitBarrier.Lock()
stateCommitErr := state.Commit()
e.commitBarrier.Unlock()
if stateCommitErr != nil {
return errors.Wrap(stateCommitErr, "materialize")
}
return nil
@ -1570,10 +1579,6 @@ func (e *AppConsensusEngine) getConsensusMessageBitmask() []byte {
return slices.Concat([]byte{0}, e.appFilter)
}
func (e *AppConsensusEngine) getGlobalProverMessageBitmask() []byte {
return global.GLOBAL_PROVER_BITMASK
}
func (e *AppConsensusEngine) getFrameMessageBitmask() []byte {
return e.appFilter
}
@ -1582,18 +1587,6 @@ func (e *AppConsensusEngine) getProverMessageBitmask() []byte {
return slices.Concat([]byte{0, 0, 0}, e.appFilter)
}
func (e *AppConsensusEngine) getGlobalFrameMessageBitmask() []byte {
return global.GLOBAL_FRAME_BITMASK
}
func (e *AppConsensusEngine) getGlobalAlertMessageBitmask() []byte {
return global.GLOBAL_ALERT_BITMASK
}
func (e *AppConsensusEngine) getGlobalPeerInfoMessageBitmask() []byte {
return global.GLOBAL_PEER_INFO_BITMASK
}
func (e *AppConsensusEngine) getDispatchMessageBitmask() []byte {
return slices.Concat([]byte{0, 0}, e.appFilter)
}
@ -2166,10 +2159,12 @@ func (e *AppConsensusEngine) internalProveFrame(
return nil, errors.New("no proving key available")
}
e.commitBarrier.Lock()
stateRoots, err := e.hypergraph.CommitShard(
previousFrame.Header.FrameNumber+1,
e.appAddress,
)
e.commitBarrier.Unlock()
if err != nil {
return nil, err
}

View File

@ -1137,12 +1137,6 @@ func (e *AppConsensusEngine) addCertifiedState(
return
}
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
aggregateSig := &protobufs.BLS48581AggregateSignature{
Signature: qc.GetAggregatedSignature().GetSignature(),
PublicKey: &protobufs.BLS48581G2PublicKey{
@ -1150,30 +1144,10 @@ func (e *AppConsensusEngine) addCertifiedState(
},
Bitmask: qc.GetAggregatedSignature().GetBitmask(),
}
if err := e.clockStore.PutQuorumCertificate(
&protobufs.QuorumCertificate{
Filter: e.appAddress,
Rank: qc.GetRank(),
FrameNumber: qc.GetFrameNumber(),
Selector: []byte(qc.Identity()),
AggregateSignature: aggregateSig,
},
txn,
); err != nil {
e.logger.Error("could not insert quorum certificate", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
parent.State.Header.PublicKeySignatureBls48581 = aggregateSig
txn, err = e.clockStore.NewTransaction(false)
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
@ -1209,6 +1183,21 @@ func (e *AppConsensusEngine) addCertifiedState(
return
}
if err := e.clockStore.PutQuorumCertificate(
&protobufs.QuorumCertificate{
Filter: e.appAddress,
Rank: qc.GetRank(),
FrameNumber: qc.GetFrameNumber(),
Selector: []byte(qc.Identity()),
AggregateSignature: aggregateSig,
},
txn,
); err != nil {
e.logger.Error("could not insert quorum certificate", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()

View File

@ -1,9 +1,18 @@
package app
import (
"bytes"
"context"
"io"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/global"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/rpm"
"source.quilibrium.com/quilibrium/monorepo/types/p2p"
)
@ -50,29 +59,6 @@ func (e *AppConsensusEngine) subscribeToConsensusMessages() error {
return nil
}
func (e *AppConsensusEngine) subscribeToGlobalProverMessages() error {
if err := e.pubsub.Subscribe(
e.getGlobalProverMessageBitmask(),
func(message *pb.Message) error {
return nil
},
); err != nil {
return errors.Wrap(err, "subscribe to consensus messages")
}
// Register consensus message validator
if err := e.pubsub.RegisterValidator(
e.getGlobalProverMessageBitmask(),
func(peerID peer.ID, message *pb.Message) p2p.ValidationResult {
return e.validateGlobalProverMessage(peerID, message)
},
true,
); err != nil {
return errors.Wrap(err, "subscribe to consensus messages")
}
return nil
}
func (e *AppConsensusEngine) subscribeToProverMessages() error {
if err := e.pubsub.Subscribe(
@ -147,105 +133,6 @@ func (e *AppConsensusEngine) subscribeToFrameMessages() error {
return nil
}
func (e *AppConsensusEngine) subscribeToGlobalFrameMessages() error {
if err := e.pubsub.Subscribe(
e.getGlobalFrameMessageBitmask(),
func(message *pb.Message) error {
select {
case <-e.haltCtx.Done():
return nil
case e.globalFrameMessageQueue <- message:
return nil
case <-e.ShutdownSignal():
return errors.New("context cancelled")
default:
e.logger.Warn("global message queue full, dropping message")
return nil
}
},
); err != nil {
return errors.Wrap(err, "subscribe to global frame messages")
}
// Register frame validator
if err := e.pubsub.RegisterValidator(
e.getGlobalFrameMessageBitmask(),
func(peerID peer.ID, message *pb.Message) p2p.ValidationResult {
return e.validateGlobalFrameMessage(peerID, message)
},
true,
); err != nil {
return errors.Wrap(err, "subscribe to global frame messages")
}
return nil
}
func (e *AppConsensusEngine) subscribeToGlobalAlertMessages() error {
if err := e.pubsub.Subscribe(
e.getGlobalAlertMessageBitmask(),
func(message *pb.Message) error {
select {
case e.globalAlertMessageQueue <- message:
return nil
case <-e.ShutdownSignal():
return errors.New("context cancelled")
default:
e.logger.Warn("global alert queue full, dropping message")
return nil
}
},
); err != nil {
return errors.Wrap(err, "subscribe to global alert messages")
}
// Register alert validator
if err := e.pubsub.RegisterValidator(
e.getGlobalAlertMessageBitmask(),
func(peerID peer.ID, message *pb.Message) p2p.ValidationResult {
return e.validateAlertMessage(peerID, message)
},
true,
); err != nil {
return errors.Wrap(err, "subscribe to global alert messages")
}
return nil
}
func (e *AppConsensusEngine) subscribeToPeerInfoMessages() error {
if err := e.pubsub.Subscribe(
e.getGlobalPeerInfoMessageBitmask(),
func(message *pb.Message) error {
select {
case <-e.haltCtx.Done():
return nil
case e.globalPeerInfoMessageQueue <- message:
return nil
case <-e.ShutdownSignal():
return errors.New("context cancelled")
default:
e.logger.Warn("peer info message queue full, dropping message")
return nil
}
},
); err != nil {
return errors.Wrap(err, "subscribe to peer info messages")
}
// Register frame validator
if err := e.pubsub.RegisterValidator(
e.getGlobalPeerInfoMessageBitmask(),
func(peerID peer.ID, message *pb.Message) p2p.ValidationResult {
return e.validatePeerInfoMessage(peerID, message)
},
true,
); err != nil {
return errors.Wrap(err, "subscribe to peer info messages")
}
return nil
}
func (e *AppConsensusEngine) subscribeToDispatchMessages() error {
if err := e.pubsub.Subscribe(
@ -278,3 +165,101 @@ func (e *AppConsensusEngine) subscribeToDispatchMessages() error {
return nil
}
func (e *AppConsensusEngine) streamGlobalMessagesFromMaster(
ctx lifecycle.SignalerContext,
) {
for {
select {
case <-ctx.Done():
return
default:
}
if err := e.ensureGlobalClient(); err != nil {
e.logger.Warn("global message stream: failed to connect to master",
zap.Error(err),
)
select {
case <-ctx.Done():
return
case <-time.After(5 * time.Second):
}
continue
}
stream, err := e.globalClient.StreamGlobalMessages(
ctx,
&protobufs.StreamGlobalMessagesRequest{},
)
if err != nil {
e.logger.Warn("global message stream: failed to open stream",
zap.Error(err),
)
select {
case <-ctx.Done():
return
case <-time.After(5 * time.Second):
}
continue
}
e.logger.Info("connected to master global message stream")
e.receiveGlobalMessages(ctx, stream)
e.logger.Warn("global message stream disconnected, reconnecting")
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
}
}
}
func (e *AppConsensusEngine) receiveGlobalMessages(
ctx lifecycle.SignalerContext,
stream protobufs.GlobalService_StreamGlobalMessagesClient,
) {
for {
select {
case <-ctx.Done():
return
default:
}
msg, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
return
}
e.logger.Warn("global message stream: recv error",
zap.Error(err),
)
return
}
pbMsg := &pb.Message{
Data: msg.Data,
Bitmask: msg.Bitmask,
}
switch {
case bytes.Equal(msg.Bitmask, global.GLOBAL_FRAME_BITMASK):
select {
case e.globalFrameMessageQueue <- pbMsg:
default:
}
case bytes.Equal(msg.Bitmask, global.GLOBAL_ALERT_BITMASK):
select {
case e.globalAlertMessageQueue <- pbMsg:
default:
}
case bytes.Equal(msg.Bitmask, global.GLOBAL_PEER_INFO_BITMASK):
select {
case e.globalPeerInfoMessageQueue <- pbMsg:
default:
}
// GLOBAL_PROVER_BITMASK: intentionally not dispatched (workers discard)
}
}
}

View File

@ -674,9 +674,24 @@ func (e *GlobalConsensusEngine) evaluateForProposals(
}
if len(pendingFilters) != 0 {
// Build a descriptor list that excludes self's active allocations.
// DecideJoins computes bestScore from this list — if we include
// active allocations, their high scores cause perpetual rejection
// of pending joins (the proposer compares against shards it can't
// actually switch to, creating an infinite propose-reject loop).
pendingSet := make(map[string]struct{}, len(pendingFilters))
for _, pf := range pendingFilters {
pendingSet[string(pf)] = struct{}{}
}
decideCandidates := slices.Clone(proposalDescriptors)
for _, d := range decideDescriptors {
if _, isPending := pendingSet[string(d.Filter)]; isPending {
decideCandidates = append(decideCandidates, d)
}
}
if err := e.proposer.DecideJoins(
uint64(data.Frame.Header.Difficulty),
decideDescriptors,
decideCandidates,
pendingFilters,
worldBytes,
); err != nil {
@ -688,6 +703,53 @@ func (e *GlobalConsensusEngine) evaluateForProposals(
)
}
}
// Leave rebalancing: propose leaves for overcrowded shards
if len(snapshot.leaveProposalCandidates) > 0 && canPropose && !joinProposedThisCycle {
leaveFilters, err := e.proposer.PlanLeaves(
uint64(data.Frame.Header.Difficulty),
snapshot.leaveProposalCandidates,
proposalDescriptors,
worldBytes,
)
if err != nil {
e.logger.Error("could not plan leaves", zap.Error(err))
} else if len(leaveFilters) > 0 {
e.lastJoinAttemptFrame.Store(data.Frame.Header.FrameNumber)
e.logger.Info(
"proposed leaves",
zap.Int("leave_proposals", len(leaveFilters)),
)
}
}
// Decide pending leaves in the 360-720 frame window
if len(snapshot.pendingLeaveFilters) > 0 {
// Build decideCandidates for leaves: unallocated shards + leaving shard descriptors
pendingLeaveSet := make(map[string]struct{}, len(snapshot.pendingLeaveFilters))
for _, pf := range snapshot.pendingLeaveFilters {
pendingLeaveSet[string(pf)] = struct{}{}
}
leaveDecideCandidates := slices.Clone(proposalDescriptors)
for _, d := range decideDescriptors {
if _, isLeaving := pendingLeaveSet[string(d.Filter)]; isLeaving {
leaveDecideCandidates = append(leaveDecideCandidates, d)
}
}
if err := e.proposer.DecideLeaves(
uint64(data.Frame.Header.Difficulty),
leaveDecideCandidates,
snapshot.pendingLeaveFilters,
worldBytes,
); err != nil {
e.logger.Error("could not decide leaves", zap.Error(err))
} else {
e.logger.Info(
"decided on leaves",
zap.Int("leaves", len(snapshot.pendingLeaveFilters)),
)
}
}
}
type allocationSnapshot struct {
@ -702,6 +764,10 @@ type allocationSnapshot struct {
proposalDescriptors []provers.ShardDescriptor
decideDescriptors []provers.ShardDescriptor
worldBytes *big.Int
// Leave rebalancing fields
leaveProposalCandidates []provers.ShardDescriptor // Active allocations eligible for leave
pendingLeaveFilters [][]byte // Leaving allocations in 360-720 window
}
func (s *allocationSnapshot) statusFields() []zap.Field {
@ -1042,6 +1108,8 @@ func (e *GlobalConsensusEngine) collectAllocationSnapshot(
pendingFilters := [][]byte{}
proposalDescriptors := []provers.ShardDescriptor{}
decideDescriptors := []provers.ShardDescriptor{}
leaveProposalCandidates := []provers.ShardDescriptor{}
pendingLeaveFilters := [][]byte{}
for _, shardInfo := range shards {
shardKey := slices.Concat(shardInfo.L1, shardInfo.L2)
@ -1080,6 +1148,8 @@ func (e *GlobalConsensusEngine) collectAllocationSnapshot(
allocated := false
pending := false
isActiveAllocation := false
isPendingLeave := false
if self != nil {
for _, allocation := range self.Allocations {
if bytes.Equal(allocation.ConfirmationFilter, bp) {
@ -1104,9 +1174,15 @@ func (e *GlobalConsensusEngine) collectAllocationSnapshot(
}
if allocation.Status == typesconsensus.ProverStatusActive {
shardsActive++
isActiveAllocation = true
}
if allocation.Status == typesconsensus.ProverStatusLeaving {
shardsLeaving++
// Check if in the 360-720 decision window
if allocation.LeaveFrameNumber+360 <= data.Frame.Header.FrameNumber &&
data.Frame.Header.FrameNumber <= allocation.LeaveFrameNumber+pendingFilterGraceFrames {
isPendingLeave = true
}
}
if allocation.Status == typesconsensus.ProverStatusPaused {
shardsPaused++
@ -1159,6 +1235,20 @@ func (e *GlobalConsensusEngine) collectAllocationSnapshot(
},
)
}
if isActiveAllocation {
leaveProposalCandidates = append(
leaveProposalCandidates,
provers.ShardDescriptor{
Filter: bp,
Size: size.Uint64(),
Ring: uint8(len(above) / 8),
Shards: shard.DataShards,
},
)
}
if isPendingLeave {
pendingLeaveFilters = append(pendingLeaveFilters, bp)
}
decideDescriptors = append(
decideDescriptors,
provers.ShardDescriptor{
@ -1177,17 +1267,19 @@ func (e *GlobalConsensusEngine) collectAllocationSnapshot(
}
return &allocationSnapshot{
shardsPending: shardsPending,
awaitingFrames: awaitingFrames,
shardsLeaving: shardsLeaving,
shardsActive: shardsActive,
shardsPaused: shardsPaused,
shardDivisions: shardDivisions,
logicalShards: logicalShards,
pendingFilters: pendingFilters,
proposalDescriptors: proposalDescriptors,
decideDescriptors: decideDescriptors,
worldBytes: worldBytes,
shardsPending: shardsPending,
awaitingFrames: awaitingFrames,
shardsLeaving: shardsLeaving,
shardsActive: shardsActive,
shardsPaused: shardsPaused,
shardDivisions: shardDivisions,
logicalShards: logicalShards,
pendingFilters: pendingFilters,
proposalDescriptors: proposalDescriptors,
decideDescriptors: decideDescriptors,
worldBytes: worldBytes,
leaveProposalCandidates: leaveProposalCandidates,
pendingLeaveFilters: pendingLeaveFilters,
}, true
}

View File

@ -66,6 +66,12 @@ func (m *mockWorkerManager) ProposeAllocations(coreIds []uint, filters [][]byte)
func (m *mockWorkerManager) DecideAllocations(reject [][]byte, confirm [][]byte) error {
return nil
}
func (m *mockWorkerManager) ProposeLeave(filters [][]byte) error {
return nil
}
func (m *mockWorkerManager) DecideLeave(reject [][]byte, confirm [][]byte) error {
return nil
}
func (m *mockWorkerManager) RespawnWorker(coreId uint, filter []byte) error {
return nil
}

View File

@ -109,6 +109,42 @@ func contextFromShutdownSignal(sig <-chan struct{}) context.Context {
return ctx
}
func (e *GlobalConsensusEngine) addGlobalMessageSubscriber(
ch chan *protobufs.StreamGlobalMessagesResponse,
) {
e.globalMessageSubscribersMu.Lock()
e.globalMessageSubscribers[ch] = struct{}{}
e.globalMessageSubscribersMu.Unlock()
}
func (e *GlobalConsensusEngine) removeGlobalMessageSubscriber(
ch chan *protobufs.StreamGlobalMessagesResponse,
) {
e.globalMessageSubscribersMu.Lock()
delete(e.globalMessageSubscribers, ch)
e.globalMessageSubscribersMu.Unlock()
}
func (e *GlobalConsensusEngine) broadcastGlobalMessage(
data []byte,
bitmask []byte,
) {
msg := &protobufs.StreamGlobalMessagesResponse{
Data: data,
Bitmask: bitmask,
}
e.globalMessageSubscribersMu.RLock()
defer e.globalMessageSubscribersMu.RUnlock()
for ch := range e.globalMessageSubscribers {
select {
case ch <- msg:
default:
}
}
}
// GlobalConsensusEngine uses the generic state machine for consensus
type GlobalConsensusEngine struct {
*lifecycle.ComponentManager
@ -248,6 +284,7 @@ type GlobalConsensusEngine struct {
shardCommitmentTrees []*tries.VectorCommitmentTree
shardCommitmentKeySets []map[string]struct{}
shardCommitmentMu sync.Mutex
commitBarrier sync.Mutex
// Authentication provider
authProvider channel.AuthenticationProvider
@ -262,6 +299,10 @@ type GlobalConsensusEngine struct {
// gRPC server for services
grpcServer *grpc.Server
grpcListener net.Listener
// Global message streaming to workers
globalMessageSubscribersMu sync.RWMutex
globalMessageSubscribers map[chan *protobufs.StreamGlobalMessagesResponse]struct{}
}
// NewGlobalConsensusEngine creates a new global consensus engine using the
@ -349,6 +390,7 @@ func NewGlobalConsensusEngine(
txLockMap: make(map[uint64]map[string]map[string]*LockedTransaction),
appShardCache: make(map[string]*appShardCacheEntry),
globalMessageSpillover: make(map[uint64][][]byte),
globalMessageSubscribers: make(map[chan *protobufs.StreamGlobalMessagesResponse]struct{}),
}
engine.frameChainChecker = NewFrameChainChecker(clockStore, logger)
@ -435,6 +477,8 @@ func NewGlobalConsensusEngine(
config,
engine.ProposeWorkerJoin,
engine.DecideWorkerJoins,
engine.ProposeWorkerLeave,
engine.DecideWorkerLeaves,
)
if !config.Engine.ArchiveMode {
strategy := provers.RewardGreedy
@ -1662,7 +1706,9 @@ func (e *GlobalConsensusEngine) materialize(
var appliedCount atomic.Int64
var skippedCount atomic.Int64
e.commitBarrier.Lock()
_, err := e.hypergraph.Commit(frameNumber)
e.commitBarrier.Unlock()
if err != nil {
e.logger.Error("error committing hypergraph", zap.Error(err))
return errors.Wrap(err, "materialize")
@ -1836,8 +1882,11 @@ func (e *GlobalConsensusEngine) materialize(
return err
}
if err := state.Commit(); err != nil {
return errors.Wrap(err, "materialize")
e.commitBarrier.Lock()
stateCommitErr := state.Commit()
e.commitBarrier.Unlock()
if stateCommitErr != nil {
return errors.Wrap(stateCommitErr, "materialize")
}
// Persist any alt shard updates from this frame
@ -3806,6 +3855,167 @@ func (e *GlobalConsensusEngine) DecideWorkerJoins(
return nil
}
func (e *GlobalConsensusEngine) ProposeWorkerLeave(
filters [][]byte,
) error {
frame := e.GetFrame()
if frame == nil {
e.logger.Debug("cannot propose leave, no frame")
return errors.New("not ready")
}
_, err := e.keyManager.GetSigningKey("q-prover-key")
if err != nil {
e.logger.Debug("cannot propose leave, no signer key")
return errors.Wrap(err, "propose worker leave")
}
leave, err := global.NewProverLeave(
filters,
frame.Header.FrameNumber,
e.keyManager,
e.hypergraph,
schema.NewRDFMultiprover(&schema.TurtleRDFParser{}, e.inclusionProver),
)
if err != nil {
e.logger.Error("could not construct leave", zap.Error(err))
return errors.Wrap(err, "propose worker leave")
}
err = leave.Prove(frame.Header.FrameNumber)
if err != nil {
e.logger.Error("could not prove leave", zap.Error(err))
return errors.Wrap(err, "propose worker leave")
}
bundle := &protobufs.MessageBundle{
Requests: []*protobufs.MessageRequest{
{
Request: &protobufs.MessageRequest_Leave{
Leave: leave.ToProtobuf(),
},
},
},
Timestamp: time.Now().UnixMilli(),
}
msg, err := bundle.ToCanonicalBytes()
if err != nil {
e.logger.Error("could not serialize leave", zap.Error(err))
return errors.Wrap(err, "propose worker leave")
}
err = e.pubsub.PublishToBitmask(
GLOBAL_PROVER_BITMASK,
msg,
)
if err != nil {
e.logger.Error("could not publish leave", zap.Error(err))
return errors.Wrap(err, "propose worker leave")
}
e.logger.Info(
"submitted leave request",
zap.Int("filters", len(filters)),
)
return nil
}
func (e *GlobalConsensusEngine) DecideWorkerLeaves(
reject [][]byte,
confirm [][]byte,
) error {
frame := e.GetFrame()
if frame == nil {
e.logger.Debug("cannot decide leaves, no frame")
return errors.New("not ready")
}
_, err := e.keyManager.GetSigningKey("q-prover-key")
if err != nil {
e.logger.Debug("cannot decide leaves, no signer key")
return errors.Wrap(err, "decide worker leaves")
}
bundle := &protobufs.MessageBundle{
Requests: []*protobufs.MessageRequest{},
}
if len(reject) != 0 {
rejectMessage, err := global.NewProverReject(
reject,
frame.Header.FrameNumber,
e.keyManager,
e.hypergraph,
schema.NewRDFMultiprover(&schema.TurtleRDFParser{}, e.inclusionProver),
)
if err != nil {
e.logger.Error("could not construct leave reject", zap.Error(err))
return errors.Wrap(err, "decide worker leaves")
}
err = rejectMessage.Prove(frame.Header.FrameNumber)
if err != nil {
e.logger.Error("could not prove leave reject", zap.Error(err))
return errors.Wrap(err, "decide worker leaves")
}
bundle.Requests = append(bundle.Requests, &protobufs.MessageRequest{
Request: &protobufs.MessageRequest_Reject{
Reject: rejectMessage.ToProtobuf(),
},
})
}
if len(confirm) != 0 {
confirmMessage, err := global.NewProverConfirm(
confirm,
frame.Header.FrameNumber,
e.keyManager,
e.hypergraph,
schema.NewRDFMultiprover(&schema.TurtleRDFParser{}, e.inclusionProver),
)
if err != nil {
e.logger.Error("could not construct leave confirm", zap.Error(err))
return errors.Wrap(err, "decide worker leaves")
}
err = confirmMessage.Prove(frame.Header.FrameNumber)
if err != nil {
e.logger.Error("could not prove leave confirm", zap.Error(err))
return errors.Wrap(err, "decide worker leaves")
}
bundle.Requests = append(bundle.Requests, &protobufs.MessageRequest{
Request: &protobufs.MessageRequest_Confirm{
Confirm: confirmMessage.ToProtobuf(),
},
})
}
bundle.Timestamp = time.Now().UnixMilli()
msg, err := bundle.ToCanonicalBytes()
if err != nil {
e.logger.Error("could not serialize leave decisions", zap.Error(err))
return errors.Wrap(err, "decide worker leaves")
}
err = e.pubsub.PublishToBitmask(
GLOBAL_PROVER_BITMASK,
msg,
)
if err != nil {
e.logger.Error("could not publish leave decisions", zap.Error(err))
return errors.Wrap(err, "decide worker leaves")
}
e.logger.Debug("submitted leave decisions")
return nil
}
func (e *GlobalConsensusEngine) startConsensus(
trustedRoot *models.CertifiedState[*protobufs.GlobalFrame],
pending []*models.SignedProposal[
@ -4299,7 +4509,9 @@ func (e *GlobalConsensusEngine) rebuildShardCommitments(
frameNumber uint64,
rank uint64,
) ([]byte, error) {
e.commitBarrier.Lock()
commitSet, err := e.hypergraph.Commit(frameNumber)
e.commitBarrier.Unlock()
if err != nil {
e.logger.Error("could not commit", zap.Error(err))
return nil, errors.Wrap(err, "rebuild shard commitments")

View File

@ -1649,12 +1649,6 @@ func (e *GlobalConsensusEngine) addCertifiedState(
return
}
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
qc := child.ParentQuorumCertificate
if qc == nil {
e.logger.Error(
@ -1670,29 +1664,10 @@ func (e *GlobalConsensusEngine) addCertifiedState(
},
Bitmask: qc.GetAggregatedSignature().GetBitmask(),
}
if err := e.clockStore.PutQuorumCertificate(
&protobufs.QuorumCertificate{
Rank: qc.GetRank(),
FrameNumber: qc.GetFrameNumber(),
Selector: []byte(qc.Identity()),
AggregateSignature: aggregateSig,
},
txn,
); err != nil {
e.logger.Error("could not insert quorum certificate", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
parent.State.Header.PublicKeySignatureBls48581 = aggregateSig
txn, err = e.clockStore.NewTransaction(false)
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
@ -1718,6 +1693,20 @@ func (e *GlobalConsensusEngine) addCertifiedState(
return
}
if err := e.clockStore.PutQuorumCertificate(
&protobufs.QuorumCertificate{
Rank: qc.GetRank(),
FrameNumber: qc.GetFrameNumber(),
Selector: []byte(qc.Identity()),
AggregateSignature: aggregateSig,
},
txn,
); err != nil {
e.logger.Error("could not insert quorum certificate", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()

View File

@ -132,6 +132,8 @@ func (e *GlobalConsensusEngine) subscribeToFrameMessages() error {
if err := e.pubsub.Subscribe(
GLOBAL_FRAME_BITMASK,
func(message *pb.Message) error {
e.broadcastGlobalMessage(message.Data, GLOBAL_FRAME_BITMASK)
// Don't subscribe if running in consensus, the time reel shouldn't have
// the frame ahead of time
if e.config.P2P.Network == 99 || e.config.Engine.ArchiveMode {
@ -171,6 +173,8 @@ func (e *GlobalConsensusEngine) subscribeToProverMessages() error {
if err := e.pubsub.Subscribe(
GLOBAL_PROVER_BITMASK,
func(message *pb.Message) error {
e.broadcastGlobalMessage(message.Data, GLOBAL_PROVER_BITMASK)
if e.config.P2P.Network != 99 && !e.config.Engine.ArchiveMode {
return nil
}
@ -210,6 +214,8 @@ func (e *GlobalConsensusEngine) subscribeToPeerInfoMessages() error {
if err := e.pubsub.Subscribe(
GLOBAL_PEER_INFO_BITMASK,
func(message *pb.Message) error {
e.broadcastGlobalMessage(message.Data, GLOBAL_PEER_INFO_BITMASK)
select {
case <-e.haltCtx.Done():
return nil
@ -244,6 +250,8 @@ func (e *GlobalConsensusEngine) subscribeToAlertMessages() error {
if err := e.pubsub.Subscribe(
GLOBAL_ALERT_BITMASK,
func(message *pb.Message) error {
e.broadcastGlobalMessage(message.Data, GLOBAL_ALERT_BITMASK)
select {
case e.globalAlertMessageQueue <- message:
return nil

View File

@ -411,6 +411,40 @@ func (e *GlobalConsensusEngine) GetWorkerInfo(
return resp, nil
}
func (e *GlobalConsensusEngine) StreamGlobalMessages(
req *protobufs.StreamGlobalMessagesRequest,
stream protobufs.GlobalService_StreamGlobalMessagesServer,
) error {
peerID, ok := qgrpc.PeerIDFromContext(stream.Context())
if !ok {
return status.Error(codes.Internal, "remote peer ID not found")
}
if !bytes.Equal(e.pubsub.GetPeerID(), []byte(peerID)) {
return status.Error(codes.PermissionDenied, "only local workers may stream global messages")
}
ch := make(chan *protobufs.StreamGlobalMessagesResponse, 256)
e.addGlobalMessageSubscriber(ch)
defer e.removeGlobalMessageSubscriber(ch)
e.logger.Info("worker connected to global message stream",
zap.String("peer_id", peerID.String()),
)
ctx := stream.Context()
for {
select {
case <-ctx.Done():
return ctx.Err()
case msg := <-ch:
if err := stream.Send(msg); err != nil {
return err
}
}
}
}
func (e *GlobalConsensusEngine) RegisterServices(server *grpc.Server) {
protobufs.RegisterGlobalServiceServer(server, e)
protobufs.RegisterDispatchServiceServer(server, e.dispatchService)

View File

@ -533,3 +533,212 @@ func (m *Manager) unallocatedWorkerCount() (int, error) {
}
return count, nil
}
// PlanLeaves identifies Active allocations that are on overcrowded shards
// (high Ring) when significantly better unallocated shards exist. Returns up
// to 3 filters to propose leaving.
func (m *Manager) PlanLeaves(
difficulty uint64,
allocatedShards []ShardDescriptor,
unallocatedShards []ShardDescriptor,
worldBytes *big.Int,
) ([][]byte, error) {
if len(allocatedShards) == 0 || len(unallocatedShards) == 0 {
return nil, nil
}
if worldBytes.Cmp(big.NewInt(0)) == 0 {
return nil, nil
}
basis := reward.PomwBasis(difficulty, worldBytes.Uint64(), m.Units)
unallocatedScores, err := m.scoreShards(unallocatedShards, basis, worldBytes)
if err != nil {
return nil, errors.Wrap(err, "plan leaves")
}
// Find best unallocated score.
var bestUnallocatedScore *big.Int
for _, sc := range unallocatedScores {
if bestUnallocatedScore == nil || sc.score.Cmp(bestUnallocatedScore) > 0 {
bestUnallocatedScore = sc.score
}
}
if bestUnallocatedScore == nil || bestUnallocatedScore.Sign() == 0 {
return nil, nil
}
allocatedScores, err := m.scoreShards(allocatedShards, basis, worldBytes)
if err != nil {
return nil, errors.Wrap(err, "plan leaves")
}
// Leave threshold: allocated shard must score below 67% of best unallocated
// (i.e., the alternative is ~50% better).
leaveThreshold := new(big.Int).Mul(bestUnallocatedScore, big.NewInt(67))
leaveThreshold.Div(leaveThreshold, big.NewInt(100))
// Collect leave candidates.
type candidate struct {
filter []byte
score *big.Int
}
candidates := make([]candidate, 0)
for _, sc := range allocatedScores {
if sc.score.Cmp(leaveThreshold) < 0 {
candidates = append(candidates, candidate{
filter: allocatedShards[sc.idx].Filter,
score: sc.score,
})
}
}
if len(candidates) == 0 {
return nil, nil
}
// Sort by score ascending (worst shards first).
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].score.Cmp(candidates[j].score) < 0
})
// Return up to 3 filters.
limit := 3
if limit > len(candidates) {
limit = len(candidates)
}
filters := make([][]byte, 0, limit)
for i := 0; i < limit; i++ {
fc := make([]byte, len(candidates[i].filter))
copy(fc, candidates[i].filter)
filters = append(filters, fc)
}
err = m.workerMgr.ProposeLeave(filters)
if err != nil {
return nil, errors.Wrap(err, "plan leaves")
}
return filters, nil
}
// DecideLeaves confirms or rejects pending leaves after 360 frames. For each
// pending leave filter:
// - If the shard's score is competitive (>= 67% of best), reject the leave
// (the shard improved, stay on it).
// - If the shard's score is still poor (< 67% of best), confirm the leave
// (better alternatives still exist).
// - If the filter is not found in shards, confirm (shard disappeared).
func (m *Manager) DecideLeaves(
difficulty uint64,
shards []ShardDescriptor,
pendingLeaves [][]byte,
worldBytes *big.Int,
) error {
if len(pendingLeaves) == 0 {
return nil
}
if len(shards) == 0 {
// No shards to score — confirm all leaves.
confirm := make([][]byte, 0, len(pendingLeaves))
for _, p := range pendingLeaves {
if len(p) == 0 {
continue
}
if len(confirm) > 99 {
break
}
pc := make([]byte, len(p))
copy(pc, p)
confirm = append(confirm, pc)
}
return m.workerMgr.DecideLeave(nil, confirm)
}
basis := reward.PomwBasis(difficulty, worldBytes.Uint64(), m.Units)
scores, err := m.scoreShards(shards, basis, worldBytes)
if err != nil {
return errors.Wrap(err, "decide leaves")
}
type srec struct {
desc ShardDescriptor
score *big.Int
}
byHex := make(map[string]srec, len(shards))
var bestScore *big.Int
for _, sc := range scores {
s := shards[sc.idx]
key := hex.EncodeToString(s.Filter)
byHex[key] = srec{desc: s, score: sc.score}
if bestScore == nil || sc.score.Cmp(bestScore) > 0 {
bestScore = sc.score
}
}
if bestScore == nil {
// Nothing scored — confirm all leaves.
confirm := make([][]byte, 0, len(pendingLeaves))
for _, p := range pendingLeaves {
if len(p) == 0 {
continue
}
if len(confirm) > 99 {
break
}
pc := make([]byte, len(p))
copy(pc, p)
confirm = append(confirm, pc)
}
return m.workerMgr.DecideLeave(nil, confirm)
}
// Reject threshold: if the leaving shard's score >= 67% of best, reject
// the leave (the shard has improved enough to stay).
rejectThreshold := new(big.Int).Mul(bestScore, big.NewInt(67))
rejectThreshold.Div(rejectThreshold, big.NewInt(100))
reject := make([][]byte, 0, len(pendingLeaves))
confirm := make([][]byte, 0, len(pendingLeaves))
for _, p := range pendingLeaves {
if len(p) == 0 {
continue
}
if len(reject) > 99 {
break
}
if len(confirm) > 99 {
break
}
key := hex.EncodeToString(p)
rec, ok := byHex[key]
if !ok {
// Shard disappeared — confirm the leave.
pc := make([]byte, len(p))
copy(pc, p)
confirm = append(confirm, pc)
continue
}
if rec.score.Cmp(rejectThreshold) >= 0 {
// Shard is now competitive — reject the leave (stay).
pc := make([]byte, len(p))
copy(pc, p)
reject = append(reject, pc)
} else {
// Still a bad shard — confirm the leave.
pc := make([]byte, len(p))
copy(pc, p)
confirm = append(confirm, pc)
}
}
return m.workerMgr.DecideLeave(reject, confirm)
}
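The 67% thresholds in PlanLeaves and DecideLeaves above encode a roughly 50% improvement requirement: a shard scoring below 0.67 × best implies best/score > 1/0.67 ≈ 1.49. A small illustrative check of the same big.Int arithmetic, with made-up scores:

package main

import (
	"fmt"
	"math/big"
)

func main() {
	best := big.NewInt(100_000) // best-scoring alternative shard

	// threshold = best * 67 / 100, as in PlanLeaves/DecideLeaves.
	threshold := new(big.Int).Mul(best, big.NewInt(67))
	threshold.Div(threshold, big.NewInt(100)) // 67_000

	for _, score := range []*big.Int{big.NewInt(60_000), big.NewInt(70_000)} {
		if score.Cmp(threshold) < 0 {
			fmt.Println(score, "-> leave: the best shard is ~50%+ better")
		} else {
			fmt.Println(score, "-> stay: still within 67% of the best")
		}
	}
}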

View File

@ -19,6 +19,11 @@ type mockWorkerManager struct {
lastFiltersHex []string
rejected [][]byte
confirmed [][]byte
// Leave tracking
leaveProposed [][]byte
leaveRejected [][]byte
leaveConfirmed [][]byte
}
// CheckWorkersConnected implements worker.WorkerManager.
@ -32,6 +37,17 @@ func (m *mockWorkerManager) DecideAllocations(reject [][]byte, confirm [][]byte)
return nil
}
func (m *mockWorkerManager) ProposeLeave(filters [][]byte) error {
m.leaveProposed = filters
return nil
}
func (m *mockWorkerManager) DecideLeave(reject [][]byte, confirm [][]byte) error {
m.leaveRejected = reject
m.leaveConfirmed = confirm
return nil
}
func (m *mockWorkerManager) AllocateWorker(coreId uint, filter []byte) error {
panic("unimplemented")
}
@ -187,6 +203,12 @@ func TestPlanAndAllocate_EqualSizes_DeterministicWhenDataGreedy(t *testing.T) {
if wm.lastFiltersHex[0] != "01" {
t.Fatalf("expected deterministic lexicographic first (01) in DataGreedy, got %s", wm.lastFiltersHex[0])
}
// Reset worker filter to simulate completion
for _, worker := range wm.workers {
worker.Filter = nil
worker.PendingFilterFrame = 0
}
}
}
@ -214,7 +236,7 @@ func TestPlanAndAllocate_UnequalScores_PicksMax(t *testing.T) {
// Confirm when pending is best (RewardGreedy)
func TestDecideJoins_ConfirmWhenBest_RewardGreedy(t *testing.T) {
wm := &mockWorkerManager{}
wm := &mockWorkerManager{workers: createWorkers(1)}
m := newTestManager(t, RewardGreedy, wm)
shards := []ShardDescriptor{
{Filter: mustDecodeHex(t, "01"), Size: 50_000, Ring: 0, Shards: 1},
@ -253,8 +275,7 @@ func TestDecideJoins_RejectWhenBetterExists_RewardGreedy(t *testing.T) {
// Tie -> confirm (RewardGreedy)
func TestDecideJoins_TieConfirms_RewardGreedy(t *testing.T) {
wm := &mockWorkerManager{}
wm := &mockWorkerManager{workers: createWorkers(1)}
m := newTestManager(t, RewardGreedy, wm)
// Same size/ring/shards -> same score
shards := []ShardDescriptor{
@ -273,7 +294,7 @@ func TestDecideJoins_TieConfirms_RewardGreedy(t *testing.T) {
}
func TestDecideJoins_DataGreedy_SizeOnly(t *testing.T) {
wm := &mockWorkerManager{}
wm := &mockWorkerManager{workers: createWorkers(3)}
m := newTestManager(t, DataGreedy, wm)
shards := []ShardDescriptor{
{Filter: mustDecodeHex(t, "aa"), Size: 10_000, Ring: 3, Shards: 16}, // worse by size
@ -287,9 +308,11 @@ func TestDecideJoins_DataGreedy_SizeOnly(t *testing.T) {
t.Fatalf("DecideJoins error: %v", err)
}
rej := setOf(toHex(wm.rejected))
cfm := setOf(toHex(wm.confirmed))
if !(rej["aa"] && !rej["bb"] && !rej["cc"] && cfm["bb"] && cfm["cc"]) {
t.Fatalf("expected reject{aa} confirm{bb,cc}; got reject=%v confirm=%v", toHex(wm.rejected), toHex(wm.confirmed))
// When any rejections exist, DecideJoins sends only rejections (confirms
// wait for the next cycle). So we only check that aa was rejected and
// bb/cc were NOT rejected.
if !rej["aa"] || rej["bb"] || rej["cc"] {
t.Fatalf("expected reject{aa} only; got reject=%v confirm=%v", toHex(wm.rejected), toHex(wm.confirmed))
}
}
@ -335,3 +358,290 @@ func setOf(ss []string) map[string]bool {
}
return m
}
// --- PlanLeaves tests ---
// PlanLeaves should propose leaving a shard when a significantly better
// unallocated shard exists.
func TestPlanLeaves_LeavesWhenBetterExists(t *testing.T) {
wm := &mockWorkerManager{}
m := newTestManager(t, RewardGreedy, wm)
// Allocated shard: small, high ring → low score
allocated := []ShardDescriptor{
{Filter: mustDecodeHex(t, "aa"), Size: 50_000, Ring: 3, Shards: 1},
}
// Unallocated shard: large, ring 0 → high score
unallocated := []ShardDescriptor{
{Filter: mustDecodeHex(t, "bb"), Size: 200_000, Ring: 0, Shards: 1},
}
filters, err := m.PlanLeaves(100, allocated, unallocated, big.NewInt(250000))
if err != nil {
t.Fatalf("PlanLeaves error: %v", err)
}
if len(filters) != 1 {
t.Fatalf("expected 1 leave filter, got %d", len(filters))
}
if hex.EncodeToString(filters[0]) != "aa" {
t.Fatalf("expected leave filter aa, got %s", hex.EncodeToString(filters[0]))
}
if len(wm.leaveProposed) != 1 {
t.Fatalf("expected ProposeLeave called with 1 filter, got %d", len(wm.leaveProposed))
}
}
// PlanLeaves should not propose leaving when the allocated shard is competitive.
func TestPlanLeaves_StaysWhenCompetitive(t *testing.T) {
wm := &mockWorkerManager{}
m := newTestManager(t, RewardGreedy, wm)
// Allocated shard: same score as unallocated
allocated := []ShardDescriptor{
{Filter: mustDecodeHex(t, "aa"), Size: 100_000, Ring: 0, Shards: 1},
}
unallocated := []ShardDescriptor{
{Filter: mustDecodeHex(t, "bb"), Size: 100_000, Ring: 0, Shards: 1},
}
filters, err := m.PlanLeaves(100, allocated, unallocated, big.NewInt(200000))
if err != nil {
t.Fatalf("PlanLeaves error: %v", err)
}
if len(filters) != 0 {
t.Fatalf("expected no leave filters when competitive, got %d", len(filters))
}
}
// PlanLeaves should not propose leaving when no unallocated shards exist.
func TestPlanLeaves_NilWhenNoUnallocated(t *testing.T) {
wm := &mockWorkerManager{}
m := newTestManager(t, RewardGreedy, wm)
allocated := []ShardDescriptor{
{Filter: mustDecodeHex(t, "aa"), Size: 50_000, Ring: 3, Shards: 1},
}
filters, err := m.PlanLeaves(100, allocated, nil, big.NewInt(50000))
if err != nil {
t.Fatalf("PlanLeaves error: %v", err)
}
if filters != nil {
t.Fatalf("expected nil when no unallocated shards, got %v", toHex(filters))
}
}
// PlanLeaves caps at 3 leave proposals.
func TestPlanLeaves_CapsAt3(t *testing.T) {
wm := &mockWorkerManager{}
m := newTestManager(t, RewardGreedy, wm)
// 5 bad allocated shards (high ring, low score)
allocated := []ShardDescriptor{
{Filter: mustDecodeHex(t, "a1"), Size: 50_000, Ring: 4, Shards: 1},
{Filter: mustDecodeHex(t, "a2"), Size: 50_000, Ring: 4, Shards: 1},
{Filter: mustDecodeHex(t, "a3"), Size: 50_000, Ring: 4, Shards: 1},
{Filter: mustDecodeHex(t, "a4"), Size: 50_000, Ring: 4, Shards: 1},
{Filter: mustDecodeHex(t, "a5"), Size: 50_000, Ring: 4, Shards: 1},
}
// 1 great unallocated shard
unallocated := []ShardDescriptor{
{Filter: mustDecodeHex(t, "bb"), Size: 200_000, Ring: 0, Shards: 1},
}
filters, err := m.PlanLeaves(100, allocated, unallocated, big.NewInt(450000))
if err != nil {
t.Fatalf("PlanLeaves error: %v", err)
}
if len(filters) != 3 {
t.Fatalf("expected 3 leave filters (cap), got %d", len(filters))
}
}
// PlanLeaves sorts worst-first: the worst scoring shard should be first.
func TestPlanLeaves_WorstFirst(t *testing.T) {
wm := &mockWorkerManager{}
m := newTestManager(t, RewardGreedy, wm)
// Two bad shards, one worse than the other
allocated := []ShardDescriptor{
{Filter: mustDecodeHex(t, "a1"), Size: 50_000, Ring: 2, Shards: 1}, // bad
{Filter: mustDecodeHex(t, "a2"), Size: 50_000, Ring: 4, Shards: 1}, // worse
}
unallocated := []ShardDescriptor{
{Filter: mustDecodeHex(t, "bb"), Size: 200_000, Ring: 0, Shards: 1},
}
filters, err := m.PlanLeaves(100, allocated, unallocated, big.NewInt(300000))
if err != nil {
t.Fatalf("PlanLeaves error: %v", err)
}
if len(filters) < 2 {
t.Fatalf("expected at least 2 leave filters, got %d", len(filters))
}
// a2 (ring 4) should be first (worst score)
if hex.EncodeToString(filters[0]) != "a2" {
t.Fatalf("expected worst shard a2 first, got %s", hex.EncodeToString(filters[0]))
}
}
// --- DecideLeaves tests ---
// DecideLeaves should confirm a leave when the shard is still bad.
func TestDecideLeaves_ConfirmWhenStillBad(t *testing.T) {
wm := &mockWorkerManager{}
m := newTestManager(t, RewardGreedy, wm)
// Leaving shard is much worse than the best
shards := []ShardDescriptor{
{Filter: mustDecodeHex(t, "aa"), Size: 50_000, Ring: 3, Shards: 1}, // leaving this (bad)
{Filter: mustDecodeHex(t, "bb"), Size: 200_000, Ring: 0, Shards: 1}, // best available
}
pendingLeaves := [][]byte{mustDecodeHex(t, "aa")}
err := m.DecideLeaves(100, shards, pendingLeaves, big.NewInt(250000))
if err != nil {
t.Fatalf("DecideLeaves error: %v", err)
}
if len(wm.leaveConfirmed) != 1 || hex.EncodeToString(wm.leaveConfirmed[0]) != "aa" {
t.Fatalf("expected confirm aa, got reject=%v confirm=%v",
toHex(wm.leaveRejected), toHex(wm.leaveConfirmed))
}
if len(wm.leaveRejected) != 0 {
t.Fatalf("expected no rejections, got %v", toHex(wm.leaveRejected))
}
}
// DecideLeaves should reject a leave when the shard has improved (others left,
// Ring dropped).
func TestDecideLeaves_RejectWhenShardImproved(t *testing.T) {
wm := &mockWorkerManager{}
m := newTestManager(t, RewardGreedy, wm)
// Now the leaving shard is the best (same as the alternative)
shards := []ShardDescriptor{
{Filter: mustDecodeHex(t, "aa"), Size: 100_000, Ring: 0, Shards: 1}, // leaving this (now good)
{Filter: mustDecodeHex(t, "bb"), Size: 100_000, Ring: 0, Shards: 1}, // alternative
}
pendingLeaves := [][]byte{mustDecodeHex(t, "aa")}
err := m.DecideLeaves(100, shards, pendingLeaves, big.NewInt(200000))
if err != nil {
t.Fatalf("DecideLeaves error: %v", err)
}
if len(wm.leaveRejected) != 1 || hex.EncodeToString(wm.leaveRejected[0]) != "aa" {
t.Fatalf("expected reject aa (shard improved), got reject=%v confirm=%v",
toHex(wm.leaveRejected), toHex(wm.leaveConfirmed))
}
if len(wm.leaveConfirmed) != 0 {
t.Fatalf("expected no confirmations, got %v", toHex(wm.leaveConfirmed))
}
}
// DecideLeaves should confirm a leave when the shard disappeared from the view.
func TestDecideLeaves_ConfirmWhenShardDisappeared(t *testing.T) {
wm := &mockWorkerManager{}
m := newTestManager(t, RewardGreedy, wm)
// Only other shards exist — the leaving shard is gone
shards := []ShardDescriptor{
{Filter: mustDecodeHex(t, "bb"), Size: 100_000, Ring: 0, Shards: 1},
}
pendingLeaves := [][]byte{mustDecodeHex(t, "aa")}
err := m.DecideLeaves(100, shards, pendingLeaves, big.NewInt(100000))
if err != nil {
t.Fatalf("DecideLeaves error: %v", err)
}
if len(wm.leaveConfirmed) != 1 || hex.EncodeToString(wm.leaveConfirmed[0]) != "aa" {
t.Fatalf("expected confirm aa (disappeared), got reject=%v confirm=%v",
toHex(wm.leaveRejected), toHex(wm.leaveConfirmed))
}
}
// DecideLeaves should confirm all when no shards exist at all.
func TestDecideLeaves_ConfirmAllWhenNoShards(t *testing.T) {
wm := &mockWorkerManager{}
m := newTestManager(t, RewardGreedy, wm)
pendingLeaves := [][]byte{mustDecodeHex(t, "aa"), mustDecodeHex(t, "bb")}
err := m.DecideLeaves(100, nil, pendingLeaves, big.NewInt(100000))
if err != nil {
t.Fatalf("DecideLeaves error: %v", err)
}
if len(wm.leaveConfirmed) != 2 {
t.Fatalf("expected 2 confirmations, got %d", len(wm.leaveConfirmed))
}
}
// DecideLeaves with mixed results: some shards improved, some still bad.
func TestDecideLeaves_MixedDecisions(t *testing.T) {
wm := &mockWorkerManager{}
m := newTestManager(t, RewardGreedy, wm)
// aa: 150k at ring 0 → 75% of best (200k ring 0) → above 67% threshold → reject leave (stay)
// bb: 50k at ring 3 → tiny score → well below 67% threshold → confirm leave
// cc: 200k at ring 0 → best score
shards := []ShardDescriptor{
{Filter: mustDecodeHex(t, "aa"), Size: 150_000, Ring: 0, Shards: 1}, // improved
{Filter: mustDecodeHex(t, "bb"), Size: 50_000, Ring: 3, Shards: 1}, // still bad
{Filter: mustDecodeHex(t, "cc"), Size: 200_000, Ring: 0, Shards: 1}, // best alternative
}
pendingLeaves := [][]byte{mustDecodeHex(t, "aa"), mustDecodeHex(t, "bb")}
err := m.DecideLeaves(100, shards, pendingLeaves, big.NewInt(400000))
if err != nil {
t.Fatalf("DecideLeaves error: %v", err)
}
rejSet := setOf(toHex(wm.leaveRejected))
cfmSet := setOf(toHex(wm.leaveConfirmed))
// aa improved (ring 0, 150k) is 75% of best (200k) → above 67% → reject leave (stay)
if !rejSet["aa"] {
t.Fatalf("expected aa rejected (shard improved), got reject=%v confirm=%v",
toHex(wm.leaveRejected), toHex(wm.leaveConfirmed))
}
// bb still bad (ring 3, 50k) → confirm leave
if !cfmSet["bb"] {
t.Fatalf("expected bb confirmed (still bad), got reject=%v confirm=%v",
toHex(wm.leaveRejected), toHex(wm.leaveConfirmed))
}
}
// DecideLeaves with no pending leaves should be a no-op.
func TestDecideLeaves_NoPending(t *testing.T) {
wm := &mockWorkerManager{}
m := newTestManager(t, RewardGreedy, wm)
err := m.DecideLeaves(100, nil, nil, big.NewInt(100000))
if err != nil {
t.Fatalf("DecideLeaves error: %v", err)
}
if wm.leaveRejected != nil || wm.leaveConfirmed != nil {
t.Fatalf("expected no calls, got reject=%v confirm=%v",
toHex(wm.leaveRejected), toHex(wm.leaveConfirmed))
}
}
// DecideLeaves DataGreedy: size-only scoring means ring doesn't matter.
func TestDecideLeaves_DataGreedy(t *testing.T) {
wm := &mockWorkerManager{}
m := newTestManager(t, DataGreedy, wm)
shards := []ShardDescriptor{
{Filter: mustDecodeHex(t, "aa"), Size: 10_000, Ring: 0, Shards: 1}, // small → still bad
{Filter: mustDecodeHex(t, "bb"), Size: 80_000, Ring: 0, Shards: 1}, // best by size
}
pendingLeaves := [][]byte{mustDecodeHex(t, "aa")}
err := m.DecideLeaves(100, shards, pendingLeaves, big.NewInt(90000))
if err != nil {
t.Fatalf("DecideLeaves error: %v", err)
}
if len(wm.leaveConfirmed) != 1 || hex.EncodeToString(wm.leaveConfirmed[0]) != "aa" {
t.Fatalf("expected confirm aa (small shard), got reject=%v confirm=%v",
toHex(wm.leaveRejected), toHex(wm.leaveConfirmed))
}
}
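// Illustrative, self-contained sketch (not part of the change above): a
// scoring shape consistent with the expectations encoded in these tests
// (75% of best stays above the ~67% cutoff, higher rings score worse,
// DataGreedy ignores ring). The names below are local stand-ins and the exact
// ring penalty is an assumption, not necessarily the manager's real formula.
type sketchStrategy int
const (
	sketchRewardGreedy sketchStrategy = iota
	sketchDataGreedy
)
type sketchShard struct {
	Size uint64
	Ring uint8
}
// sketchScore grows with size; under reward-greedy each additional ring
// roughly halves it, so 150k at ring 0 is 75% of 200k at ring 0 while 50k at
// ring 3 is negligible. Under data-greedy the ring is ignored entirely.
func sketchScore(st sketchStrategy, s sketchShard) float64 {
	if st == sketchDataGreedy {
		return float64(s.Size)
	}
	return float64(s.Size) / float64(uint64(1)<<s.Ring)
}
// sketchShouldStay mirrors the tests' "at least ~67% of the best available
// shard" cutoff for rejecting a pending leave.
func sketchShouldStay(st sketchStrategy, s, best sketchShard) bool {
	return sketchScore(st, s) >= 0.67*sketchScore(st, best)
}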

View File

@ -446,6 +446,13 @@ func (r *ProverRegistry) UpdateProverActivity(
return nil
}
// CurrentFrame implements ProverRegistry
func (r *ProverRegistry) CurrentFrame() uint64 {
r.mu.RLock()
defer r.mu.RUnlock()
return r.currentFrame
}
// PruneOrphanJoins implements ProverRegistry
func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error {
// Pruning is disabled — it was causing tree divergence between nodes

View File

@ -192,7 +192,7 @@ func TestPruneOrphanJoins_Comprehensive(t *testing.T) {
// Create stores with in-memory pebble DB
pebbleDB := store.NewPebbleDB(
logger,
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_orphan"},
&config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_orphan"}},
0,
)
defer pebbleDB.Close()
@ -623,8 +623,8 @@ func TestPruneOrphanJoins_Comprehensive(t *testing.T) {
// Prover should remain because it has Active allocations
mixedActiveJoiningProverAddrs := make([][]byte, 3)
mixedActiveJoiningProverKeys := make([][]byte, 3)
mixedActiveFilters := make([][][]byte, 3) // Active filters that should remain
mixedJoiningFilters := make([][][]byte, 3) // Joining filters that should be pruned
mixedActiveFilters := make([][][]byte, 3) // Active filters that should remain
mixedJoiningFilters := make([][][]byte, 3) // Joining filters that should be pruned
for i := 0; i < 3; i++ {
publicKey := bytes.Repeat([]byte{byte(0x50 + i)}, 585)
mixedActiveJoiningProverKeys[i] = publicKey
@ -1101,7 +1101,7 @@ func TestPruneOrphanJoins_IncompleteState(t *testing.T) {
// Create stores with in-memory pebble DB
pebbleDB := store.NewPebbleDB(
logger,
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_incomplete"},
&config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_incomplete"}},
0,
)
defer pebbleDB.Close()
@ -1409,7 +1409,7 @@ func TestPruneOrphanJoins_OrphanedAllocation(t *testing.T) {
// Create stores with in-memory pebble DB
pebbleDB := store.NewPebbleDB(
logger,
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_orphaned_alloc"},
&config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/prune_orphaned_alloc"}},
0,
)
defer pebbleDB.Close()

View File

@ -18,7 +18,7 @@ import (
func setupTestClockStore(t *testing.T) *store.PebbleClockStore {
logger, _ := zap.NewDevelopment()
tempDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/store"}, 0)
tempDB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/store"}}, 0)
return store.NewPebbleClockStore(tempDB, logger)
}

View File

@ -488,7 +488,7 @@ func createTestAppConsensusEngine(
mockDifficultyAdjuster := new(mocks.MockDifficultyAdjuster)
mockRewardIssuance := new(mocks.MockRewardIssuance)
mockEventDistributor := new(mocks.MockEventDistributor)
pebbleDB := pstore.NewPebbleDB(logger, config.DB, 0)
pebbleDB := pstore.NewPebbleDB(logger, config, 0)
clockStore := pstore.NewPebbleClockStore(pebbleDB, logger)
inboxStore := pstore.NewPebbleInboxStore(pebbleDB, logger)
shardStore := pstore.NewPebbleShardsStore(pebbleDB, logger)

View File

@ -29,7 +29,7 @@ import (
// setupTest creates a test environment with a HypergraphState
func setupTest(t *testing.T) (*hypergraph.HypergraphState, thypergraph.Hypergraph, tcrypto.VerEncProof, []byte, tcrypto.InclusionProver) {
logger, _ := zap.NewDevelopment()
s := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0)
s := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}}, 0)
enc := &mocks.MockVerifiableEncryptor{}
incProver := &mocks.MockInclusionProver{}
vep := &mocks.MockVerEncProof{}
@ -79,7 +79,7 @@ func TestHypergraphState(t *testing.T) {
// Test state operations
t.Run("State Operations", func(t *testing.T) {
logger, _ := zap.NewDevelopment()
s := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0)
s := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}}, 0)
enc := &mocks.MockVerifiableEncryptor{}
incProver := &mocks.MockInclusionProver{}
vep := &mocks.MockVerEncProof{}

View File

@ -229,7 +229,7 @@ require (
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_golang v1.22.0
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.64.0 // indirect
github.com/prometheus/common v0.64.0
github.com/prometheus/procfs v0.16.1 // indirect
github.com/quic-go/qpack v0.5.1 // indirect
github.com/quic-go/quic-go v0.54.0 // indirect

View File

@ -185,7 +185,6 @@ github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/subcommands v1.0.1 h1:/eqq+otEXm5vhfBrbREPCSVQbvofip6kIz+mX5TUH7k=
github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=

View File

@ -94,10 +94,20 @@ var (
false,
"prints peer info",
)
metrics = flag.Bool(
"metrics",
false,
"print prometheus metrics and exit",
)
metricsFilter = flag.String(
"metrics-filter",
"",
"optional metric name filter (substring match)",
)
debug = flag.Bool(
"debug",
false,
"sets log output to debug (verbose)",
debugDefault(),
"sets log output to debug (verbose) (default false or value of QUILIBRIUM_DEBUG env var)",
)
dhtOnly = flag.Bool(
"dht-only",
@ -182,6 +192,22 @@ func signatureCheckDefault() bool {
return true
}
func debugDefault() bool {
envVarValue, envVarExists := os.LookupEnv("QUILIBRIUM_DEBUG")
if envVarExists {
def, err := strconv.ParseBool(envVarValue)
if err == nil {
return def
}
fmt.Println(
"Invalid environment variable QUILIBRIUM_DEBUG, must be 'true' or 'false':",
envVarValue,
)
}
return false
}
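// Quick sketch of the fallback above (assumed to live in the same package,
// e.g. a test helper): QUILIBRIUM_DEBUG only seeds the flag's default, so an
// explicit -debug=true/false on the command line still wins once flag.Parse
// runs. strconv.ParseBool also accepts 1/0/t/f and upper-case spellings.
func debugDefaultSketch() {
	os.Setenv("QUILIBRIUM_DEBUG", "true")
	fmt.Println(debugDefault()) // true
	os.Setenv("QUILIBRIUM_DEBUG", "not-a-bool")
	fmt.Println(debugDefault()) // prints the invalid-value message, then false
}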
// monitorParentProcess watches parent process and stops the worker if parent dies
func monitorParentProcess(
parentProcessId int,
@ -389,6 +415,16 @@ func main() {
return
}
if *metrics {
cfg, err := config.LoadConfig(*configDirectory, "", false)
if err != nil {
logger.Fatal("failed to load config", zap.Error(err))
}
printMetrics(logger, cfg, *metricsFilter)
return
}
if *dangerClearPending {
db := store.NewPebbleDB(logger, nodeConfig, 0)
defer db.Close()
@ -747,6 +783,33 @@ func printNodeInfo(logger *zap.Logger, cfg *config.Config) {
).String())
fmt.Println("Running Workers:", nodeInfo.RunningWorkers)
fmt.Println("Active Workers:", nodeInfo.AllocatedWorkers)
if len(nodeInfo.ShardAllocations) > 0 {
var active, joining, leaving, paused int
var joinFrames []string
for _, a := range nodeInfo.ShardAllocations {
switch a.Status {
case 1: // Joining
joining++
joinFrames = append(joinFrames, fmt.Sprintf("%d", a.JoinFrameNumber))
case 2: // Active
active++
case 3: // Paused
paused++
case 4: // Leaving
leaving++
}
}
fmt.Println("Shard Allocations:")
fmt.Println(" Active: ", active)
if len(joinFrames) > 0 {
fmt.Printf(" Joining: %d (join frames: %s)\n", joining, strings.Join(joinFrames, ", "))
} else {
fmt.Println(" Joining:", joining)
}
fmt.Println(" Leaving:", leaving)
fmt.Println(" Paused: ", paused)
}
}
func printPeerInfo(logger *zap.Logger, cfg *config.Config) {
@ -831,6 +894,33 @@ func printPeerInfo(logger *zap.Logger, cfg *config.Config) {
}
}
func printMetrics(logger *zap.Logger, cfg *config.Config, filter string) {
if cfg.ListenGRPCMultiaddr == "" {
logger.Fatal("gRPC Not Enabled, Please Configure")
}
conn, err := ConnectToNode(logger, cfg)
if err != nil {
logger.Fatal(
"could not connect to node. if it is still booting, please wait.",
zap.Error(err),
)
}
defer conn.Close()
client := protobufs.NewNodeServiceClient(conn)
resp, err := client.GetMetrics(
context.Background(),
&protobufs.GetMetricsRequest{Filter: filter},
)
if err != nil {
logger.Fatal("failed to fetch metrics", zap.Error(err))
}
fmt.Print(string(resp.Metrics))
}
func formatPeerID(raw []byte) string {
if len(raw) == 0 {
return ""

View File

@ -2645,6 +2645,7 @@ func TestMainnetBlossomsubFrameReceptionAndHypersync(t *testing.T) {
ValidateQueueSize: 128,
ValidateWorkers: 4,
PeerOutboundQueueSize: 128,
PeerReconnectCheckInterval: 60 * time.Second,
}
engineConfig := &config.EngineConfig{}

View File

@ -6,6 +6,7 @@ import (
"math/big"
"net/http"
"slices"
"strings"
"sync"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
@ -14,6 +15,8 @@ import (
"github.com/multiformats/go-multiaddr"
mn "github.com/multiformats/go-multiaddr/net"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@ -212,6 +215,31 @@ func (r *RPCServer) GetNodeInfo(
}
}
var shardAllocations []*protobufs.ShardAllocationInfo
if proverInfo != nil {
currentFrame := r.proverRegistry.CurrentFrame()
for _, alloc := range proverInfo.Allocations {
// Omit expired joins and leaves, matching the proposer's logic
// in event_distributor.go (pendingFilterGraceFrames = 720).
if alloc.Status == consensus.ProverStatusJoining &&
currentFrame > alloc.JoinFrameNumber+720 {
continue
}
if alloc.Status == consensus.ProverStatusLeaving &&
currentFrame > alloc.LeaveFrameNumber+720 {
continue
}
shardAllocations = append(shardAllocations, &protobufs.ShardAllocationInfo{
Filter: alloc.ConfirmationFilter,
Status: uint32(alloc.Status),
JoinFrameNumber: alloc.JoinFrameNumber,
JoinConfirmFrameNumber: alloc.JoinConfirmFrameNumber,
LeaveFrameNumber: alloc.LeaveFrameNumber,
LastActiveFrameNumber: alloc.LastActiveFrameNumber,
})
}
}
return &protobufs.NodeInfoResponse{
PeerId: peer.ID(peerID).String(),
PeerScore: uint64(r.pubSub.GetPeerScore(peerID)),
@ -221,6 +249,7 @@ func (r *RPCServer) GetNodeInfo(
AllocatedWorkers: allocated,
PatchNumber: append([]byte{}, config.GetPatchNumber()),
Reachable: r.pubSub.Reachability().Value,
ShardAllocations: shardAllocations,
}, nil
}
@ -249,6 +278,30 @@ func (r *RPCServer) GetWorkerInfo(
}, nil
}
func (r *RPCServer) GetMetrics(
ctx context.Context,
req *protobufs.GetMetricsRequest,
) (*protobufs.GetMetricsResponse, error) {
families, err := prometheus.DefaultGatherer.Gather()
if err != nil {
r.logger.Warn("partial metrics gather error", zap.Error(err))
}
var buf bytes.Buffer
enc := expfmt.NewEncoder(&buf, expfmt.NewFormat(expfmt.TypeTextPlain))
filter := strings.ToLower(req.Filter)
for _, fam := range families {
if filter != "" && !strings.Contains(strings.ToLower(fam.GetName()), filter) {
continue
}
if err := enc.Encode(fam); err != nil {
return nil, errors.Wrap(err, "encode metrics")
}
}
return &protobufs.GetMetricsResponse{Metrics: buf.Bytes()}, nil
}
// Send implements protobufs.NodeServiceServer.
func (r *RPCServer) Send(
ctx context.Context,

View File

@ -17,7 +17,7 @@ import (
func setupTestClockStore(t *testing.T) *PebbleClockStore {
logger, _ := zap.NewDevelopment()
tempDB := NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/store"}, 0)
tempDB := NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/store"}}, 0)
return NewPebbleClockStore(tempDB, logger)
}

View File

@ -16,7 +16,7 @@ import (
func setupTestInboxStore(t *testing.T) *PebbleInboxStore {
logger, _ := zap.NewDevelopment()
tempDB := NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/store"}, 0)
tempDB := NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/store"}}, 0)
return NewPebbleInboxStore(tempDB, logger)
}
@ -554,7 +554,7 @@ func TestCRDTConcurrentScenarios(t *testing.T) {
func BenchmarkCRDTOperations(b *testing.B) {
logger, _ := zap.NewDevelopment()
tempDB := NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/store"}, 0)
tempDB := NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/store"}}, 0)
store := NewPebbleInboxStore(tempDB, logger)
address := bytes.Repeat([]byte{0xAA}, 32)

View File

@ -13,14 +13,14 @@ import (
func setupTestHypergraphStore(t *testing.T) *PebbleHypergraphStore {
logger := zap.NewNop()
cfg := &config.DBConfig{
cfg := &config.Config{DB: &config.DBConfig{
InMemoryDONOTUSE: true,
Path: ".test/hypergraph",
}
}}
db := NewPebbleDB(logger, cfg, 0)
require.NotNil(t, db)
t.Cleanup(func() { db.Close() })
return NewPebbleHypergraphStore(cfg, db, logger, nil, nil)
return NewPebbleHypergraphStore(cfg.DB, db, logger, nil, nil)
}
func TestGetRootCommits_IncludesAllCommitTypes(t *testing.T) {

View File

@ -16,7 +16,7 @@ import (
func setupTestKeyStore(t *testing.T) *PebbleKeyStore {
logger, _ := zap.NewDevelopment()
tempDB := NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/store"}, 0)
tempDB := NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/store"}}, 0)
return NewPebbleKeyStore(tempDB, logger)
}

View File

@ -97,6 +97,7 @@ var pebbleMigrations = []func(*pebble.Batch, *pebble.DB, *config.Config) error{
migration_2_1_0_1821,
migration_2_1_0_1822,
migration_2_1_0_1823,
migration_2_1_0_1824,
}
func NewPebbleDB(
@ -1140,6 +1141,234 @@ func migration_2_1_0_1823(b *pebble.Batch, db *pebble.DB, cfg *config.Config) er
return doMigration1818(db, cfg)
}
// migration_2_1_0_1824 rebuilds both vertex adds and hyperedge adds trees for
// the global prover shard to fix divergence from the materialize/commit race.
func migration_2_1_0_1824(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
return doMigration1824(db, cfg)
}
// doMigration1824 rebuilds the global prover shard's vertex adds and hyperedge
// adds trees by syncing to an in-memory instance and back. Unlike doMigration1818
// which only checked vertex adds, this migration ensures both trees are rebuilt.
func doMigration1824(db *pebble.DB, cfg *config.Config) error {
logger := zap.L()
// Global prover shard key: L1={0,0,0}, L2=0xff*32
globalShardKey := tries.ShardKey{
L1: [3]byte{},
L2: [32]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
}
prover := bls48581.NewKZGInclusionProver(logger)
// Create hypergraph from actual DB
actualDBWrapper := &PebbleDB{db: db}
actualStore := NewPebbleHypergraphStore(cfg.DB, actualDBWrapper, logger, nil, prover)
actualHG, err := actualStore.LoadHypergraph(nil, 0)
if err != nil {
return errors.Wrap(err, "load actual hypergraph")
}
actualHGCRDT := actualHG.(*hgcrdt.HypergraphCRDT)
// Create in-memory pebble DB directly (bypassing NewPebbleDB to avoid cycle)
memOpts := &pebble.Options{
MemTableSize: 64 << 20,
FormatMajorVersion: pebble.FormatNewest,
FS: vfs.NewMem(),
}
memDB, err := pebble.Open("", memOpts)
if err != nil {
return errors.Wrap(err, "open in-memory pebble")
}
defer memDB.Close()
memDBWrapper := &PebbleDB{db: memDB}
memStore := NewPebbleHypergraphStore(cfg.DB, memDBWrapper, logger, nil, prover)
memHG, err := memStore.LoadHypergraph(nil, 0)
if err != nil {
return errors.Wrap(err, "load in-memory hypergraph")
}
memHGCRDT := memHG.(*hgcrdt.HypergraphCRDT)
// Phase 1: Sync from actual DB to in-memory
// Check both vertex adds and hyperedge adds roots
actualVertexRoot := actualHGCRDT.GetVertexAddsSet(globalShardKey).GetTree().Commit(nil, false)
actualHyperedgeRoot := actualHGCRDT.GetHyperedgeAddsSet(globalShardKey).GetTree().Commit(nil, false)
if actualVertexRoot == nil && actualHyperedgeRoot == nil {
logger.Info("migration 1824: no data in global prover shard, skipping")
return nil
}
// Use whichever root is available for the snapshot
snapshotRoot := actualVertexRoot
if snapshotRoot == nil {
snapshotRoot = actualHyperedgeRoot
}
actualHGCRDT.PublishSnapshot(snapshotRoot)
// Set up gRPC server backed by actual hypergraph
const bufSize = 1 << 20
actualLis := bufconn.Listen(bufSize)
actualGRPCServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024),
grpc.MaxSendMsgSize(100*1024*1024),
)
protobufs.RegisterHypergraphComparisonServiceServer(actualGRPCServer, actualHGCRDT)
go func() { _ = actualGRPCServer.Serve(actualLis) }()
defer actualGRPCServer.Stop()
// Create client connection to actual hypergraph server
actualDialer := func(context.Context, string) (net.Conn, error) {
return actualLis.Dial()
}
actualConn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(actualDialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024),
grpc.MaxCallSendMsgSize(100*1024*1024),
),
)
if err != nil {
return errors.Wrap(err, "dial actual hypergraph")
}
defer actualConn.Close()
actualClient := protobufs.NewHypergraphComparisonServiceClient(actualConn)
// Sync from actual to in-memory for vertex adds and hyperedge adds
phases := []protobufs.HypergraphPhaseSet{
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS,
}
for _, phase := range phases {
stream, err := actualClient.PerformSync(context.Background())
if err != nil {
return errors.Wrapf(err, "create sync stream for phase %v", phase)
}
_, err = memHGCRDT.SyncFrom(stream, globalShardKey, phase, nil)
if err != nil {
logger.Warn("sync from actual to memory failed", zap.Error(err), zap.Any("phase", phase))
}
_ = stream.CloseSend()
}
// Commit in-memory to get roots
memVertexRoot := memHGCRDT.GetVertexAddsSet(globalShardKey).GetTree().Commit(nil, false)
memHyperedgeRoot := memHGCRDT.GetHyperedgeAddsSet(globalShardKey).GetTree().Commit(nil, false)
logger.Info("migration 1824: synced to in-memory",
zap.String("actual_vertex_root", hex.EncodeToString(actualVertexRoot)),
zap.String("mem_vertex_root", hex.EncodeToString(memVertexRoot)),
zap.String("actual_hyperedge_root", hex.EncodeToString(actualHyperedgeRoot)),
zap.String("mem_hyperedge_root", hex.EncodeToString(memHyperedgeRoot)),
)
// Stop the actual server before wiping data
actualGRPCServer.Stop()
actualConn.Close()
// Phase 2: Wipe tree data for global prover shard from actual DB
// Only wipe vertex adds and hyperedge adds (not removes)
treePrefixes := []byte{
VERTEX_ADDS_TREE_NODE,
VERTEX_ADDS_TREE_NODE_BY_PATH,
VERTEX_ADDS_CHANGE_RECORD,
VERTEX_ADDS_TREE_ROOT,
HYPEREDGE_ADDS_TREE_NODE,
HYPEREDGE_ADDS_TREE_NODE_BY_PATH,
HYPEREDGE_ADDS_CHANGE_RECORD,
HYPEREDGE_ADDS_TREE_ROOT,
}
for _, prefix := range treePrefixes {
start, end := shardRangeBounds(prefix, globalShardKey)
if err := db.DeleteRange(start, end, &pebble.WriteOptions{Sync: true}); err != nil {
return errors.Wrapf(err, "delete range for prefix 0x%02x", prefix)
}
}
logger.Info("migration 1824: wiped vertex adds and hyperedge adds tree data from actual DB")
// Reload actual hypergraph after wipe
actualStore2 := NewPebbleHypergraphStore(cfg.DB, actualDBWrapper, logger, nil, prover)
actualHG2, err := actualStore2.LoadHypergraph(nil, 0)
if err != nil {
return errors.Wrap(err, "reload actual hypergraph after wipe")
}
actualHGCRDT2 := actualHG2.(*hgcrdt.HypergraphCRDT)
// Phase 3: Sync from in-memory back to actual DB
memSnapshotRoot := memVertexRoot
if memSnapshotRoot == nil {
memSnapshotRoot = memHyperedgeRoot
}
memHGCRDT.PublishSnapshot(memSnapshotRoot)
// Set up gRPC server backed by in-memory hypergraph
memLis := bufconn.Listen(bufSize)
memGRPCServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024),
grpc.MaxSendMsgSize(100*1024*1024),
)
protobufs.RegisterHypergraphComparisonServiceServer(memGRPCServer, memHGCRDT)
go func() { _ = memGRPCServer.Serve(memLis) }()
defer memGRPCServer.Stop()
// Create client connection to in-memory hypergraph server
memDialer := func(context.Context, string) (net.Conn, error) {
return memLis.Dial()
}
memConn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(memDialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024),
grpc.MaxCallSendMsgSize(100*1024*1024),
),
)
if err != nil {
return errors.Wrap(err, "dial in-memory hypergraph")
}
defer memConn.Close()
memClient := protobufs.NewHypergraphComparisonServiceClient(memConn)
// Sync from in-memory to actual for vertex adds and hyperedge adds
for _, phase := range phases {
stream, err := memClient.PerformSync(context.Background())
if err != nil {
return errors.Wrapf(err, "create sync stream for phase %v (reverse)", phase)
}
_, err = actualHGCRDT2.SyncFrom(stream, globalShardKey, phase, nil)
if err != nil {
logger.Warn("sync from memory to actual failed", zap.Error(err), zap.Any("phase", phase))
}
_ = stream.CloseSend()
}
// Final commit
finalVertexRoot := actualHGCRDT2.GetVertexAddsSet(globalShardKey).GetTree().Commit(nil, true)
finalHyperedgeRoot := actualHGCRDT2.GetHyperedgeAddsSet(globalShardKey).GetTree().Commit(nil, true)
logger.Info("migration 1824: completed",
zap.String("final_vertex_root", hex.EncodeToString(finalVertexRoot)),
zap.String("final_hyperedge_root", hex.EncodeToString(finalHyperedgeRoot)),
)
return nil
}
// pebbleBatchDB wraps a *pebble.Batch to implement store.KVDB for use in migrations
type pebbleBatchDB struct {
b *pebble.Batch

View File

@ -30,9 +30,9 @@ func TestNewPebbleDB_ExistingDirectory(t *testing.T) {
core, logs := observer.New(zap.InfoLevel)
testLogger := zap.New(core)
cfg := &config.DBConfig{
cfg := &config.Config{DB: &config.DBConfig{
Path: testDir,
}
}}
db := NewPebbleDB(testLogger, cfg, 0)
require.NotNil(t, db)
@ -57,9 +57,9 @@ func TestNewPebbleDB_ExistingDirectoryWorker(t *testing.T) {
core, logs := observer.New(zap.InfoLevel)
testLogger := zap.New(core)
cfg := &config.DBConfig{
cfg := &config.Config{DB: &config.DBConfig{
WorkerPaths: []string{testDir},
}
}}
db := NewPebbleDB(testLogger, cfg, 1)
require.NotNil(t, db)
@ -87,9 +87,9 @@ func TestNewPebbleDB_NonExistingDirectory(t *testing.T) {
core, logs := observer.New(zap.WarnLevel)
testLogger := zap.New(core)
cfg := &config.DBConfig{
cfg := &config.Config{DB: &config.DBConfig{
Path: testDir,
}
}}
db := NewPebbleDB(testLogger, cfg, 0)
require.NotNil(t, db)
@ -119,9 +119,9 @@ func TestNewPebbleDB_NonExistingDirectoryWorker(t *testing.T) {
core, logs := observer.New(zap.WarnLevel)
testLogger := zap.New(core)
cfg := &config.DBConfig{
cfg := &config.Config{DB: &config.DBConfig{
WorkerPaths: []string{testDir},
}
}}
db := NewPebbleDB(testLogger, cfg, 1)
require.NotNil(t, db)
@ -151,9 +151,9 @@ func TestNewPebbleDB_WorkerPathPrefix(t *testing.T) {
testLogger := zap.New(core)
pathFormat := filepath.Join(baseDir, "worker-%d")
cfg := &config.DBConfig{
cfg := &config.Config{DB: &config.DBConfig{
WorkerPathPrefix: pathFormat,
}
}}
db := NewPebbleDB(testLogger, cfg, 2)
require.NotNil(t, db)

View File

@ -104,7 +104,7 @@ func TestConvergence(t *testing.T) {
var store0 store.KVDB
for i := 0; i < numParties; i++ {
logger, _ := zap.NewDevelopment()
s := pebblestore.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0)
s := pebblestore.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}}, 0)
if i == 0 {
store0 = s
}

View File

@ -0,0 +1,206 @@
package tests
import (
"bytes"
"crypto/rand"
"math/big"
"runtime"
"sync"
"testing"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/config"
hg "source.quilibrium.com/quilibrium/monorepo/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/node/store"
"source.quilibrium.com/quilibrium/monorepo/types/mocks"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
)
type vertexSpec struct {
appAddr [32]byte
dataAddr [32]byte
commit []byte
size *big.Int
}
// TestConcurrentAddVertexAndCommitRace verifies that serializing
// AddVertex batches and Commit calls with a mutex (the commitBarrier
// pattern) prevents partial-state tree roots.
//
// The test applies the same serialization that GlobalConsensusEngine
// uses: a mutex held across the entire AddVertex loop and around each
// Commit call. With this barrier, Commit can never observe a partially
// modified tree.
func TestConcurrentAddVertexAndCommitRace(t *testing.T) {
const (
iterations = 100
verticesPerRun = 50
// Multiple concurrent commit goroutines to increase contention.
commitGoroutines = 4
)
// Ensure true parallelism.
prevProcs := runtime.GOMAXPROCS(runtime.NumCPU())
defer runtime.GOMAXPROCS(prevProcs)
logger, _ := zap.NewDevelopment()
for iter := 0; iter < iterations; iter++ {
prover := &mocks.MockInclusionProver{}
mockCommit := make([]byte, 74)
mockCommit[0] = 0x02
rand.Read(mockCommit[1:])
prover.On("CommitRaw", mock.Anything, mock.Anything).Return(mockCommit, nil)
enc := &mocks.MockVerifiableEncryptor{}
dbCfg := &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}
s := store.NewPebbleDB(logger, &config.Config{DB: dbCfg}, 0)
hgStore := store.NewPebbleHypergraphStore(
dbCfg, s, logger, enc, prover,
)
hgcrdt := hg.NewHypergraph(
logger,
hgStore,
prover,
[]int{},
&Nopthenticator{},
200,
)
// All vertices share the same appAddress so they land in the same shard,
// maximizing tree contention.
appAddr := [32]byte{0x10}
// Commit baseline (frame 1) with no vertices — this is the
// "before" state that a commit goroutine may validly capture
// if it acquires the barrier before the AddVertex goroutine.
baselineCommits, err := hgcrdt.Commit(1)
if err != nil {
t.Fatalf("iter %d: baseline commit failed: %v", iter, err)
}
// Prepare vertices to add.
specs := make([]vertexSpec, verticesPerRun)
for i := 0; i < verticesPerRun; i++ {
dataAddr := [32]byte{byte(i + 1), byte(iter), byte(i >> 8)}
specs[i] = vertexSpec{
appAddr: appAddr,
dataAddr: dataAddr,
commit: mockCommit,
size: big.NewInt(55),
}
}
// commitBarrier mirrors the mutex in GlobalConsensusEngine that
// serializes materialize (AddVertex loop) with
// rebuildShardCommitments (Commit).
var commitBarrier sync.Mutex
var wg sync.WaitGroup
start := make(chan struct{})
type commitResult struct {
commits map[tries.ShardKey][][]byte
err error
}
results := make([]commitResult, commitGoroutines)
wg.Add(1 + commitGoroutines)
// Goroutine A: add vertices one at a time (like materialize does).
// Holds the barrier across the entire batch.
go func() {
defer wg.Done()
<-start
commitBarrier.Lock()
defer commitBarrier.Unlock()
for _, vs := range specs {
v := hg.NewVertex(vs.appAddr, vs.dataAddr, vs.commit, vs.size)
if addErr := hgcrdt.AddVertex(nil, v); addErr != nil {
t.Errorf("iter %d: AddVertex failed: %v", iter, addErr)
return
}
// Yield between additions — without the barrier this would
// allow Commit to interleave and capture partial state.
runtime.Gosched()
}
}()
// Goroutines B: commit the tree concurrently with vertex additions.
// Each acquires the barrier around Commit, so it waits for any
// in-progress AddVertex batch to finish.
for g := 0; g < commitGoroutines; g++ {
g := g
go func() {
defer wg.Done()
<-start
// Stagger start to hit different points in the AddVertex sequence.
for y := 0; y < g*3; y++ {
runtime.Gosched()
}
commitBarrier.Lock()
results[g].commits, results[g].err = hgcrdt.Commit(
uint64(iter*10+g+2),
)
commitBarrier.Unlock()
}()
}
close(start)
wg.Wait()
for g, r := range results {
if r.err != nil {
t.Fatalf("iter %d goroutine %d: commit failed: %v", iter, g, r.err)
}
}
// Final commit with ALL vertices present — the canonical state.
expectedCommits, err := hgcrdt.Commit(uint64(iter*10 + commitGoroutines + 2))
if err != nil {
t.Fatalf("iter %d: expected commit failed: %v", iter, err)
}
// With the commitBarrier, each concurrent commit must reflect a
// consistent state: either the baseline (0 vertices, committed
// before AddVertex batch) or the final state (all vertices,
// committed after). Any other result means Commit() interleaved
// with AddVertex calls and captured partial state.
for g, r := range results {
matchesBaseline := commitMapsEqual(r.commits, baselineCommits)
matchesFinal := commitMapsEqual(r.commits, expectedCommits)
if !matchesBaseline && !matchesFinal {
t.Fatalf(
"iter %d goroutine %d: commit captured partial state "+
"(tree root matches neither baseline nor final — "+
"divergence detected)",
iter, g,
)
}
}
}
}
// commitMapsEqual compares two commit maps for byte-level equality.
func commitMapsEqual(a, b map[tries.ShardKey][][]byte) bool {
if len(a) != len(b) {
return false
}
for k, aPhases := range a {
bPhases, ok := b[k]
if !ok {
return false
}
if len(aPhases) != len(bPhases) {
return false
}
for i := range aPhases {
if !bytes.Equal(aPhases[i], bPhases[i]) {
return false
}
}
}
return true
}
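// A distilled, self-contained sketch of the commitBarrier pattern this test
// exercises (stand-in types; the engine's real hypergraph and vertex types are
// not reproduced here): a single mutex held across the whole add loop and
// around each commit means a commit can only observe the tree before or after
// a batch, never in the middle of one.
type sketchGraph interface {
	Add(v []byte) error
	Commit(frame uint64) ([]byte, error) // returns a tree root
}
type barrieredGraph struct {
	mu sync.Mutex
	g  sketchGraph
}
// materialize applies a batch atomically with respect to commits.
func (b *barrieredGraph) materialize(batch [][]byte) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, v := range batch {
		if err := b.g.Add(v); err != nil {
			return err
		}
	}
	return nil
}
// rebuildShardCommitments commits under the same barrier.
func (b *barrieredGraph) rebuildShardCommitments(frame uint64) ([]byte, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.g.Commit(frame)
}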

View File

@ -58,7 +58,7 @@ func TestHypergraph(t *testing.T) {
// Test vertex operations
t.Run("Vertex Operations", func(t *testing.T) {
logger, _ := zap.NewDevelopment()
s := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0)
s := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}}, 0)
enc := &mocks.MockVerifiableEncryptor{}
prover := &mocks.MockInclusionProver{}
vep := &mocks.MockVerEncProof{}
@ -124,7 +124,7 @@ func TestHypergraph(t *testing.T) {
// Test hyperedge operations
t.Run("Hyperedge Operations", func(t *testing.T) {
logger, _ := zap.NewDevelopment()
s := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0)
s := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}}, 0)
enc := &mocks.MockVerifiableEncryptor{}
prover := &mocks.MockInclusionProver{}
vep := &mocks.MockVerEncProof{}
@ -190,7 +190,7 @@ func TestHypergraph(t *testing.T) {
// Test "within" relationship
t.Run("Within Relationship", func(t *testing.T) {
logger, _ := zap.NewDevelopment()
s := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0)
s := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}}, 0)
enc := &mocks.MockVerifiableEncryptor{}
prover := &mocks.MockInclusionProver{}
vep := &mocks.MockVerEncProof{}
@ -250,7 +250,7 @@ func TestHypergraph(t *testing.T) {
// Test nested hyperedges
t.Run("Nested Hyperedges", func(t *testing.T) {
logger, _ := zap.NewDevelopment()
s := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0)
s := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}}, 0)
enc := &mocks.MockVerifiableEncryptor{}
prover := &mocks.MockInclusionProver{}
vep := &mocks.MockVerEncProof{}
@ -307,7 +307,7 @@ func TestHypergraph(t *testing.T) {
// Test error cases
t.Run("Error Cases", func(t *testing.T) {
logger, _ := zap.NewDevelopment()
s := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0)
s := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}}, 0)
enc := &mocks.MockVerifiableEncryptor{}
prover := &mocks.MockInclusionProver{}
vep := &mocks.MockVerEncProof{}
@ -353,7 +353,7 @@ func TestHypergraph(t *testing.T) {
// Test sharding
t.Run("Sharding", func(t *testing.T) {
logger, _ := zap.NewDevelopment()
s := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}, 0)
s := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest/store"}}, 0)
enc := &mocks.MockVerifiableEncryptor{}
prover := &mocks.MockInclusionProver{}
vep := &mocks.MockVerEncProof{}

View File

@ -44,6 +44,13 @@ type WorkerManager struct {
reject [][]byte,
confirm [][]byte,
) error
proposeLeaveFunc func(
filters [][]byte,
) error
decideLeaveFunc func(
reject [][]byte,
confirm [][]byte,
) error
// When automatic, hold reference to the workers
dataWorkers []*exec.Cmd
@ -72,6 +79,13 @@ func NewWorkerManager(
reject [][]byte,
confirm [][]byte,
) error,
proposeLeaveFunc func(
filters [][]byte,
) error,
decideLeaveFunc func(
reject [][]byte,
confirm [][]byte,
) error,
) typesWorker.WorkerManager {
return &WorkerManager{
store: store,
@ -83,6 +97,8 @@ func NewWorkerManager(
config: config,
proposeFunc: proposeFunc,
decideFunc: decideFunc,
proposeLeaveFunc: proposeLeaveFunc,
decideLeaveFunc: decideLeaveFunc,
}
}
@ -706,6 +722,21 @@ func (w *WorkerManager) DecideAllocations(
return w.decideFunc(reject, confirm)
}
// ProposeLeave invokes a leave proposal function set by the parent of the
// manager.
func (w *WorkerManager) ProposeLeave(filters [][]byte) error {
return w.proposeLeaveFunc(filters)
}
// DecideLeave invokes a leave deciding function set by the parent of the
// manager.
func (w *WorkerManager) DecideLeave(
reject [][]byte,
confirm [][]byte,
) error {
return w.decideLeaveFunc(reject, confirm)
}
// loadWorkersFromStore loads all workers from persistent storage into memory
func (w *WorkerManager) loadWorkersFromStore() error {
workers, err := w.store.RangeWorkers()

View File

@ -160,7 +160,7 @@ func (t *mockTransaction) Abort() error {
func TestWorkerManager_StartStop(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil }, func(filters [][]byte) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
// Test starting the manager
ctx := context.Background()
@ -185,7 +185,7 @@ func TestWorkerManager_StartStop(t *testing.T) {
func TestWorkerManager_RegisterWorker(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil }, func(filters [][]byte) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
// Start the manager
ctx := context.Background()
@ -217,7 +217,7 @@ func TestWorkerManager_RegisterWorker(t *testing.T) {
func TestWorkerManager_RegisterWorkerNotStarted(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil }, func(filters [][]byte) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
// Try to register without starting
workerInfo := &typesStore.WorkerInfo{
@ -246,7 +246,7 @@ func TestWorkerManager_AllocateDeallocateWorker(t *testing.T) {
require.NoError(t, err)
k, _ := priv.Raw()
p2pcfg.PeerPrivKey = hex.EncodeToString(k)
manager := NewWorkerManager(store, logger, &config.Config{Engine: &engcfg, P2P: &p2pcfg}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
manager := NewWorkerManager(store, logger, &config.Config{Engine: &engcfg, P2P: &p2pcfg}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil }, func(filters [][]byte) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
auth := p2p.NewPeerAuthenticator(
zap.L(),
&p2pcfg,
@ -353,7 +353,7 @@ func TestWorkerManager_AllocateDeallocateWorker(t *testing.T) {
func TestWorkerManager_AllocateNonExistentWorker(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil }, func(filters [][]byte) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
// Start the manager
ctx := context.Background()
@ -370,7 +370,7 @@ func TestWorkerManager_AllocateNonExistentWorker(t *testing.T) {
func TestWorkerManager_GetWorkerIdByFilter(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil }, func(filters [][]byte) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
// Start the manager
ctx := context.Background()
@ -406,7 +406,7 @@ func TestWorkerManager_GetWorkerIdByFilter(t *testing.T) {
func TestWorkerManager_GetFilterByWorkerId(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil }, func(filters [][]byte) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
// Start the manager
ctx := context.Background()
@ -467,7 +467,7 @@ func TestWorkerManager_LoadWorkersOnStart(t *testing.T) {
store.workersByFilter[string(worker2.Filter)] = worker2
// Create manager and start it
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{DataWorkerCount: 2}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{DataWorkerCount: 2}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil }, func(filters [][]byte) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
ctx := context.Background()
err := manager.Start(ctx)
require.NoError(t, err)
@ -494,7 +494,7 @@ func TestWorkerManager_LoadWorkersOnStart(t *testing.T) {
func TestWorkerManager_ConcurrentOperations(t *testing.T) {
logger, _ := zap.NewDevelopment()
store := newMockWorkerStore()
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
manager := NewWorkerManager(store, logger, &config.Config{Engine: &config.EngineConfig{}}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil }, func(filters [][]byte) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
// Start the manager
ctx := context.Background()
@ -544,7 +544,7 @@ func TestWorkerManager_EmptyFilter(t *testing.T) {
require.NoError(t, err)
k, _ := priv.Raw()
p2pcfg.PeerPrivKey = hex.EncodeToString(k)
manager := NewWorkerManager(store, logger, &config.Config{Engine: &engcfg, P2P: &p2pcfg}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
manager := NewWorkerManager(store, logger, &config.Config{Engine: &engcfg, P2P: &p2pcfg}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil }, func(filters [][]byte) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
auth := p2p.NewPeerAuthenticator(
zap.L(),
&p2pcfg,
@ -671,7 +671,7 @@ func TestWorkerManager_FilterUpdate(t *testing.T) {
require.NoError(t, err)
k, _ := priv.Raw()
p2pcfg.PeerPrivKey = hex.EncodeToString(k)
manager := NewWorkerManager(store, logger, &config.Config{Engine: &engcfg, P2P: &p2pcfg}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
manager := NewWorkerManager(store, logger, &config.Config{Engine: &engcfg, P2P: &p2pcfg}, func(coreIds []uint, filters [][]byte, serviceClients map[uint]*grpc.ClientConn) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil }, func(filters [][]byte) error { return nil }, func(reject [][]byte, confirm [][]byte) error { return nil })
auth := p2p.NewPeerAuthenticator(
zap.L(),
&p2pcfg,

File diff suppressed because it is too large

View File

@ -235,6 +235,31 @@ func local_request_GlobalService_GetWorkerInfo_0(ctx context.Context, marshaler
}
func request_GlobalService_StreamGlobalMessages_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalServiceClient, req *http.Request, pathParams map[string]string) (GlobalService_StreamGlobalMessagesClient, runtime.ServerMetadata, error) {
var protoReq StreamGlobalMessagesRequest
var metadata runtime.ServerMetadata
newReader, berr := utilities.IOReaderFactory(req.Body)
if berr != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr)
}
if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
stream, err := client.StreamGlobalMessages(ctx, &protoReq)
if err != nil {
return nil, metadata, err
}
header, err := stream.Header()
if err != nil {
return nil, metadata, err
}
metadata.HeaderMD = header
return stream, metadata, nil
}
func request_AppShardService_GetAppShardFrame_0(ctx context.Context, marshaler runtime.Marshaler, client AppShardServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GetAppShardFrameRequest
var metadata runtime.ServerMetadata
@ -1191,6 +1216,13 @@ func RegisterGlobalServiceHandlerServer(ctx context.Context, mux *runtime.ServeM
})
mux.Handle("POST", pattern_GlobalService_StreamGlobalMessages_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport")
_, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
})
return nil
}
@ -1948,6 +1980,28 @@ func RegisterGlobalServiceHandlerClient(ctx context.Context, mux *runtime.ServeM
})
mux.Handle("POST", pattern_GlobalService_StreamGlobalMessages_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/quilibrium.node.global.pb.GlobalService/StreamGlobalMessages", runtime.WithHTTPPathPattern("/quilibrium.node.global.pb.GlobalService/StreamGlobalMessages"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_GlobalService_StreamGlobalMessages_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_GlobalService_StreamGlobalMessages_0(annotatedContext, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...)
})
return nil
}
@ -1963,6 +2017,8 @@ var (
pattern_GlobalService_GetLockedAddresses_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"quilibrium.node.global.pb.GlobalService", "GetLockedAddresses"}, ""))
pattern_GlobalService_GetWorkerInfo_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"quilibrium.node.global.pb.GlobalService", "GetWorkerInfo"}, ""))
pattern_GlobalService_StreamGlobalMessages_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"quilibrium.node.global.pb.GlobalService", "StreamGlobalMessages"}, ""))
)
var (
@ -1977,6 +2033,8 @@ var (
forward_GlobalService_GetLockedAddresses_0 = runtime.ForwardResponseMessage
forward_GlobalService_GetWorkerInfo_0 = runtime.ForwardResponseMessage
forward_GlobalService_StreamGlobalMessages_0 = runtime.ForwardResponseStream
)
// RegisterAppShardServiceHandlerFromEndpoint is same as RegisterAppShardServiceHandler but

View File

@ -484,6 +484,13 @@ message GlobalGetWorkerInfoResponse {
repeated GlobalGetWorkerInfoResponseItem workers = 1;
}
message StreamGlobalMessagesRequest {}
message StreamGlobalMessagesResponse {
bytes data = 1;
bytes bitmask = 2;
}
service GlobalService {
rpc GetGlobalFrame (GetGlobalFrameRequest) returns (GlobalFrameResponse);
rpc GetGlobalProposal (GetGlobalProposalRequest) returns (GlobalProposalResponse);
@ -491,6 +498,7 @@ service GlobalService {
rpc GetGlobalShards(GetGlobalShardsRequest) returns (GetGlobalShardsResponse);
rpc GetLockedAddresses(GetLockedAddressesRequest) returns (GetLockedAddressesResponse);
rpc GetWorkerInfo(GlobalGetWorkerInfoRequest) returns (GlobalGetWorkerInfoResponse);
rpc StreamGlobalMessages(StreamGlobalMessagesRequest) returns (stream StreamGlobalMessagesResponse);
}
service AppShardService {

View File

@ -20,12 +20,13 @@ import (
const _ = grpc.SupportPackageIsVersion7
const (
GlobalService_GetGlobalFrame_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetGlobalFrame"
GlobalService_GetGlobalProposal_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetGlobalProposal"
GlobalService_GetAppShards_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetAppShards"
GlobalService_GetGlobalShards_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetGlobalShards"
GlobalService_GetLockedAddresses_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetLockedAddresses"
GlobalService_GetWorkerInfo_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetWorkerInfo"
GlobalService_GetGlobalFrame_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetGlobalFrame"
GlobalService_GetGlobalProposal_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetGlobalProposal"
GlobalService_GetAppShards_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetAppShards"
GlobalService_GetGlobalShards_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetGlobalShards"
GlobalService_GetLockedAddresses_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetLockedAddresses"
GlobalService_GetWorkerInfo_FullMethodName = "/quilibrium.node.global.pb.GlobalService/GetWorkerInfo"
GlobalService_StreamGlobalMessages_FullMethodName = "/quilibrium.node.global.pb.GlobalService/StreamGlobalMessages"
)
// GlobalServiceClient is the client API for GlobalService service.
@ -38,6 +39,7 @@ type GlobalServiceClient interface {
GetGlobalShards(ctx context.Context, in *GetGlobalShardsRequest, opts ...grpc.CallOption) (*GetGlobalShardsResponse, error)
GetLockedAddresses(ctx context.Context, in *GetLockedAddressesRequest, opts ...grpc.CallOption) (*GetLockedAddressesResponse, error)
GetWorkerInfo(ctx context.Context, in *GlobalGetWorkerInfoRequest, opts ...grpc.CallOption) (*GlobalGetWorkerInfoResponse, error)
StreamGlobalMessages(ctx context.Context, in *StreamGlobalMessagesRequest, opts ...grpc.CallOption) (GlobalService_StreamGlobalMessagesClient, error)
}
type globalServiceClient struct {
@ -102,6 +104,38 @@ func (c *globalServiceClient) GetWorkerInfo(ctx context.Context, in *GlobalGetWo
return out, nil
}
func (c *globalServiceClient) StreamGlobalMessages(ctx context.Context, in *StreamGlobalMessagesRequest, opts ...grpc.CallOption) (GlobalService_StreamGlobalMessagesClient, error) {
stream, err := c.cc.NewStream(ctx, &GlobalService_ServiceDesc.Streams[0], GlobalService_StreamGlobalMessages_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &globalServiceStreamGlobalMessagesClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type GlobalService_StreamGlobalMessagesClient interface {
Recv() (*StreamGlobalMessagesResponse, error)
grpc.ClientStream
}
type globalServiceStreamGlobalMessagesClient struct {
grpc.ClientStream
}
func (x *globalServiceStreamGlobalMessagesClient) Recv() (*StreamGlobalMessagesResponse, error) {
m := new(StreamGlobalMessagesResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// GlobalServiceServer is the server API for GlobalService service.
// All implementations must embed UnimplementedGlobalServiceServer
// for forward compatibility
@ -112,6 +146,7 @@ type GlobalServiceServer interface {
GetGlobalShards(context.Context, *GetGlobalShardsRequest) (*GetGlobalShardsResponse, error)
GetLockedAddresses(context.Context, *GetLockedAddressesRequest) (*GetLockedAddressesResponse, error)
GetWorkerInfo(context.Context, *GlobalGetWorkerInfoRequest) (*GlobalGetWorkerInfoResponse, error)
StreamGlobalMessages(*StreamGlobalMessagesRequest, GlobalService_StreamGlobalMessagesServer) error
mustEmbedUnimplementedGlobalServiceServer()
}
@ -137,6 +172,9 @@ func (UnimplementedGlobalServiceServer) GetLockedAddresses(context.Context, *Get
func (UnimplementedGlobalServiceServer) GetWorkerInfo(context.Context, *GlobalGetWorkerInfoRequest) (*GlobalGetWorkerInfoResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetWorkerInfo not implemented")
}
func (UnimplementedGlobalServiceServer) StreamGlobalMessages(*StreamGlobalMessagesRequest, GlobalService_StreamGlobalMessagesServer) error {
return status.Errorf(codes.Unimplemented, "method StreamGlobalMessages not implemented")
}
func (UnimplementedGlobalServiceServer) mustEmbedUnimplementedGlobalServiceServer() {}
// UnsafeGlobalServiceServer may be embedded to opt out of forward compatibility for this service.
@ -258,6 +296,27 @@ func _GlobalService_GetWorkerInfo_Handler(srv interface{}, ctx context.Context,
return interceptor(ctx, in, info, handler)
}
func _GlobalService_StreamGlobalMessages_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(StreamGlobalMessagesRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(GlobalServiceServer).StreamGlobalMessages(m, &globalServiceStreamGlobalMessagesServer{stream})
}
type GlobalService_StreamGlobalMessagesServer interface {
Send(*StreamGlobalMessagesResponse) error
grpc.ServerStream
}
type globalServiceStreamGlobalMessagesServer struct {
grpc.ServerStream
}
func (x *globalServiceStreamGlobalMessagesServer) Send(m *StreamGlobalMessagesResponse) error {
return x.ServerStream.SendMsg(m)
}
// GlobalService_ServiceDesc is the grpc.ServiceDesc for GlobalService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -290,7 +349,13 @@ var GlobalService_ServiceDesc = grpc.ServiceDesc{
Handler: _GlobalService_GetWorkerInfo_Handler,
},
},
Streams: []grpc.StreamDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "StreamGlobalMessages",
Handler: _GlobalService_StreamGlobalMessages_Handler,
ServerStreams: true,
},
},
Metadata: "global.proto",
}
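Editorial aside, not part of the commit: a minimal sketch of how a Go client might consume the new server-streaming StreamGlobalMessages RPC. Only the method and message names come from the diff above; the dial target, the insecure transport credentials, the pb import path/alias, and the empty request are illustrative assumptions, and NewGlobalServiceClient is simply the constructor name protoc-gen-go-grpc conventionally emits.

package main

import (
	"context"
	"io"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "example.invalid/protobufs" // hypothetical import path for the generated package
)

func main() {
	// Illustrative dial target; a real deployment would use its configured gRPC address.
	conn, err := grpc.Dial("localhost:8337", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := pb.NewGlobalServiceClient(conn)
	stream, err := client.StreamGlobalMessages(context.Background(), &pb.StreamGlobalMessagesRequest{})
	if err != nil {
		log.Fatal(err)
	}
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			return // server closed the stream
		}
		if err != nil {
			log.Fatal(err)
		}
		_ = msg // handle each StreamGlobalMessagesResponse as it arrives
	}
}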

File diff suppressed because it is too large


@ -201,6 +201,40 @@ func local_request_NodeService_GetTokensByAccount_0(ctx context.Context, marshal
}
func request_NodeService_GetMetrics_0(ctx context.Context, marshaler runtime.Marshaler, client NodeServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GetMetricsRequest
var metadata runtime.ServerMetadata
newReader, berr := utilities.IOReaderFactory(req.Body)
if berr != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr)
}
if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.GetMetrics(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_NodeService_GetMetrics_0(ctx context.Context, marshaler runtime.Marshaler, server NodeServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GetMetricsRequest
var metadata runtime.ServerMetadata
newReader, berr := utilities.IOReaderFactory(req.Body)
if berr != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr)
}
if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.GetMetrics(ctx, &protoReq)
return msg, metadata, err
}
func request_ConnectivityService_TestConnectivity_0(ctx context.Context, marshaler runtime.Marshaler, client ConnectivityServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ConnectivityTestRequest
var metadata runtime.ServerMetadata
@ -434,6 +468,31 @@ func RegisterNodeServiceHandlerServer(ctx context.Context, mux *runtime.ServeMux
})
mux.Handle("POST", pattern_NodeService_GetMetrics_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/quilibrium.node.node.pb.NodeService/GetMetrics", runtime.WithHTTPPathPattern("/quilibrium.node.node.pb.NodeService/GetMetrics"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_NodeService_GetMetrics_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_NodeService_GetMetrics_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
@ -678,6 +737,28 @@ func RegisterNodeServiceHandlerClient(ctx context.Context, mux *runtime.ServeMux
})
mux.Handle("POST", pattern_NodeService_GetMetrics_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/quilibrium.node.node.pb.NodeService/GetMetrics", runtime.WithHTTPPathPattern("/quilibrium.node.node.pb.NodeService/GetMetrics"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_NodeService_GetMetrics_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_NodeService_GetMetrics_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
@ -691,6 +772,8 @@ var (
pattern_NodeService_Send_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"quilibrium.node.node.pb.NodeService", "Send"}, ""))
pattern_NodeService_GetTokensByAccount_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"quilibrium.node.node.pb.NodeService", "GetTokensByAccount"}, ""))
pattern_NodeService_GetMetrics_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"quilibrium.node.node.pb.NodeService", "GetMetrics"}, ""))
)
var (
@ -703,6 +786,8 @@ var (
forward_NodeService_Send_0 = runtime.ForwardResponseMessage
forward_NodeService_GetTokensByAccount_0 = runtime.ForwardResponseMessage
forward_NodeService_GetMetrics_0 = runtime.ForwardResponseMessage
)
// RegisterConnectivityServiceHandlerFromEndpoint is same as RegisterConnectivityServiceHandler but
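Editorial aside, not part of the commit: the gateway handler registered above maps GetMetrics to an HTTP POST at the method's full path, so a gateway client could call it roughly as sketched below. The base URL and port, the empty filter, and the usual net/http, strings, io, and log imports are assumptions; only the path comes from the pattern registered above.

// Illustrative only: base URL/port are assumptions; the path matches the
// pattern_NodeService_GetMetrics_0 registration above.
resp, err := http.Post(
	"http://localhost:8080/quilibrium.node.node.pb.NodeService/GetMetrics",
	"application/json",
	strings.NewReader(`{"filter":""}`),
)
if err != nil {
	log.Fatal(err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body) // JSON response; the metrics bytes arrive base64-encoded
log.Println(string(body))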


@ -51,6 +51,15 @@ message PeerInfoResponse {
repeated PeerInfo peer_info = 1;
}
message ShardAllocationInfo {
bytes filter = 1;
uint32 status = 2;
uint64 join_frame_number = 3;
uint64 join_confirm_frame_number = 4;
uint64 leave_frame_number = 5;
uint64 last_active_frame_number = 6;
}
message NodeInfoResponse {
string peer_id = 1;
uint64 peer_score = 2;
@ -62,6 +71,7 @@ message NodeInfoResponse {
uint64 last_received_frame = 8;
uint64 last_global_head_frame = 9;
bool reachable = 10;
repeated ShardAllocationInfo shard_allocations = 11;
}
message WorkerInfo {
@ -197,6 +207,14 @@ message MaterializedPendingTransaction {
uint64 expiration = 17;
}
message GetMetricsRequest {
string filter = 1;
}
message GetMetricsResponse {
bytes metrics = 1;
}
message GetTokensByAccountRequest {
bytes address = 1;
bytes domain = 2;
@ -215,6 +233,7 @@ service NodeService {
rpc Send(SendRequest) returns (SendResponse);
rpc GetTokensByAccount(GetTokensByAccountRequest)
returns (GetTokensByAccountResponse);
rpc GetMetrics(GetMetricsRequest) returns (GetMetricsResponse);
}
message ConnectivityTestRequest {


@ -24,6 +24,7 @@ const (
NodeService_GetWorkerInfo_FullMethodName = "/quilibrium.node.node.pb.NodeService/GetWorkerInfo"
NodeService_Send_FullMethodName = "/quilibrium.node.node.pb.NodeService/Send"
NodeService_GetTokensByAccount_FullMethodName = "/quilibrium.node.node.pb.NodeService/GetTokensByAccount"
NodeService_GetMetrics_FullMethodName = "/quilibrium.node.node.pb.NodeService/GetMetrics"
)
// NodeServiceClient is the client API for NodeService service.
@ -35,6 +36,7 @@ type NodeServiceClient interface {
GetWorkerInfo(ctx context.Context, in *GetWorkerInfoRequest, opts ...grpc.CallOption) (*WorkerInfoResponse, error)
Send(ctx context.Context, in *SendRequest, opts ...grpc.CallOption) (*SendResponse, error)
GetTokensByAccount(ctx context.Context, in *GetTokensByAccountRequest, opts ...grpc.CallOption) (*GetTokensByAccountResponse, error)
GetMetrics(ctx context.Context, in *GetMetricsRequest, opts ...grpc.CallOption) (*GetMetricsResponse, error)
}
type nodeServiceClient struct {
@ -90,6 +92,15 @@ func (c *nodeServiceClient) GetTokensByAccount(ctx context.Context, in *GetToken
return out, nil
}
func (c *nodeServiceClient) GetMetrics(ctx context.Context, in *GetMetricsRequest, opts ...grpc.CallOption) (*GetMetricsResponse, error) {
out := new(GetMetricsResponse)
err := c.cc.Invoke(ctx, NodeService_GetMetrics_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// NodeServiceServer is the server API for NodeService service.
// All implementations must embed UnimplementedNodeServiceServer
// for forward compatibility
@ -99,6 +110,7 @@ type NodeServiceServer interface {
GetWorkerInfo(context.Context, *GetWorkerInfoRequest) (*WorkerInfoResponse, error)
Send(context.Context, *SendRequest) (*SendResponse, error)
GetTokensByAccount(context.Context, *GetTokensByAccountRequest) (*GetTokensByAccountResponse, error)
GetMetrics(context.Context, *GetMetricsRequest) (*GetMetricsResponse, error)
mustEmbedUnimplementedNodeServiceServer()
}
@ -121,6 +133,9 @@ func (UnimplementedNodeServiceServer) Send(context.Context, *SendRequest) (*Send
func (UnimplementedNodeServiceServer) GetTokensByAccount(context.Context, *GetTokensByAccountRequest) (*GetTokensByAccountResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetTokensByAccount not implemented")
}
func (UnimplementedNodeServiceServer) GetMetrics(context.Context, *GetMetricsRequest) (*GetMetricsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetMetrics not implemented")
}
func (UnimplementedNodeServiceServer) mustEmbedUnimplementedNodeServiceServer() {}
// UnsafeNodeServiceServer may be embedded to opt out of forward compatibility for this service.
@ -224,6 +239,24 @@ func _NodeService_GetTokensByAccount_Handler(srv interface{}, ctx context.Contex
return interceptor(ctx, in, info, handler)
}
func _NodeService_GetMetrics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetMetricsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(NodeServiceServer).GetMetrics(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: NodeService_GetMetrics_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(NodeServiceServer).GetMetrics(ctx, req.(*GetMetricsRequest))
}
return interceptor(ctx, in, info, handler)
}
// NodeService_ServiceDesc is the grpc.ServiceDesc for NodeService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -251,6 +284,10 @@ var NodeService_ServiceDesc = grpc.ServiceDesc{
MethodName: "GetTokensByAccount",
Handler: _NodeService_GetTokensByAccount_Handler,
},
{
MethodName: "GetMetrics",
Handler: _NodeService_GetMetrics_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "node.proto",
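Editorial aside, not part of the commit: a minimal sketch of a unary call to the new GetMetrics method through the generated client. The connection setup is omitted, the pb import alias is assumed, and the empty filter is an illustrative choice; the field names follow the proto definitions shown earlier (filter, metrics).

// Illustrative only: conn is an established *grpc.ClientConn to the node's gRPC
// endpoint, and pb is an assumed import alias for the generated package.
func fetchMetrics(ctx context.Context, conn *grpc.ClientConn) ([]byte, error) {
	client := pb.NewNodeServiceClient(conn)
	resp, err := client.GetMetrics(ctx, &pb.GetMetricsRequest{Filter: ""}) // empty filter: assumption, not from the diff
	if err != nil {
		return nil, err
	}
	return resp.Metrics, nil
}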


@ -135,4 +135,7 @@ type ProverRegistry interface {
// PruneOrphanJoins performs pruning of vertexes in the prover trie for
// expired joins.
PruneOrphanJoins(frameNumber uint64) error
// CurrentFrame returns the last frame number processed by the registry.
CurrentFrame() uint64
}


@ -12,6 +12,12 @@ type MockProverRegistry struct {
var _ consensus.ProverRegistry = (*MockProverRegistry)(nil)
// CurrentFrame implements consensus.ProverRegistry.
func (m *MockProverRegistry) CurrentFrame() uint64 {
args := m.Called()
return args.Get(0).(uint64)
}
// PruneOrphanJoins implements consensus.ProverRegistry.
func (m *MockProverRegistry) PruneOrphanJoins(frameNumber uint64) error {
args := m.Called(frameNumber)
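Editorial aside, not part of the commit: since the mock above follows testify's m.Called()/args pattern, a test can stub the new CurrentFrame method as sketched below; the frame number and the surrounding test function are illustrative.

// Illustrative test usage (assumes MockProverRegistry embeds testify's mock.Mock,
// consistent with the m.Called() pattern above).
func TestCurrentFrameStub(t *testing.T) {
	reg := &MockProverRegistry{}
	reg.On("CurrentFrame").Return(uint64(1024)) // return type must match args.Get(0).(uint64)
	if got := reg.CurrentFrame(); got != 1024 {
		t.Fatalf("expected frame 1024, got %d", got)
	}
	reg.AssertExpectations(t)
}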


@ -122,7 +122,10 @@ func (n *LazyVectorCommitmentBranchNode) Commit(
}
workers := runtime.WorkerCount(0, false, false)
throttle := make(chan struct{}, workers)
var throttle chan struct{}
if workers > 1 {
throttle = make(chan struct{}, workers)
}
commitment, err := commitNode(
inclusionProver,
@ -159,72 +162,11 @@ func commitNode(
}
vector := make([][]byte, len(node.Children))
var wg sync.WaitGroup
var mu sync.Mutex
var firstErr error
for i, child := range node.Children {
childPath := slices.Concat(node.FullPrefix, []int{i})
wg.Add(1)
select {
case throttle <- struct{}{}:
go func(i int, child LazyVectorCommitmentNode, childPath []int) {
defer wg.Done()
defer func() { <-throttle }()
if child == nil {
var err error
child, err = node.Store.GetNodeByPath(
setType,
phaseType,
shardKey,
childPath,
)
if err != nil && !strings.Contains(err.Error(), "item not found") {
mu.Lock()
if firstErr == nil {
firstErr = errors.Wrap(err, "failed to get node by path")
}
mu.Unlock()
return
}
}
if child != nil {
commit, err := commitNode(
inclusionProver,
child,
txn,
setType,
phaseType,
shardKey,
childPath,
recalculate,
throttle,
)
if err != nil {
mu.Lock()
if firstErr == nil {
firstErr = err
}
mu.Unlock()
return
}
if branchChild, ok := child.(*LazyVectorCommitmentBranchNode); ok {
h := sha512.New()
h.Write([]byte{1})
for _, p := range branchChild.Prefix {
h.Write(binary.BigEndian.AppendUint32([]byte{}, uint32(p)))
}
h.Write(commit)
commit = h.Sum(nil)
}
vector[i] = commit
} else {
vector[i] = make([]byte, 64)
}
}(i, child, childPath)
default:
if throttle == nil {
// Sequential path: no goroutines, no sync primitives
for i, child := range node.Children {
childPath := slices.Concat(node.FullPrefix, []int{i})
if child == nil {
var err error
child, err = node.Store.GetNodeByPath(
@ -265,13 +207,123 @@ func commitNode(
} else {
vector[i] = make([]byte, 64)
}
wg.Done()
}
}
wg.Wait()
} else {
// Parallel path: use goroutines with throttle channel
var wg sync.WaitGroup
var mu sync.Mutex
var firstErr error
if firstErr != nil {
return nil, firstErr
for i, child := range node.Children {
childPath := slices.Concat(node.FullPrefix, []int{i})
wg.Add(1)
select {
case throttle <- struct{}{}:
go func(i int, child LazyVectorCommitmentNode, childPath []int) {
defer wg.Done()
defer func() { <-throttle }()
if child == nil {
var err error
child, err = node.Store.GetNodeByPath(
setType,
phaseType,
shardKey,
childPath,
)
if err != nil && !strings.Contains(err.Error(), "item not found") {
mu.Lock()
if firstErr == nil {
firstErr = errors.Wrap(err, "failed to get node by path")
}
mu.Unlock()
return
}
}
if child != nil {
commit, err := commitNode(
inclusionProver,
child,
txn,
setType,
phaseType,
shardKey,
childPath,
recalculate,
throttle,
)
if err != nil {
mu.Lock()
if firstErr == nil {
firstErr = err
}
mu.Unlock()
return
}
if branchChild, ok := child.(*LazyVectorCommitmentBranchNode); ok {
h := sha512.New()
h.Write([]byte{1})
for _, p := range branchChild.Prefix {
h.Write(binary.BigEndian.AppendUint32([]byte{}, uint32(p)))
}
h.Write(commit)
commit = h.Sum(nil)
}
vector[i] = commit
} else {
vector[i] = make([]byte, 64)
}
}(i, child, childPath)
default:
if child == nil {
var err error
child, err = node.Store.GetNodeByPath(
setType,
phaseType,
shardKey,
childPath,
)
if err != nil && !strings.Contains(err.Error(), "item not found") {
return nil, errors.Wrap(err, "failed to get node by path")
}
}
if child != nil {
commit, err := commitNode(
inclusionProver,
child,
txn,
setType,
phaseType,
shardKey,
childPath,
recalculate,
throttle,
)
if err != nil {
return nil, err
}
if branchChild, ok := child.(*LazyVectorCommitmentBranchNode); ok {
h := sha512.New()
h.Write([]byte{1})
for _, p := range branchChild.Prefix {
h.Write(binary.BigEndian.AppendUint32([]byte{}, uint32(p)))
}
h.Write(commit)
commit = h.Sum(nil)
}
vector[i] = commit
} else {
vector[i] = make([]byte, 64)
}
wg.Done()
}
}
wg.Wait()
if firstErr != nil {
return nil, firstErr
}
}
data := []byte{}
@ -1942,10 +1994,15 @@ func (t *LazyVectorCommitmentTree) Commit(
return make([]byte, 64)
}
// Wrap txn for thread safety since commitNode uses parallel goroutines
// Wrap txn for thread safety when commitNode uses parallel goroutines.
// With GOMAXPROCS=1, commitNode runs sequentially so no wrapper needed.
var wrappedTxn TreeBackingStoreTransaction
if txn != nil {
wrappedTxn = &SyncTransaction{Txn: txn}
if runtime.WorkerCount(0, false, false) > 1 {
wrappedTxn = &SyncTransaction{Txn: txn}
} else {
wrappedTxn = txn
}
}
commitment := t.Root.Commit(
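Editorial aside, not part of the commit: the refactor above gates the goroutine fan-out on worker count, taking a purely sequential path when the throttle is nil and otherwise bounding concurrency with a buffered channel whose full state falls back to inline work. A minimal stand-alone sketch of that pattern (illustrative names, not the tree code itself; requires the sync package) follows.

// Illustrative only: bounded fan-out with inline fallback, mirroring the
// throttle pattern in commitNode above.
func fanOut(items []int, workers int, do func(int)) {
	if workers <= 1 {
		// Sequential path: no goroutines, no synchronization.
		for _, it := range items {
			do(it)
		}
		return
	}
	throttle := make(chan struct{}, workers)
	var wg sync.WaitGroup
	for _, it := range items {
		wg.Add(1)
		select {
		case throttle <- struct{}{}: // a slot is free: run this item concurrently
			go func(it int) {
				defer wg.Done()
				defer func() { <-throttle }()
				do(it)
			}(it)
		default: // throttle is full: do the work inline on the current goroutine
			do(it)
			wg.Done()
		}
	}
	wg.Wait()
}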


@ -17,6 +17,8 @@ type WorkerManager interface {
RegisterWorker(info *store.WorkerInfo) error
ProposeAllocations(coreIds []uint, filters [][]byte) error
DecideAllocations(reject [][]byte, confirm [][]byte) error
ProposeLeave(filters [][]byte) error
DecideLeave(reject [][]byte, confirm [][]byte) error
RangeWorkers() ([]*store.WorkerInfo, error)
RespawnWorker(coreId uint, filter []byte) error
}
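Editorial aside, not part of the commit: based only on the signatures above (and mirroring the reject/confirm parameter ordering of DecideAllocations), a hypothetical caller of the new leave-proposal methods might look like the sketch below; mgr, overcrowdedFilter, and the call sequence are illustrative assumptions.

// Illustrative only: mgr is some WorkerManager implementation and
// overcrowdedFilter identifies the shard the worker proposes to leave.
func leaveOvercrowdedShard(mgr WorkerManager, overcrowdedFilter []byte) error {
	// Propose leaving the overcrowded shard...
	if err := mgr.ProposeLeave([][]byte{overcrowdedFilter}); err != nil {
		return err
	}
	// ...then confirm it via the second argument (or reject via the first).
	return mgr.DecideLeave(nil, [][]byte{overcrowdedFilter})
}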