Add sync timeout and make it async (#356)

* Add sync timeout

* Make sync async
This commit is contained in:
petricadaipegsp 2024-11-17 00:50:25 +01:00 committed by GitHub
parent 8b18568bae
commit da5b7a6126
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 61 additions and 54 deletions

View File

@ -280,73 +280,65 @@ func (e *DataClockConsensusEngine) sync(
zap.Uint64("current_frame", latest.FrameNumber),
zap.Uint64("max_frame", maxFrame),
)
var cooperative bool = true
defer func() {
if cooperative {
return
}
e.peerMapMx.Lock()
defer e.peerMapMx.Unlock()
if _, ok := e.peerMap[string(peerId)]; ok {
e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
delete(e.peerMap, string(peerId))
}
}()
cc, err := e.pubSub.GetDirectChannel(peerId, "sync")
if err != nil {
e.logger.Debug(
"could not establish direct channel",
zap.Error(err),
)
e.peerMapMx.Lock()
if _, ok := e.peerMap[string(peerId)]; ok {
e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
delete(e.peerMap, string(peerId))
}
e.peerMapMx.Unlock()
cooperative = false
return latest, errors.Wrap(err, "sync")
}
defer func() {
if err := cc.Close(); err != nil {
e.logger.Error("error while closing connection", zap.Error(err))
}
}()
client := protobufs.NewDataServiceClient(cc)
for e.GetState() < consensus.EngineStateStopping {
ctx, cancel := context.WithTimeout(e.ctx, 2*time.Second)
response, err := client.GetDataFrame(
context.TODO(),
ctx,
&protobufs.GetDataFrameRequest{
FrameNumber: latest.FrameNumber + 1,
},
grpc.MaxCallRecvMsgSize(600*1024*1024),
)
cancel()
if err != nil {
e.logger.Debug(
"could not get frame",
zap.Error(err),
)
e.peerMapMx.Lock()
if _, ok := e.peerMap[string(peerId)]; ok {
e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
delete(e.peerMap, string(peerId))
}
e.peerMapMx.Unlock()
if err := cc.Close(); err != nil {
e.logger.Error("error while closing connection", zap.Error(err))
}
cooperative = false
return latest, errors.Wrap(err, "sync")
}
if response == nil {
e.logger.Debug("received no response from peer")
if err := cc.Close(); err != nil {
e.logger.Error("error while closing connection", zap.Error(err))
}
return latest, nil
}
if response.ClockFrame == nil ||
response.ClockFrame.FrameNumber != latest.FrameNumber+1 ||
response.ClockFrame.Timestamp < latest.Timestamp {
e.logger.Debug("received invalid response from peer")
e.peerMapMx.Lock()
if _, ok := e.peerMap[string(peerId)]; ok {
e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
delete(e.peerMap, string(peerId))
}
e.peerMapMx.Unlock()
if err := cc.Close(); err != nil {
e.logger.Error("error while closing connection", zap.Error(err))
}
cooperative = false
return latest, nil
}
e.logger.Info(
@ -357,13 +349,7 @@ func (e *DataClockConsensusEngine) sync(
if !e.IsInProverTrie(
response.ClockFrame.GetPublicKeySignatureEd448().PublicKey.KeyValue,
) {
e.peerMapMx.Lock()
if _, ok := e.peerMap[string(peerId)]; ok {
e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
delete(e.peerMap, string(peerId))
}
e.peerMapMx.Unlock()
cooperative = false
}
if err := e.frameProver.VerifyDataClockFrame(
response.ClockFrame,
@ -373,11 +359,8 @@ func (e *DataClockConsensusEngine) sync(
e.dataTimeReel.Insert(response.ClockFrame, true)
latest = response.ClockFrame
if latest.FrameNumber >= maxFrame {
break
return latest, nil
}
}
if err := cc.Close(); err != nil {
e.logger.Error("error while closing connection", zap.Error(err))
}
return latest, nil
}

View File

@ -61,6 +61,10 @@ type ChannelServer = protobufs.DataService_GetPublicChannelServer
type DataClockConsensusEngine struct {
protobufs.UnimplementedDataServiceServer
ctx context.Context
cancel context.CancelFunc
lastProven uint64
difficulty uint32
config *config.Config
@ -126,6 +130,7 @@ type DataClockConsensusEngine struct {
previousFrameProven *protobufs.ClockFrame
previousTree *mt.MerkleTree
clientReconnectTest int
requestSyncCh chan *protobufs.ClockFrame
}
var _ consensus.DataConsensusEngine = (*DataClockConsensusEngine)(nil)
@ -215,7 +220,10 @@ func NewDataClockConsensusEngine(
rateLimit = 10
}
ctx, cancel := context.WithCancel(context.Background())
e := &DataClockConsensusEngine{
ctx: ctx,
cancel: cancel,
difficulty: difficulty,
logger: logger,
state: consensus.EngineStateStopped,
@ -256,6 +264,7 @@ func NewDataClockConsensusEngine(
rateLimit,
time.Minute,
),
requestSyncCh: make(chan *protobufs.ClockFrame, 1),
}
logger.Info("constructing consensus engine")
@ -479,6 +488,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
}()
go e.runLoop()
go e.runSync()
go func() {
time.Sleep(30 * time.Second)
e.logger.Info("checking for snapshots to play forward")
@ -558,7 +568,7 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
go func() {
resp, err :=
client.client.CalculateChallengeProof(
context.Background(),
e.ctx,
&protobufs.ChallengeProofRequest{
PeerId: e.pubSub.GetPeerID(),
Core: uint32(i),
@ -594,6 +604,7 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
e.logger.Info("stopping ceremony consensus engine")
e.cancel()
e.stateMx.Lock()
e.state = consensus.EngineStateStopping
e.stateMx.Unlock()
@ -765,7 +776,7 @@ func (e *DataClockConsensusEngine) createParallelDataClientsFromListAndIndex(
return nil, errors.Wrap(err, "create parallel data client")
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
defer cancel()
conn, err := grpc.DialContext(
ctx,
@ -828,7 +839,7 @@ func (
return nil, errors.Wrap(err, "create parallel data client")
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
defer cancel()
conn, err := grpc.DialContext(
ctx,
@ -880,7 +891,7 @@ func (e *DataClockConsensusEngine) createParallelDataClientsFromList() (
continue
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
defer cancel()
conn, err := grpc.DialContext(
ctx,
@ -943,7 +954,7 @@ func (e *DataClockConsensusEngine) createParallelDataClientsFromBaseMultiaddr(
e.logger.Error("could not get dial args", zap.Error(err))
continue
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
defer cancel()
conn, err := grpc.DialContext(
ctx,

View File

@ -41,6 +41,19 @@ func (
return frameProverTries
}
// runSync is the dedicated sync worker: it drains frames enqueued on
// requestSyncCh and collects each one in turn, exiting once the engine
// context is cancelled. Running collection here keeps the (potentially
// slow, timeout-bounded) sync work off the main run loop goroutine.
func (e *DataClockConsensusEngine) runSync() {
	for {
		select {
		case <-e.ctx.Done():
			// Engine is shutting down; stop servicing sync requests.
			return
		case frame := <-e.requestSyncCh:
			_, err := e.collect(frame)
			if err != nil {
				// Best-effort: log and keep servicing later requests.
				e.logger.Error("could not collect", zap.Error(err))
			}
		}
	}
}
func (e *DataClockConsensusEngine) runLoop() {
dataFrameCh := e.dataTimeReel.NewFrameCh()
runOnce := true
@ -103,8 +116,9 @@ func (e *DataClockConsensusEngine) processFrame(
)
var err error
if !e.GetFrameProverTries()[0].Contains(e.provingKeyBytes) {
if latestFrame, err = e.collect(dataFrame); err != nil {
e.logger.Error("could not collect", zap.Error(err))
select {
case e.requestSyncCh <- dataFrame:
default:
}
}

View File

@ -652,7 +652,7 @@ func (e *DataClockConsensusEngine) GetPublicChannelForProvingKey(
}
client := protobufs.NewDataServiceClient(cc)
s, err := client.GetPublicChannel(
context.Background(),
e.ctx,
grpc.MaxCallSendMsgSize(600*1024*1024),
grpc.MaxCallRecvMsgSize(600*1024*1024),
)

View File

@ -2,7 +2,6 @@ package data
import (
"bytes"
"context"
"encoding/binary"
"strings"
"time"
@ -127,7 +126,7 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() {
if bytes.Equal(resume, make([]byte, 32)) {
status, err := client.GetPreMidnightMintStatus(
context.Background(),
e.ctx,
&protobufs.PreMidnightMintStatusRequest{
Owner: addr,
},
@ -210,7 +209,7 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() {
}
resp, err := client.HandlePreMidnightMint(
context.Background(),
e.ctx,
&protobufs.MintCoinRequest{
Proofs: proofs,
Signature: &protobufs.Ed448Signature{