diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index 0ca6f89..3216c4f 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -10,6 +10,7 @@ import ( "sync" "time" + lru "github.com/hashicorp/golang-lru/v2" "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/multiformats/go-multiaddr" mn "github.com/multiformats/go-multiaddr/net" @@ -125,6 +126,7 @@ type DataClockConsensusEngine struct { previousHead *protobufs.ClockFrame engineMx sync.Mutex dependencyMapMx sync.Mutex + recentlyProcessedFrames *lru.Cache[string, struct{}] stagedTransactions *protobufs.TokenRequests stagedTransactionsSet map[string]struct{} stagedTransactionsMx sync.Mutex @@ -235,6 +237,11 @@ func NewDataClockConsensusEngine( panic(err) } + cache, err := lru.New[string, struct{}](25) + if err != nil { + panic(err) + } + ctx, cancel := context.WithCancel(context.Background()) e := &DataClockConsensusEngine{ ctx: ctx, @@ -283,6 +290,7 @@ func NewDataClockConsensusEngine( requestSyncCh: make(chan struct{}, 1), validationFilter: map[string]struct{}{}, clockFrameFragmentBuffer: clockFrameFragmentBuffer, + recentlyProcessedFrames: cache, } logger.Info("constructing consensus engine") @@ -847,9 +855,9 @@ func (e *DataClockConsensusEngine) connectToClient( _, addr, err := mn.DialArgs(ma) if err != nil { - e.logger.Error("could not get dial args", - zap.Error(err), - zap.String("multiaddr", ma.String()), + e.logger.Error("could not get dial args", + zap.Error(err), + zap.String("multiaddr", ma.String()), zap.Int("index", index), ) return nil, err @@ -870,9 +878,9 @@ func (e *DataClockConsensusEngine) connectToClient( grpc.WithBlock(), ) if err != nil { - e.logger.Error("could not dial", - zap.Error(err), - zap.String("multiaddr", ma.String()), + e.logger.Error("could not dial", + zap.Error(err), + zap.String("multiaddr", ma.String()), zap.Int("index", index), ) return nil, err @@ -919,7 +927,7 @@ func (e *DataClockConsensusEngine) createParallelDataWorkerClients() { return } e.clientsMx.Lock() - e.clients[index] = client + e.clients[index] = client e.clientsMx.Unlock() }() } @@ -939,7 +947,7 @@ func (e *DataClockConsensusEngine) tryReconnectDataWorkerClients() { wg.Add(parallelism) for i := 0; i < parallelism; i++ { index := i - + go func() { defer wg.Done() if e.clients[index] != nil { @@ -951,18 +959,18 @@ func (e *DataClockConsensusEngine) tryReconnectDataWorkerClients() { e.clientsMx.Lock() e.clients[index] = nil e.clientsMx.Unlock() - e.logger.Error("failed to connect to data worker", - zap.Error(err), + e.logger.Error("failed to connect to data worker", + zap.Error(err), zap.Int("index", index), ) time.Sleep(50 * time.Millisecond) continue } e.clientsMx.Lock() - e.logger.Info("reconnected to data worker", + e.logger.Info("reconnected to data worker", zap.Int("index", index), ) - e.clients[index] = client + e.clients[index] = client e.clientsMx.Unlock() break } diff --git a/node/consensus/data/message_handler.go b/node/consensus/data/message_handler.go index 4dfa8cf..2020b82 100644 --- a/node/consensus/data/message_handler.go +++ b/node/consensus/data/message_handler.go @@ -227,6 +227,12 @@ func (e *DataClockConsensusEngine) handleClockFrame( return nil } + if _, ok := e.recentlyProcessedFrames.Peek(string(frame.Output)); ok { + return nil + } + + e.recentlyProcessedFrames.Add(string(frame.Output), struct{}{}) + e.logger.Debug( "got clock frame", zap.Binary("address", address),