From da5b7a6126edce5590ae4b2497625edec1ebe33b Mon Sep 17 00:00:00 2001
From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>
Date: Sun, 17 Nov 2024 00:50:25 +0100
Subject: [PATCH] Add sync timeout and make it async (#356)

* Add sync timeout

* Make sync async
---
 node/consensus/data/consensus_frames.go       | 69 +++++++------
 .../data/data_clock_consensus_engine.go       | 21 ++++--
 node/consensus/data/main_data_loop.go         | 18 ++++-
 node/consensus/data/peer_messaging.go         |  2 +-
 .../data/pre_midnight_proof_worker.go         |  5 +-
 5 files changed, 61 insertions(+), 54 deletions(-)

diff --git a/node/consensus/data/consensus_frames.go b/node/consensus/data/consensus_frames.go
index a905179..103cd02 100644
--- a/node/consensus/data/consensus_frames.go
+++ b/node/consensus/data/consensus_frames.go
@@ -280,73 +280,65 @@ func (e *DataClockConsensusEngine) sync(
 		zap.Uint64("current_frame", latest.FrameNumber),
 		zap.Uint64("max_frame", maxFrame),
 	)
+	var cooperative bool = true
+	defer func() {
+		if cooperative {
+			return
+		}
+		e.peerMapMx.Lock()
+		defer e.peerMapMx.Unlock()
+		if _, ok := e.peerMap[string(peerId)]; ok {
+			e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
+			e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
+			delete(e.peerMap, string(peerId))
+		}
+	}()
 	cc, err := e.pubSub.GetDirectChannel(peerId, "sync")
 	if err != nil {
 		e.logger.Debug(
 			"could not establish direct channel",
 			zap.Error(err),
 		)
-		e.peerMapMx.Lock()
-		if _, ok := e.peerMap[string(peerId)]; ok {
-			e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
-			e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
-			delete(e.peerMap, string(peerId))
-		}
-		e.peerMapMx.Unlock()
+		cooperative = false
 		return latest, errors.Wrap(err, "sync")
 	}
+	defer func() {
+		if err := cc.Close(); err != nil {
+			e.logger.Error("error while closing connection", zap.Error(err))
+		}
+	}()
 	client := protobufs.NewDataServiceClient(cc)
 	for e.GetState() < consensus.EngineStateStopping {
+		ctx, cancel := context.WithTimeout(e.ctx, 2*time.Second)
 		response, err := client.GetDataFrame(
-			context.TODO(),
+			ctx,
 			&protobufs.GetDataFrameRequest{
 				FrameNumber: latest.FrameNumber + 1,
 			},
 			grpc.MaxCallRecvMsgSize(600*1024*1024),
 		)
+		cancel()
 		if err != nil {
 			e.logger.Debug(
 				"could not get frame",
 				zap.Error(err),
 			)
-			e.peerMapMx.Lock()
-			if _, ok := e.peerMap[string(peerId)]; ok {
-				e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
-				e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
-				delete(e.peerMap, string(peerId))
-			}
-			e.peerMapMx.Unlock()
-			if err := cc.Close(); err != nil {
-				e.logger.Error("error while closing connection", zap.Error(err))
-			}
+			cooperative = false
 			return latest, errors.Wrap(err, "sync")
 		}
 
 		if response == nil {
 			e.logger.Debug("received no response from peer")
-			if err := cc.Close(); err != nil {
-				e.logger.Error("error while closing connection", zap.Error(err))
-			}
 			return latest, nil
 		}
 
 		if response.ClockFrame == nil ||
 			response.ClockFrame.FrameNumber != latest.FrameNumber+1 ||
 			response.ClockFrame.Timestamp < latest.Timestamp {
 			e.logger.Debug("received invalid response from peer")
-			e.peerMapMx.Lock()
-			if _, ok := e.peerMap[string(peerId)]; ok {
-				e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
-				e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
-				delete(e.peerMap, string(peerId))
-			}
-			e.peerMapMx.Unlock()
-			if err := cc.Close(); err != nil {
-				e.logger.Error("error while closing connection", zap.Error(err))
-			}
+			cooperative = false
 			return latest, nil
 		}
 		e.logger.Info(
@@ -357,13 +349,7 @@ func (e *DataClockConsensusEngine) sync(
 		if !e.IsInProverTrie(
 			response.ClockFrame.GetPublicKeySignatureEd448().PublicKey.KeyValue,
 		) {
-			e.peerMapMx.Lock()
-			if _, ok := e.peerMap[string(peerId)]; ok {
-				e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
-				e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
-				delete(e.peerMap, string(peerId))
-			}
-			e.peerMapMx.Unlock()
+			cooperative = false
 		}
 		if err := e.frameProver.VerifyDataClockFrame(
 			response.ClockFrame,
@@ -373,11 +359,8 @@ func (e *DataClockConsensusEngine) sync(
 		e.dataTimeReel.Insert(response.ClockFrame, true)
 		latest = response.ClockFrame
 		if latest.FrameNumber >= maxFrame {
-			break
+			return latest, nil
 		}
 	}
-	if err := cc.Close(); err != nil {
-		e.logger.Error("error while closing connection", zap.Error(err))
-	}
 	return latest, nil
 }
diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go
index d34a718..047a920 100644
--- a/node/consensus/data/data_clock_consensus_engine.go
+++ b/node/consensus/data/data_clock_consensus_engine.go
@@ -61,6 +61,10 @@ type ChannelServer = protobufs.DataService_GetPublicChannelServer
 
 type DataClockConsensusEngine struct {
 	protobufs.UnimplementedDataServiceServer
+
+	ctx    context.Context
+	cancel context.CancelFunc
+
 	lastProven uint64
 	difficulty uint32
 	config     *config.Config
@@ -126,6 +130,7 @@ type DataClockConsensusEngine struct {
 	previousFrameProven *protobufs.ClockFrame
 	previousTree        *mt.MerkleTree
 	clientReconnectTest int
+	requestSyncCh       chan *protobufs.ClockFrame
 }
 
 var _ consensus.DataConsensusEngine = (*DataClockConsensusEngine)(nil)
@@ -215,7 +220,10 @@ func NewDataClockConsensusEngine(
 		rateLimit = 10
 	}
 
+	ctx, cancel := context.WithCancel(context.Background())
 	e := &DataClockConsensusEngine{
+		ctx:        ctx,
+		cancel:     cancel,
 		difficulty: difficulty,
 		logger:     logger,
 		state:      consensus.EngineStateStopped,
@@ -256,6 +264,7 @@ func NewDataClockConsensusEngine(
 			rateLimit,
 			time.Minute,
 		),
+		requestSyncCh: make(chan *protobufs.ClockFrame, 1),
 	}
 
 	logger.Info("constructing consensus engine")
@@ -479,6 +488,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 	}()
 
 	go e.runLoop()
+	go e.runSync()
 	go func() {
 		time.Sleep(30 * time.Second)
 		e.logger.Info("checking for snapshots to play forward")
@@ -558,7 +568,7 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
 		go func() {
 			resp, err := client.client.CalculateChallengeProof(
-				context.Background(),
+				e.ctx,
 				&protobufs.ChallengeProofRequest{
 					PeerId: e.pubSub.GetPeerID(),
 					Core:   uint32(i),
@@ -594,6 +604,7 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
 
 func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
 	e.logger.Info("stopping ceremony consensus engine")
+	e.cancel()
 	e.stateMx.Lock()
 	e.state = consensus.EngineStateStopping
 	e.stateMx.Unlock()
@@ -765,7 +776,7 @@ func (e *DataClockConsensusEngine) createParallelDataClientsFromListAndIndex(
 		return nil, errors.Wrap(err, "create parallel data client")
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
 	defer cancel()
 	conn, err := grpc.DialContext(
 		ctx,
@@ -828,7 +839,7 @@ func (
 		return nil, errors.Wrap(err, "create parallel data client")
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
 	defer cancel()
 	conn, err := grpc.DialContext(
 		ctx,
@@ -880,7 +891,7 @@ func (e *DataClockConsensusEngine) createParallelDataClientsFromList() (
 			continue
 		}
 
-		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+		ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
 		defer cancel()
 		conn, err := grpc.DialContext(
 			ctx,
@@ -943,7 +954,7 @@ func (e *DataClockConsensusEngine) createParallelDataClientsFromBaseMultiaddr(
 			e.logger.Error("could not get dial args", zap.Error(err))
 			continue
 		}
-		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+		ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
 		defer cancel()
 		conn, err := grpc.DialContext(
 			ctx,
diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go
index 92ef5d7..169a8f7 100644
--- a/node/consensus/data/main_data_loop.go
+++ b/node/consensus/data/main_data_loop.go
@@ -41,6 +41,19 @@ func (
 	return frameProverTries
 }
 
+func (e *DataClockConsensusEngine) runSync() {
+	for {
+		select {
+		case <-e.ctx.Done():
+			return
+		case enqueuedFrame := <-e.requestSyncCh:
+			if _, err := e.collect(enqueuedFrame); err != nil {
+				e.logger.Error("could not collect", zap.Error(err))
+			}
+		}
+	}
+}
+
 func (e *DataClockConsensusEngine) runLoop() {
 	dataFrameCh := e.dataTimeReel.NewFrameCh()
 	runOnce := true
@@ -103,8 +116,9 @@ func (e *DataClockConsensusEngine) processFrame(
 	)
 	var err error
 	if !e.GetFrameProverTries()[0].Contains(e.provingKeyBytes) {
-		if latestFrame, err = e.collect(dataFrame); err != nil {
-			e.logger.Error("could not collect", zap.Error(err))
+		select {
+		case e.requestSyncCh <- dataFrame:
+		default:
 		}
 	}
 
diff --git a/node/consensus/data/peer_messaging.go b/node/consensus/data/peer_messaging.go
index 9f6e2b6..430181b 100644
--- a/node/consensus/data/peer_messaging.go
+++ b/node/consensus/data/peer_messaging.go
@@ -652,7 +652,7 @@ func (e *DataClockConsensusEngine) GetPublicChannelForProvingKey(
 	}
 	client := protobufs.NewDataServiceClient(cc)
 	s, err := client.GetPublicChannel(
-		context.Background(),
+		e.ctx,
 		grpc.MaxCallSendMsgSize(600*1024*1024),
 		grpc.MaxCallRecvMsgSize(600*1024*1024),
 	)
diff --git a/node/consensus/data/pre_midnight_proof_worker.go b/node/consensus/data/pre_midnight_proof_worker.go
index d0161a5..24ef1fb 100644
--- a/node/consensus/data/pre_midnight_proof_worker.go
+++ b/node/consensus/data/pre_midnight_proof_worker.go
@@ -2,7 +2,6 @@ package data
 
 import (
 	"bytes"
-	"context"
 	"encoding/binary"
 	"strings"
 	"time"
@@ -127,7 +126,7 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() {
 		if bytes.Equal(resume, make([]byte, 32)) {
 			status, err := client.GetPreMidnightMintStatus(
-				context.Background(),
+				e.ctx,
 				&protobufs.PreMidnightMintStatusRequest{
 					Owner: addr,
 				},
@@ -210,7 +209,7 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() {
 			}
 
 			resp, err := client.HandlePreMidnightMint(
-				context.Background(),
+				e.ctx,
 				&protobufs.MintCoinRequest{
 					Proofs: proofs,
 					Signature: &protobufs.Ed448Signature{
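
Reviewer note, not part of the patch: the diff combines three ideas — a long-lived engine context that Stop() cancels, a 2-second per-request timeout derived from it, and a size-1 buffered channel drained by a dedicated runSync goroutine so processFrame never blocks on sync. The standalone Go sketch below illustrates that pattern under assumed names; engine, enqueue, runSync, and requestCh are hypothetical stand-ins for the real DataClockConsensusEngine fields and methods, not the actual node code.

// Illustrative sketch only: bounded, non-blocking sync requests with a
// per-request timeout, all tied to one cancellable engine context.
package main

import (
	"context"
	"fmt"
	"time"
)

type engine struct {
	ctx       context.Context
	cancel    context.CancelFunc
	requestCh chan int // stands in for chan *protobufs.ClockFrame
}

func newEngine() *engine {
	ctx, cancel := context.WithCancel(context.Background())
	return &engine{ctx: ctx, cancel: cancel, requestCh: make(chan int, 1)}
}

// enqueue mirrors the non-blocking select in processFrame: if a sync is
// already pending, the new request is dropped instead of blocking the loop.
func (e *engine) enqueue(frame int) {
	select {
	case e.requestCh <- frame:
	default:
	}
}

// runSync mirrors the worker goroutine: drain requests until the engine
// context is cancelled, bounding each unit of work with its own timeout.
func (e *engine) runSync(work func(ctx context.Context, frame int) error) {
	for {
		select {
		case <-e.ctx.Done():
			return
		case frame := <-e.requestCh:
			ctx, cancel := context.WithTimeout(e.ctx, 2*time.Second)
			err := work(ctx, frame)
			cancel()
			if err != nil {
				fmt.Println("sync failed:", err)
			}
		}
	}
}

func main() {
	e := newEngine()
	go e.runSync(func(ctx context.Context, frame int) error {
		// Simulated peer call that respects the 2s deadline.
		select {
		case <-time.After(100 * time.Millisecond):
			fmt.Println("synced frame", frame)
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
	e.enqueue(42)
	e.enqueue(43) // dropped if 42 is still pending
	time.Sleep(500 * time.Millisecond)
	e.cancel() // mirrors Stop(): unblocks runSync and any in-flight timeout ctx
	time.Sleep(50 * time.Millisecond)
}

Dropping a request when one is already queued appears safe in this design because the pending sync will catch the node up, and any frame that arrives afterwards simply re-enqueues the work.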