diff --git a/node/consensus/app/consensus_liveness_provider.go b/node/consensus/app/consensus_liveness_provider.go index 8568357..dc90a86 100644 --- a/node/consensus/app/consensus_liveness_provider.go +++ b/node/consensus/app/consensus_liveness_provider.go @@ -114,7 +114,7 @@ func (p *AppLivenessProvider) Collect( state, ) if err != nil { - p.engine.logger.Error( + p.engine.logger.Info( "could not validate for execution", zap.Int("message_index", i), zap.Error(err), diff --git a/node/consensus/app/consensus_transition_listener.go b/node/consensus/app/consensus_transition_listener.go index 3a98780..7f71ce6 100644 --- a/node/consensus/app/consensus_transition_listener.go +++ b/node/consensus/app/consensus_transition_listener.go @@ -10,7 +10,7 @@ type AppTracer struct { } func (t *AppTracer) Trace(message string) { - t.logger.Debug(message) + // t.logger.Debug(message) } func (t *AppTracer) Error(message string, err error) { diff --git a/node/consensus/global/consensus_liveness_provider.go b/node/consensus/global/consensus_liveness_provider.go index 2e12771..dc03e2a 100644 --- a/node/consensus/global/consensus_liveness_provider.go +++ b/node/consensus/global/consensus_liveness_provider.go @@ -11,10 +11,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "golang.org/x/crypto/sha3" - "source.quilibrium.com/quilibrium/monorepo/node/consensus/reward" - hgstate "source.quilibrium.com/quilibrium/monorepo/node/execution/state/hypergraph" "source.quilibrium.com/quilibrium/monorepo/protobufs" - "source.quilibrium.com/quilibrium/monorepo/types/execution/state" "source.quilibrium.com/quilibrium/monorepo/types/tries" ) @@ -72,9 +69,6 @@ func (p *GlobalLivenessProvider) Collect( acceptedMessages := []*protobufs.Message{} - var state state.State - state = hgstate.NewHypergraphState(p.engine.hypergraph) - frameNumber := uint64(0) currentFrame, _ := p.engine.globalTimeReel.GetHead() if currentFrame != nil && currentFrame.Header != nil { @@ -88,9 +82,13 @@ func (p *GlobalLivenessProvider) Collect( zap.Int("message_count", len(messages)), ) for i, message := range messages { - costBasis, err := p.engine.executionManager.GetCost(message.Payload) + err := p.engine.executionManager.ValidateMessage( + frameNumber, + message.Address, + message.Payload, + ) if err != nil { - p.engine.logger.Error( + p.engine.logger.Debug( "invalid message", zap.Int("message_index", i), zap.Error(err), @@ -98,39 +96,6 @@ func (p *GlobalLivenessProvider) Collect( continue } - p.engine.currentDifficultyMu.RLock() - difficulty := uint64(p.engine.currentDifficulty) - p.engine.currentDifficultyMu.RUnlock() - var baseline *big.Int - if costBasis.Cmp(big.NewInt(0)) == 0 { - baseline = big.NewInt(0) - } else { - baseline = reward.GetBaselineFee( - difficulty, - p.engine.hypergraph.GetSize(nil, nil).Uint64(), - costBasis.Uint64(), - 8000000000, - ) - baseline.Quo(baseline, costBasis) - } - - result, err := p.engine.executionManager.ProcessMessage( - frameNumber, - baseline, - message.Address, - message.Payload, - state, - ) - if err != nil { - p.engine.logger.Error( - "error processing message", - zap.Int("message_index", i), - zap.Error(err), - ) - continue - } - - state = result.State acceptedMessages = append(acceptedMessages, message) } diff --git a/node/consensus/global/consensus_transition_listener.go b/node/consensus/global/consensus_transition_listener.go index af0a3ee..8f0df39 100644 --- a/node/consensus/global/consensus_transition_listener.go +++ b/node/consensus/global/consensus_transition_listener.go @@ -10,7 +10,7 @@ type GlobalTracer struct { } func (t *GlobalTracer) Trace(message string) { - t.logger.Debug(message) + // t.logger.Debug(message) } func (t *GlobalTracer) Error(message string, err error) { diff --git a/node/consensus/global/event_distributor.go b/node/consensus/global/event_distributor.go index 60f145e..524f5ca 100644 --- a/node/consensus/global/event_distributor.go +++ b/node/consensus/global/event_distributor.go @@ -419,11 +419,19 @@ func (e *GlobalConsensusEngine) evaluateForProposals( allocated := false pending := false if self != nil { + e.logger.Debug("checking allocations") for _, allocation := range self.Allocations { + e.logger.Debug("checking allocation", zap.String("filter", hex.EncodeToString(allocation.ConfirmationFilter))) if bytes.Equal(allocation.ConfirmationFilter, filter) { allocated = allocation.Status != 4 if e.config.P2P.Network != 0 || data.Frame.Header.FrameNumber > 252840 { + e.logger.Debug( + "checking pending status of allocation", + zap.Int("status", int(allocation.Status)), + zap.Uint64("join_frame_number", allocation.JoinFrameNumber), + zap.Uint64("frame_number", data.Frame.Header.FrameNumber), + ) pending = allocation.Status == 0 && allocation.JoinFrameNumber+360 <= data.Frame.Header.FrameNumber } diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index 545dd58..61ec572 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -1166,8 +1166,6 @@ func (e *GlobalConsensusEngine) materialize( var state state.State state = hgstate.NewHypergraphState(e.hypergraph) - acceptedMessages := []*protobufs.MessageBundle{} - e.logger.Debug( "materializing messages", zap.Int("message_count", len(requests)), @@ -1235,7 +1233,10 @@ func (e *GlobalConsensusEngine) materialize( } state = result.State - acceptedMessages = append(acceptedMessages, request) + } + + if err := state.Commit(); err != nil { + return errors.Wrap(err, "materialize") } err := e.proverRegistry.ProcessStateTransition(state, frameNumber) @@ -1243,10 +1244,6 @@ func (e *GlobalConsensusEngine) materialize( return errors.Wrap(err, "materialize") } - if err := state.Commit(); err != nil { - return errors.Wrap(err, "materialize") - } - return nil } diff --git a/node/consensus/global/message_subscription.go b/node/consensus/global/message_subscription.go index 06b94af..ec610f3 100644 --- a/node/consensus/global/message_subscription.go +++ b/node/consensus/global/message_subscription.go @@ -126,6 +126,7 @@ func (e *GlobalConsensusEngine) subscribeToProverMessages() error { GLOBAL_PROVER_BITMASK, func(message *pb.Message) error { if e.config.P2P.Network != 99 && !e.config.Engine.ArchiveMode { + e.logger.Debug("dropping prover message, not in archive mode") return nil } @@ -133,6 +134,7 @@ func (e *GlobalConsensusEngine) subscribeToProverMessages() error { case <-e.haltCtx.Done(): return nil case e.globalProverMessageQueue <- message: + e.logger.Debug("received prover message") return nil case <-e.ctx.Done(): return errors.New("context cancelled") diff --git a/node/consensus/global/message_validation.go b/node/consensus/global/message_validation.go index de373cb..e9b6039 100644 --- a/node/consensus/global/message_validation.go +++ b/node/consensus/global/message_validation.go @@ -118,6 +118,10 @@ func (e *GlobalConsensusEngine) validateProverMessage( peerID peer.ID, message *pb.Message, ) tp2p.ValidationResult { + e.logger.Debug( + "validating prover message from peer", + zap.String("peer_id", peerID.String()), + ) // Check if data is long enough to contain type prefix if len(message.Data) < 4 { e.logger.Error( @@ -133,6 +137,10 @@ func (e *GlobalConsensusEngine) validateProverMessage( switch typePrefix { case protobufs.MessageBundleType: + e.logger.Debug( + "validating message bundle from peer", + zap.String("peer_id", peerID.String()), + ) // Prover messages come wrapped in MessageBundle messageBundle := &protobufs.MessageBundle{} if err := messageBundle.FromCanonicalBytes(message.Data); err != nil { @@ -147,6 +155,7 @@ func (e *GlobalConsensusEngine) validateProverMessage( now := time.Now().UnixMilli() if messageBundle.Timestamp > now+5000 || messageBundle.Timestamp < now-5000 { + e.logger.Debug("message too late or too early") return tp2p.ValidationResultIgnore } diff --git a/node/consensus/provers/prover_registry.go b/node/consensus/provers/prover_registry.go index 0168e3b..c13f001 100644 --- a/node/consensus/provers/prover_registry.go +++ b/node/consensus/provers/prover_registry.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" "golang.org/x/exp/slices" "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global" + hgstate "source.quilibrium.com/quilibrium/monorepo/node/execution/state/hypergraph" "source.quilibrium.com/quilibrium/monorepo/types/consensus" "source.quilibrium.com/quilibrium/monorepo/types/execution/intrinsics" "source.quilibrium.com/quilibrium/monorepo/types/execution/state" @@ -149,13 +150,6 @@ func (r *ProverRegistry) GetProverInfo( ) if info, exists := r.proverCache[string(address)]; exists { - r.logger.Debug( - "prover info found", - zap.String("address", fmt.Sprintf("%x", address)), - zap.String("public_key", fmt.Sprintf("%x", info.PublicKey)), - zap.Uint8("status", uint8(info.Status)), - zap.Int("allocation_count", len(info.Allocations)), - ) return info, nil } @@ -1125,20 +1119,38 @@ func (r *ProverRegistry) processProverChange( switch change.StateChange { case state.CreateStateChangeEvent, state.UpdateStateChangeEvent: + if !bytes.Equal(change.Discriminator, hgstate.VertexAddsDiscriminator) { + return nil + } + // A prover was created or updated if change.Value != nil && change.Value.DataValue() != nil { data := change.Value.DataValue() - // Check if this is a Prover or ProverAllocation - publicKey, err := r.rdfMultiprover.Get( + t, err := r.rdfMultiprover.GetType( global.GLOBAL_RDF_SCHEMA, - "prover:Prover", - "PublicKey", + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], data, ) + if err != nil { + return nil + } + + // Check if this is a Prover or ProverAllocation + switch t { + case "prover:Prover": + r.logger.Debug("processing prover change") + publicKey, err := r.rdfMultiprover.Get( + global.GLOBAL_RDF_SCHEMA, + "prover:Prover", + "PublicKey", + data, + ) + if err != nil { + r.logger.Debug("no public key") + return nil + } - if err == nil && len(publicKey) > 0 { - // This is a Prover vertex statusBytes, err := r.rdfMultiprover.Get( global.GLOBAL_RDF_SCHEMA, "prover:Prover", @@ -1146,9 +1158,11 @@ func (r *ProverRegistry) processProverChange( data, ) if err != nil || len(statusBytes) == 0 { + r.logger.Debug("no status") return nil // Skip if no status } status := statusBytes[0] + r.logger.Debug("status of prover change", zap.Int("status", int(status))) // Map internal status to our ProverStatus enum var mappedStatus consensus.ProverStatus @@ -1234,33 +1248,16 @@ func (r *ProverRegistry) processProverChange( proverInfo.DelegateAddress = delegateAddress proverInfo.KickFrameNumber = kickFrameNumber } - - // If global prover is active, add to global trie - if mappedStatus == consensus.ProverStatusActive { - if err := r.addProverToTrie( - proverAddress, - publicKey, - nil, - frameNumber, - ); err != nil { - return errors.Wrap(err, "failed to add prover to global trie") - } - } else { - // Remove from global trie if not active - if err := r.removeProverFromTrie(proverAddress, nil); err != nil { - return errors.Wrap(err, "failed to remove prover from global trie") - } - } - } else { - // Try to read as ProverAllocation + case "allocation:ProverAllocation": + r.logger.Debug("processing prover allocation change") proverRef, err := r.rdfMultiprover.Get( global.GLOBAL_RDF_SCHEMA, "allocation:ProverAllocation", - "prover:Prover", + "Prover", data, ) if err != nil || len(proverRef) == 0 { - // Neither Prover nor ProverAllocation, skip + r.logger.Debug("no prover") return nil } @@ -1272,6 +1269,7 @@ func (r *ProverRegistry) processProverChange( data, ) if err != nil || len(statusBytes) == 0 { + r.logger.Debug("no status") return nil } status := statusBytes[0] @@ -1302,12 +1300,15 @@ func (r *ProverRegistry) processProverChange( ) // Extract filters - confirmationFilter, _ := r.rdfMultiprover.Get( + confirmationFilter, err := r.rdfMultiprover.Get( global.GLOBAL_RDF_SCHEMA, "allocation:ProverAllocation", "ConfirmationFilter", data, ) + if err != nil { + return errors.Wrap(err, "process prover change") + } // Find the prover this allocation belongs to if proverInfo, exists := r.proverCache[string(proverRef)]; exists { @@ -1320,7 +1321,7 @@ func (r *ProverRegistry) processProverChange( confirmationFilter, frameNumber, ); err != nil { - return errors.Wrap(err, "failed to add prover to filter trie") + return errors.Wrap(err, "process prover change") } } else { // Remove from filter trie if not active @@ -1330,7 +1331,7 @@ func (r *ProverRegistry) processProverChange( ); err != nil { return errors.Wrap( err, - "failed to remove prover from filter trie", + "process prover change", ) } } diff --git a/node/execution/intrinsics/global/global_prover_update.go b/node/execution/intrinsics/global/global_prover_update.go index 2ab218d..643ae9d 100644 --- a/node/execution/intrinsics/global/global_prover_update.go +++ b/node/execution/intrinsics/global/global_prover_update.go @@ -88,6 +88,14 @@ func (p *ProverUpdate) Materialize( ) } + rewardAddress, err := poseidon.HashBytes(slices.Concat( + token.QUIL_TOKEN_ADDRESS[:], + proverAddress, + )) + if err != nil { + return nil, errors.Wrap(err, "materialize") + } + // Ensure the prover exists (under GLOBAL_INTRINSIC_ADDRESS + proverAddress) proverFullAddr := [64]byte{} copy(proverFullAddr[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) @@ -132,12 +140,11 @@ func (p *ProverUpdate) Materialize( ) } - // Now update only the reward entry under the QUIL token namespace: - // key = (token.QUIL_TOKEN_ADDRESS , proverAddress) in VertexAddsDiscriminator + // Now update only the reward entry in VertexAddsDiscriminator // We will preserve the existing Balance and only set DelegateAddress. rewardPriorVertex, err := hg.Get( - token.QUIL_TOKEN_ADDRESS, - proverAddress, + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + rewardAddress.FillBytes(make([]byte, 32)), hgstate.VertexAddsDiscriminator, ) if err != nil { @@ -162,7 +169,7 @@ func (p *ProverUpdate) Materialize( // Set new DelegateAddress if err := p.rdfMultiprover.Set( GLOBAL_RDF_SCHEMA, - token.QUIL_TOKEN_ADDRESS, + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], "reward:ProverReward", "DelegateAddress", p.DelegateAddress, @@ -172,8 +179,8 @@ func (p *ProverUpdate) Materialize( } unmodifiedPrior, err := hg.Get( - token.QUIL_TOKEN_ADDRESS, - proverAddress, + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + rewardAddress.FillBytes(make([]byte, 32)), hgstate.VertexAddsDiscriminator, ) var unmodifiedTree *tries.VectorCommitmentTree @@ -190,16 +197,16 @@ func (p *ProverUpdate) Materialize( // Build the updated reward vertex rewardVertex := hg.NewVertexAddMaterializedState( - [32]byte(token.QUIL_TOKEN_ADDRESS), - [32]byte(slices.Clone(proverAddress)), + [32]byte(intrinsics.GLOBAL_INTRINSIC_ADDRESS), + [32]byte(slices.Clone(rewardAddress.FillBytes(make([]byte, 32)))), frameNumber, unmodifiedTree, rewardPriorTree, ) if err := hg.Set( - token.QUIL_TOKEN_ADDRESS, - proverAddress, + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + rewardAddress.FillBytes(make([]byte, 32)), hgstate.VertexAddsDiscriminator, frameNumber, rewardVertex, diff --git a/node/execution/intrinsics/global/global_prover_update_test.go b/node/execution/intrinsics/global/global_prover_update_test.go index 98f0481..06ee974 100644 --- a/node/execution/intrinsics/global/global_prover_update_test.go +++ b/node/execution/intrinsics/global/global_prover_update_test.go @@ -241,23 +241,24 @@ func TestProverUpdate_Materialize_PreservesBalance(t *testing.T) { nonZero := make([]byte, 32) binary.BigEndian.PutUint64(nonZero[24:], 12345) require.NoError(t, rdf.Set( - global.GLOBAL_RDF_SCHEMA, token.QUIL_TOKEN_ADDRESS, + global.GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], "reward:ProverReward", "Balance", nonZero, rewardPrior, )) fullProver := [64]byte{} fullReward := [64]byte{} + rewardAddr, err := poseidon.HashBytes(slices.Concat(token.QUIL_TOKEN_ADDRESS[:], addr)) copy(fullProver[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) - copy(fullReward[:32], token.QUIL_TOKEN_ADDRESS) + copy(fullReward[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) copy(fullProver[32:], addr) - copy(fullReward[32:], addr) + copy(fullReward[32:], rewardAddr.FillBytes(make([]byte, 32))) mockHG.On("GetVertex", fullProver).Return(nil, nil) mockHG.On("GetVertexData", fullProver).Return(proverTree, nil) mockHG.On("GetVertex", fullReward).Return(nil, nil) mockHG.On("GetVertexData", fullReward).Return(rewardPrior, nil) // Hypergraph lookups - mockHG.On("Get", token.QUIL_TOKEN_ADDRESS, addr, hgstate.VertexAddsDiscriminator).Return(rewardPrior, nil) + mockHG.On("Get", intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], rewardAddr.FillBytes(make([]byte, 32)), hgstate.VertexAddsDiscriminator).Return(rewardPrior, nil) mockHG.On("Get", intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], addr, hgstate.HyperedgeAddsDiscriminator).Return(nil, assert.AnError) delegate := make([]byte, 32) @@ -271,8 +272,8 @@ func TestProverUpdate_Materialize_PreservesBalance(t *testing.T) { On("SetVertexData", mock.Anything, mock.MatchedBy(func(id [64]byte) bool { - return bytes.Equal(id[:32], token.QUIL_TOKEN_ADDRESS) && - bytes.Equal(id[32:], addr) + return bytes.Equal(id[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) && + bytes.Equal(id[32:], rewardAddr.FillBytes(make([]byte, 32))) }), mock.MatchedBy(func(tree *qcrypto.VectorCommitmentTree) bool { d, err := rdf.Get(global.GLOBAL_RDF_SCHEMA, "reward:ProverReward", "DelegateAddress", tree)