From 26cbb2092e0b7c2851d9145281aaddf683b50da4 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Mon, 11 Nov 2024 13:13:20 -0600 Subject: [PATCH] rate limiter logic --- .../data/data_clock_consensus_engine.go | 2 +- .../data/grpc_worker_rate_limiter.go | 1 - .../data/grpc_worker_rate_limiter_test.go | 38 +++++++++++++++++++ node/consensus/data/main_data_loop.go | 13 ++++++- node/consensus/data/token_handle_mint_test.go | 1 + 5 files changed, 52 insertions(+), 3 deletions(-) create mode 100644 node/consensus/data/grpc_worker_rate_limiter_test.go diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index 515636a..b4c65b0 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -541,11 +541,11 @@ func (e *DataClockConsensusEngine) PerformTimeProof( wg := sync.WaitGroup{} wg.Add(len(actives)) + for i, client := range actives { i := i client := client go func() { - e.logger.Info("performing data proof") resp, err := client.client.CalculateChallengeProof( context.Background(), diff --git a/node/consensus/data/grpc_worker_rate_limiter.go b/node/consensus/data/grpc_worker_rate_limiter.go index d190591..7f56d33 100644 --- a/node/consensus/data/grpc_worker_rate_limiter.go +++ b/node/consensus/data/grpc_worker_rate_limiter.go @@ -51,7 +51,6 @@ func (rl *RateLimiter) Allow(peerId peer.ID) error { return status.Errorf(codes.ResourceExhausted, "maximum number of unique callers (%d) reached", rl.maxTokens) } - return nil } rl.clients[peerId] = now diff --git a/node/consensus/data/grpc_worker_rate_limiter_test.go b/node/consensus/data/grpc_worker_rate_limiter_test.go new file mode 100644 index 0000000..85cf2a2 --- /dev/null +++ b/node/consensus/data/grpc_worker_rate_limiter_test.go @@ -0,0 +1,38 @@ +package data_test + +import ( + "crypto/rand" + "testing" + "time" + + "github.com/cloudflare/circl/sign/ed448" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/assert" + "source.quilibrium.com/quilibrium/monorepo/node/consensus/data" +) + +func TestRateLimiter(t *testing.T) { + limiter := data.NewRateLimiter(5, 10*time.Second) + + for i := 0; i < 7; i++ { + _, priv, _ := ed448.GenerateKey(rand.Reader) + privKey, err := crypto.UnmarshalEd448PrivateKey(priv) + if err != nil { + t.FailNow() + } + + pub := privKey.GetPublic() + + peer, _ := peer.IDFromPublicKey(pub) + err = limiter.Allow(peer) + if i < 5 { + assert.NoError(t, err) + } else { + assert.Error(t, err) + } + + time.Sleep(time.Second) + } + +} diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go index 853194b..2a7b7f6 100644 --- a/node/consensus/data/main_data_loop.go +++ b/node/consensus/data/main_data_loop.go @@ -176,12 +176,17 @@ func (e *DataClockConsensusEngine) processFrame( } else if len(e.config.Engine.DataWorkerMultiaddrs) == 0 { e.logger.Error( "client failed, reconnecting after 50ms", + zap.Uint32("client", uint32(i)), ) time.Sleep(50 * time.Millisecond) client, err = e.createParallelDataClientsFromBaseMultiaddrAndIndex(uint32(i)) if err != nil { - e.logger.Error("failed to reconnect", zap.Error(err)) + e.logger.Error( + "failed to reconnect", + zap.Uint32("client", uint32(i)), + zap.Error(err), + ) } } e.clients[i] = client @@ -223,6 +228,12 @@ func (e *DataClockConsensusEngine) processFrame( panic(err) } + e.logger.Info( + "submitting data proof", + zap.Int("ring", ring), + zap.Int("active_workers", len(outputs)), + ) + e.publishMessage(e.txFilter, &protobufs.TokenRequest{ Request: &protobufs.TokenRequest_Mint{ Mint: &protobufs.MintCoinRequest{ diff --git a/node/consensus/data/token_handle_mint_test.go b/node/consensus/data/token_handle_mint_test.go index 269c389..ac2d39f 100644 --- a/node/consensus/data/token_handle_mint_test.go +++ b/node/consensus/data/token_handle_mint_test.go @@ -52,6 +52,7 @@ func (pubsub) GetMultiaddrOfPeerStream(ctx context.Context, peerId []byte) <-cha return nil } func (pubsub) GetMultiaddrOfPeer(peerId []byte) string { return "" } +func (pubsub) GetNetwork() uint { return 1 } func (pubsub) StartDirectChannelListener( key []byte, purpose string,