add cache to processing

Cassandra Heart 2025-01-16 00:48:16 -06:00
parent 6dab41ca49
commit 3722901956
2 changed files with 26 additions and 12 deletions
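
The change threads a small LRU cache through the consensus engine as a seen-set: a frame whose serialized output was handled recently is skipped before any further processing. Below is a minimal, self-contained sketch of that pattern, using the same github.com/hashicorp/golang-lru/v2 package the diff imports; the frameDeduper type and handle helper are illustrative names, not part of the commit.

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

// frameDeduper mirrors the recentlyProcessedFrames cache added in this
// commit: a fixed-size LRU keyed by the frame's serialized output.
type frameDeduper struct {
	seen *lru.Cache[string, struct{}]
}

func newFrameDeduper(size int) (*frameDeduper, error) {
	// lru.New returns an error only for a non-positive size.
	c, err := lru.New[string, struct{}](size)
	if err != nil {
		return nil, err
	}
	return &frameDeduper{seen: c}, nil
}

// handle reports whether the frame should be processed; a repeat of a
// recently seen output is skipped, and the LRU bounds memory use.
func (d *frameDeduper) handle(output []byte) bool {
	key := string(output)
	if _, ok := d.seen.Peek(key); ok {
		return false
	}
	d.seen.Add(key, struct{}{})
	return true
}

func main() {
	d, _ := newFrameDeduper(25)               // same capacity the commit picks
	fmt.Println(d.handle([]byte("frame-a"))) // true: first sighting
	fmt.Println(d.handle([]byte("frame-a"))) // false: deduped
}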


@@ -10,6 +10,7 @@ import (
"sync"
"time"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/multiformats/go-multiaddr"
mn "github.com/multiformats/go-multiaddr/net"
@@ -125,6 +126,7 @@ type DataClockConsensusEngine struct {
previousHead *protobufs.ClockFrame
engineMx sync.Mutex
dependencyMapMx sync.Mutex
recentlyProcessedFrames *lru.Cache[string, struct{}]
stagedTransactions *protobufs.TokenRequests
stagedTransactionsSet map[string]struct{}
stagedTransactionsMx sync.Mutex
@@ -235,6 +237,11 @@ func NewDataClockConsensusEngine(
panic(err)
}
cache, err := lru.New[string, struct{}](25)
if err != nil {
panic(err)
}
ctx, cancel := context.WithCancel(context.Background())
e := &DataClockConsensusEngine{
ctx: ctx,
@@ -283,6 +290,7 @@ func NewDataClockConsensusEngine(
requestSyncCh: make(chan struct{}, 1),
validationFilter: map[string]struct{}{},
clockFrameFragmentBuffer: clockFrameFragmentBuffer,
recentlyProcessedFrames: cache,
}
logger.Info("constructing consensus engine")
@@ -847,9 +855,9 @@ func (e *DataClockConsensusEngine) connectToClient(
_, addr, err := mn.DialArgs(ma)
if err != nil {
e.logger.Error("could not get dial args",
zap.Error(err),
zap.String("multiaddr", ma.String()),
e.logger.Error("could not get dial args",
zap.Error(err),
zap.String("multiaddr", ma.String()),
zap.Int("index", index),
)
return nil, err
@@ -870,9 +878,9 @@ func (e *DataClockConsensusEngine) connectToClient(
grpc.WithBlock(),
)
if err != nil {
e.logger.Error("could not dial",
zap.Error(err),
zap.String("multiaddr", ma.String()),
e.logger.Error("could not dial",
zap.Error(err),
zap.String("multiaddr", ma.String()),
zap.Int("index", index),
)
return nil, err
@@ -919,7 +927,7 @@ func (e *DataClockConsensusEngine) createParallelDataWorkerClients() {
return
}
e.clientsMx.Lock()
e.clients[index] = client
e.clientsMx.Unlock()
}()
}
@@ -939,7 +947,7 @@ func (e *DataClockConsensusEngine) tryReconnectDataWorkerClients() {
wg.Add(parallelism)
for i := 0; i < parallelism; i++ {
index := i
go func() {
defer wg.Done()
if e.clients[index] != nil {
@@ -951,18 +959,18 @@ func (e *DataClockConsensusEngine) tryReconnectDataWorkerClients() {
e.clientsMx.Lock()
e.clients[index] = nil
e.clientsMx.Unlock()
e.logger.Error("failed to connect to data worker",
zap.Error(err),
e.logger.Error("failed to connect to data worker",
zap.Error(err),
zap.Int("index", index),
)
time.Sleep(50 * time.Millisecond)
continue
}
e.clientsMx.Lock()
e.logger.Info("reconnected to data worker",
e.logger.Info("reconnected to data worker",
zap.Int("index", index),
)
e.clients[index] = client
e.clientsMx.Unlock()
break
}
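
Aside from the cache plumbing, this file's remaining hunks are indentation fixes inside the dial and reconnect paths. For context, the reconnect loop above follows a common shape: one goroutine per worker index, retrying with a short sleep until the dial succeeds. A self-contained sketch of that shape follows; dialWorker and its failure simulation are hypothetical stand-ins for the engine's connectToClient path.

package main

import (
	"fmt"
	"sync"
	"time"
)

// dialWorker is a hypothetical stand-in for the engine's dial path;
// here each index fails once before succeeding, to exercise the retry.
var attempts sync.Map

func dialWorker(index int) (string, error) {
	if _, loaded := attempts.LoadOrStore(index, true); !loaded {
		return "", fmt.Errorf("worker %d not ready", index)
	}
	return fmt.Sprintf("client-%d", index), nil
}

func main() {
	const parallelism = 4
	clients := make([]string, parallelism)
	var clientsMx sync.Mutex

	var wg sync.WaitGroup
	wg.Add(parallelism)
	for i := 0; i < parallelism; i++ {
		index := i
		go func() {
			defer wg.Done()
			for {
				client, err := dialWorker(index)
				if err != nil {
					// brief pause before retrying, as in the diff
					time.Sleep(50 * time.Millisecond)
					continue
				}
				clientsMx.Lock()
				clients[index] = client
				clientsMx.Unlock()
				break
			}
		}()
	}
	wg.Wait()
	fmt.Println(clients) // [client-0 client-1 client-2 client-3]
}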


@@ -227,6 +227,12 @@ func (e *DataClockConsensusEngine) handleClockFrame(
return nil
}
if _, ok := e.recentlyProcessedFrames.Peek(string(frame.Output)); ok {
return nil
}
e.recentlyProcessedFrames.Add(string(frame.Output), struct{}{})
e.logger.Debug(
"got clock frame",
zap.Binary("address", address),
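
One caveat on the check above: Peek followed by Add is not atomic, so two goroutines handling the same frame concurrently could both miss the Peek and proceed. If this path can run concurrently, golang-lru/v2's ContainsOrAdd performs the check and the insert in a single call. A sketch of that variant, not what the commit ships:

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

// alreadyProcessed records a frame output and reports whether it had
// been recorded before, with no gap between the check and the insert.
func alreadyProcessed(cache *lru.Cache[string, struct{}], output []byte) bool {
	seen, _ := cache.ContainsOrAdd(string(output), struct{}{})
	return seen
}

func main() {
	cache, _ := lru.New[string, struct{}](25)
	fmt.Println(alreadyProcessed(cache, []byte("frame"))) // false: first time
	fmt.Println(alreadyProcessed(cache, []byte("frame"))) // true: deduped
}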