From 2ce7eb26d42249782ef4c5e4e7e51e303bc7d311 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Mon, 11 Nov 2024 04:39:52 -0600 Subject: [PATCH] change order of operations, don't commit an empty tree? --- .../data/data_clock_consensus_engine.go | 167 +----------------- node/consensus/data/main_data_loop.go | 158 ++++++++++++++++- .../token/application/token_handle_mint.go | 2 +- .../token/token_execution_engine.go | 1 - 4 files changed, 154 insertions(+), 174 deletions(-) diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index 3d56f35..4d89bd8 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "github.com/iden3/go-iden3-crypto/poseidon" "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/multiformats/go-multiaddr" mn "github.com/multiformats/go-multiaddr/net" @@ -121,6 +120,8 @@ type DataClockConsensusEngine struct { report *protobufs.SelfTestReport clients []protobufs.DataIPCServiceClient grpcRateLimiter *RateLimiter + previousTree *mt.MerkleTree + clientReconnectTest int } var _ consensus.DataConsensusEngine = (*DataClockConsensusEngine)(nil) @@ -490,19 +491,6 @@ func (e *DataClockConsensusEngine) Start() <-chan error { go e.runPreMidnightProofWorker() go func() { - h, err := poseidon.HashBytes(e.pubSub.GetPeerID()) - if err != nil { - panic(err) - } - peerProvingKeyAddress := h.FillBytes(make([]byte, 32)) - - frame, err := e.dataTimeReel.Head() - if err != nil { - panic(err) - } - - // Let it sit until we at least have a few more peers inbound - time.Sleep(30 * time.Second) parallelism := e.report.Cores - 1 if parallelism < 3 { @@ -522,157 +510,6 @@ func (e *DataClockConsensusEngine) Start() <-chan error { panic(err) } } - - var previousTree *mt.MerkleTree - - clientReconnectTest := 0 - - for e.GetState() < consensus.EngineStateStopping { - nextFrame, err := e.dataTimeReel.Head() - if err != nil { - panic(err) - } - - if frame.FrameNumber == nextFrame.FrameNumber { - time.Sleep(1 * time.Second) - continue - } - - frame = nextFrame - - clientReconnectTest++ - if clientReconnectTest >= 10 { - wg := sync.WaitGroup{} - wg.Add(len(e.clients)) - for i, client := range e.clients { - i := i - client := client - go func() { - for j := 3; j >= 0; j-- { - var err error - if client == nil { - if len(e.config.Engine.DataWorkerMultiaddrs) != 0 { - e.logger.Error( - "client failed, reconnecting after 50ms", - zap.Uint32("client", uint32(i)), - ) - time.Sleep(50 * time.Millisecond) - client, err = e.createParallelDataClientsFromListAndIndex(uint32(i)) - if err != nil { - e.logger.Error("failed to reconnect", zap.Error(err)) - } - } else if len(e.config.Engine.DataWorkerMultiaddrs) == 0 { - e.logger.Error( - "client failed, reconnecting after 50ms", - ) - time.Sleep(50 * time.Millisecond) - client, err = - e.createParallelDataClientsFromBaseMultiaddrAndIndex(uint32(i)) - if err != nil { - e.logger.Error("failed to reconnect", zap.Error(err)) - } - } - e.clients[i] = client - continue - } - } - wg.Done() - }() - } - wg.Wait() - clientReconnectTest = 0 - } - - for i, trie := range e.GetFrameProverTries()[1:] { - if trie.Contains(peerProvingKeyAddress) { - outputs := e.PerformTimeProof(frame, frame.Difficulty, i) - if outputs == nil || len(outputs) < 3 { - e.logger.Error("could not successfully build proof, reattempting") - break - } - modulo := len(outputs) - proofTree, payload, output, err := tries.PackOutputIntoPayloadAndProof( - outputs, - modulo, - frame, - previousTree, - ) - if err != nil { - e.logger.Error( - "could not successfully pack proof, reattempting", - zap.Error(err), - ) - break - } - previousTree = proofTree - - sig, err := e.pubSub.SignMessage( - payload, - ) - if err != nil { - panic(err) - } - - e.publishMessage(e.txFilter, &protobufs.TokenRequest{ - Request: &protobufs.TokenRequest_Mint{ - Mint: &protobufs.MintCoinRequest{ - Proofs: output, - Signature: &protobufs.Ed448Signature{ - PublicKey: &protobufs.Ed448PublicKey{ - KeyValue: e.pubSub.GetPublicKey(), - }, - Signature: sig, - }, - }, - }, - }) - - if e.config.Engine.AutoMergeCoins { - _, addrs, _, err := e.coinStore.GetCoinsForOwner( - peerProvingKeyAddress, - ) - if err != nil { - e.logger.Error( - "received error while iterating coins", - zap.Error(err), - ) - break - } - - if len(addrs) > 25 { - message := []byte("merge") - refs := []*protobufs.CoinRef{} - for _, addr := range addrs { - message = append(message, addr...) - refs = append(refs, &protobufs.CoinRef{ - Address: addr, - }) - } - - sig, _ := e.pubSub.SignMessage( - message, - ) - - e.publishMessage(e.txFilter, &protobufs.TokenRequest{ - Request: &protobufs.TokenRequest_Merge{ - Merge: &protobufs.MergeCoinRequest{ - Coins: refs, - Signature: &protobufs.Ed448Signature{ - PublicKey: &protobufs.Ed448PublicKey{ - KeyValue: e.pubSub.GetPublicKey(), - }, - Signature: sig, - }, - }, - }, - }) - } - } - - break - } - } - } }() return errChan diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go index 9e7d524..853194b 100644 --- a/node/consensus/data/main_data_loop.go +++ b/node/consensus/data/main_data_loop.go @@ -2,8 +2,10 @@ package data import ( "bytes" + "sync" "time" + "github.com/iden3/go-iden3-crypto/poseidon" "go.uber.org/zap" "source.quilibrium.com/quilibrium/monorepo/node/consensus" "source.quilibrium.com/quilibrium/monorepo/node/protobufs" @@ -127,13 +129,155 @@ func (e *DataClockConsensusEngine) processFrame( return nextFrame } else { - if !e.IsInProverTrie(e.pubSub.GetPeerID()) && - dataFrame.Timestamp > time.Now().UnixMilli()-30000 { - e.logger.Info("announcing prover join") - for _, eng := range e.executionEngines { - eng.AnnounceProverMerge() - eng.AnnounceProverJoin() - break + if latestFrame.Timestamp > time.Now().UnixMilli()-30000 { + if !e.IsInProverTrie(e.pubSub.GetPeerID()) { + e.logger.Info("announcing prover join") + for _, eng := range e.executionEngines { + eng.AnnounceProverMerge() + eng.AnnounceProverJoin() + break + } + } else { + h, err := poseidon.HashBytes(e.pubSub.GetPeerID()) + if err != nil { + panic(err) + } + peerProvingKeyAddress := h.FillBytes(make([]byte, 32)) + + ring := -1 + for i, tries := range e.GetFrameProverTries()[1:] { + i := i + if tries.Contains(peerProvingKeyAddress) { + ring = i + } + } + + e.clientReconnectTest++ + if e.clientReconnectTest >= 10 { + wg := sync.WaitGroup{} + wg.Add(len(e.clients)) + for i, client := range e.clients { + i := i + client := client + go func() { + for j := 3; j >= 0; j-- { + var err error + if client == nil { + if len(e.config.Engine.DataWorkerMultiaddrs) != 0 { + e.logger.Error( + "client failed, reconnecting after 50ms", + zap.Uint32("client", uint32(i)), + ) + time.Sleep(50 * time.Millisecond) + client, err = e.createParallelDataClientsFromListAndIndex(uint32(i)) + if err != nil { + e.logger.Error("failed to reconnect", zap.Error(err)) + } + } else if len(e.config.Engine.DataWorkerMultiaddrs) == 0 { + e.logger.Error( + "client failed, reconnecting after 50ms", + ) + time.Sleep(50 * time.Millisecond) + client, err = + e.createParallelDataClientsFromBaseMultiaddrAndIndex(uint32(i)) + if err != nil { + e.logger.Error("failed to reconnect", zap.Error(err)) + } + } + e.clients[i] = client + continue + } + } + wg.Done() + }() + } + wg.Wait() + e.clientReconnectTest = 0 + } + + outputs := e.PerformTimeProof(latestFrame, latestFrame.Difficulty, ring) + if outputs == nil || len(outputs) < 3 { + e.logger.Error("could not successfully build proof, reattempting") + return latestFrame + } + modulo := len(outputs) + proofTree, payload, output, err := tries.PackOutputIntoPayloadAndProof( + outputs, + modulo, + latestFrame, + e.previousTree, + ) + if err != nil { + e.logger.Error( + "could not successfully pack proof, reattempting", + zap.Error(err), + ) + return latestFrame + } + e.previousTree = proofTree + + sig, err := e.pubSub.SignMessage( + payload, + ) + if err != nil { + panic(err) + } + + e.publishMessage(e.txFilter, &protobufs.TokenRequest{ + Request: &protobufs.TokenRequest_Mint{ + Mint: &protobufs.MintCoinRequest{ + Proofs: output, + Signature: &protobufs.Ed448Signature{ + PublicKey: &protobufs.Ed448PublicKey{ + KeyValue: e.pubSub.GetPublicKey(), + }, + Signature: sig, + }, + }, + }, + }) + + if e.config.Engine.AutoMergeCoins { + _, addrs, _, err := e.coinStore.GetCoinsForOwner( + peerProvingKeyAddress, + ) + if err != nil { + e.logger.Error( + "received error while iterating coins", + zap.Error(err), + ) + return latestFrame + } + + if len(addrs) > 25 { + message := []byte("merge") + refs := []*protobufs.CoinRef{} + for _, addr := range addrs { + message = append(message, addr...) + refs = append(refs, &protobufs.CoinRef{ + Address: addr, + }) + } + + sig, _ := e.pubSub.SignMessage( + message, + ) + + e.publishMessage(e.txFilter, &protobufs.TokenRequest{ + Request: &protobufs.TokenRequest_Merge{ + Merge: &protobufs.MergeCoinRequest{ + Coins: refs, + Signature: &protobufs.Ed448Signature{ + PublicKey: &protobufs.Ed448PublicKey{ + KeyValue: e.pubSub.GetPublicKey(), + }, + Signature: sig, + }, + }, + }, + }) + } + } } } return latestFrame diff --git a/node/execution/intrinsics/token/application/token_handle_mint.go b/node/execution/intrinsics/token/application/token_handle_mint.go index 51abfae..c111043 100644 --- a/node/execution/intrinsics/token/application/token_handle_mint.go +++ b/node/execution/intrinsics/token/application/token_handle_mint.go @@ -19,7 +19,7 @@ import ( ) const PROOF_FRAME_CUTOFF = 1 -const PROOF_FRAME_RING_RESET = 5650 +const PROOF_FRAME_RING_RESET = 5750 func (a *TokenApplication) handleMint( currentFrameNumber uint64, diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go index 72ce03d..bbe32d8 100644 --- a/node/execution/intrinsics/token/token_execution_engine.go +++ b/node/execution/intrinsics/token/token_execution_engine.go @@ -786,7 +786,6 @@ func (e *TokenExecutionEngine) ProcessFrame( app.Tries = []*tries.RollingFrecencyCritbitTrie{ app.Tries[0], - &tries.RollingFrecencyCritbitTrie{}, } err = e.clockStore.PutPeerSeniorityMap(