rate limiter logic

This commit is contained in:
Cassandra Heart 2024-11-11 13:13:20 -06:00
parent d5ebd54be6
commit 26cbb2092e
No known key found for this signature in database
GPG Key ID: 6352152859385958
5 changed files with 52 additions and 3 deletions

View File

@ -541,11 +541,11 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
wg := sync.WaitGroup{}
wg.Add(len(actives))
for i, client := range actives {
i := i
client := client
go func() {
e.logger.Info("performing data proof")
resp, err :=
client.client.CalculateChallengeProof(
context.Background(),

View File

@ -51,7 +51,6 @@ func (rl *RateLimiter) Allow(peerId peer.ID) error {
return status.Errorf(codes.ResourceExhausted,
"maximum number of unique callers (%d) reached", rl.maxTokens)
}
return nil
}
rl.clients[peerId] = now

View File

@ -0,0 +1,38 @@
package data_test
import (
"crypto/rand"
"testing"
"time"
"github.com/cloudflare/circl/sign/ed448"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/assert"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/data"
)
func TestRateLimiter(t *testing.T) {
limiter := data.NewRateLimiter(5, 10*time.Second)
for i := 0; i < 7; i++ {
_, priv, _ := ed448.GenerateKey(rand.Reader)
privKey, err := crypto.UnmarshalEd448PrivateKey(priv)
if err != nil {
t.FailNow()
}
pub := privKey.GetPublic()
peer, _ := peer.IDFromPublicKey(pub)
err = limiter.Allow(peer)
if i < 5 {
assert.NoError(t, err)
} else {
assert.Error(t, err)
}
time.Sleep(time.Second)
}
}

View File

@ -176,12 +176,17 @@ func (e *DataClockConsensusEngine) processFrame(
} else if len(e.config.Engine.DataWorkerMultiaddrs) == 0 {
e.logger.Error(
"client failed, reconnecting after 50ms",
zap.Uint32("client", uint32(i)),
)
time.Sleep(50 * time.Millisecond)
client, err =
e.createParallelDataClientsFromBaseMultiaddrAndIndex(uint32(i))
if err != nil {
e.logger.Error("failed to reconnect", zap.Error(err))
e.logger.Error(
"failed to reconnect",
zap.Uint32("client", uint32(i)),
zap.Error(err),
)
}
}
e.clients[i] = client
@ -223,6 +228,12 @@ func (e *DataClockConsensusEngine) processFrame(
panic(err)
}
e.logger.Info(
"submitting data proof",
zap.Int("ring", ring),
zap.Int("active_workers", len(outputs)),
)
e.publishMessage(e.txFilter, &protobufs.TokenRequest{
Request: &protobufs.TokenRequest_Mint{
Mint: &protobufs.MintCoinRequest{

View File

@ -52,6 +52,7 @@ func (pubsub) GetMultiaddrOfPeerStream(ctx context.Context, peerId []byte) <-cha
return nil
}
func (pubsub) GetMultiaddrOfPeer(peerId []byte) string { return "" }
func (pubsub) GetNetwork() uint { return 1 }
func (pubsub) StartDirectChannelListener(
key []byte,
purpose string,