diff --git a/node/consensus/data/consensus_frames.go b/node/consensus/data/consensus_frames.go
index c05bff5..1012c10 100644
--- a/node/consensus/data/consensus_frames.go
+++ b/node/consensus/data/consensus_frames.go
@@ -2,6 +2,7 @@ package data
 
 import (
 	"bytes"
+	"context"
 	"time"
 
@@ -9,12 +10,53 @@ import (
 	"github.com/pkg/errors"
 	"go.uber.org/zap"
+	"google.golang.org/grpc"
 	"google.golang.org/protobuf/proto"
 
 	"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application"
 	"source.quilibrium.com/quilibrium/monorepo/node/p2p"
 	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
 )
 
+// collect syncs the local clock against peers reporting a higher frame head
+// and returns the newest frame obtained; never returns a nil frame.
+func (e *DataClockConsensusEngine) collect(
+	enqueuedFrame *protobufs.ClockFrame,
+) (*protobufs.ClockFrame, error) {
+	e.logger.Info("collecting vdf proofs")
+
+	latest := enqueuedFrame
+
+	for {
+		peerId, maxFrame, err := e.GetMostAheadPeer(latest.FrameNumber)
+		if maxFrame > latest.FrameNumber {
+			e.syncingStatus = SyncStatusSynchronizing
+			if err != nil {
+				e.logger.Info("no peers available for sync, waiting")
+				time.Sleep(5 * time.Second)
+			} else if maxFrame > latest.FrameNumber {
+				if maxFrame-latest.FrameNumber > 100 {
+					maxFrame = latest.FrameNumber + 100
+				}
+				latest, err = e.sync(latest, maxFrame, peerId)
+				if err == nil {
+					break
+				}
+			}
+		} else {
+			break
+		}
+	}
+
+	e.syncingStatus = SyncStatusNotSyncing
+
+	e.logger.Info(
+		"returning leader frame",
+		zap.Uint64("frame_number", latest.FrameNumber),
+	)
+
+	return latest, nil
+}
+
 func (e *DataClockConsensusEngine) prove(
 	previousFrame *protobufs.ClockFrame,
 ) (*protobufs.ClockFrame, error) {
@@ -167,7 +209,7 @@ func (e *DataClockConsensusEngine) GetMostAheadPeer(
 	uint64,
 	error,
 ) {
-	e.logger.Info(
+	e.logger.Debug(
 		"checking peer list",
 		zap.Int("peers", len(e.peerMap)),
 		zap.Int("uncooperative_peers", len(e.uncooperativePeersMap)),
@@ -205,3 +247,117 @@ func (e *DataClockConsensusEngine) GetMostAheadPeer(
 
 	return peer, max, nil
 }
+
+// sync pulls frames (latest, maxFrame] from peerId into the time reel. On
+// error it returns the newest frame successfully applied — never nil.
+func (e *DataClockConsensusEngine) sync(
+	currentLatest *protobufs.ClockFrame,
+	maxFrame uint64,
+	peerId []byte,
+) (*protobufs.ClockFrame, error) {
+	latest := currentLatest
+	e.logger.Info("polling peer for new frames", zap.Binary("peer_id", peerId))
+	cc, err := e.pubSub.GetDirectChannel(peerId, "sync")
+	if err != nil {
+		e.logger.Debug(
+			"could not establish direct channel",
+			zap.Error(err),
+		)
+		e.peerMapMx.Lock()
+		if _, ok := e.peerMap[string(peerId)]; ok {
+			e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
+			e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
+			delete(e.peerMap, string(peerId))
+		}
+		e.peerMapMx.Unlock()
+		return latest, errors.Wrap(err, "sync")
+	}
+
+	client := protobufs.NewDataServiceClient(cc)
+
+	for {
+		response, err := client.GetDataFrame(
+			context.TODO(),
+			&protobufs.GetDataFrameRequest{
+				FrameNumber: latest.FrameNumber + 1,
+			},
+			grpc.MaxCallRecvMsgSize(600*1024*1024),
+		)
+		if err != nil {
+			e.logger.Debug(
+				"could not get frame",
+				zap.Error(err),
+			)
+			e.peerMapMx.Lock()
+			if _, ok := e.peerMap[string(peerId)]; ok {
+				e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
+				e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
+				delete(e.peerMap, string(peerId))
+			}
+			e.peerMapMx.Unlock()
+			if err := cc.Close(); err != nil {
+				e.logger.Error("error while closing connection", zap.Error(err))
+			}
+			return latest, errors.Wrap(err, "sync")
+		}
+
+		if response == nil {
+			e.logger.Debug("received no response from peer")
+			if err := cc.Close(); err != nil {
+				e.logger.Error("error while closing connection", zap.Error(err))
+			}
+			return latest, nil
+		}
+
+		if response.ClockFrame == nil ||
+			response.ClockFrame.FrameNumber != latest.FrameNumber+1 ||
+			response.ClockFrame.Timestamp < latest.Timestamp {
+			e.logger.Debug("received invalid response from peer")
+			e.peerMapMx.Lock()
+			if _, ok := e.peerMap[string(peerId)]; ok {
+				e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
+				e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
+				delete(e.peerMap, string(peerId))
+			}
+			e.peerMapMx.Unlock()
+			if err := cc.Close(); err != nil {
+				e.logger.Error("error while closing connection", zap.Error(err))
+			}
+			return latest, nil
+		}
+		e.logger.Info(
+			"received new leading frame",
+			zap.Uint64("frame_number", response.ClockFrame.FrameNumber),
+		)
+		if !e.IsInProverTrie(
+			response.ClockFrame.GetPublicKeySignatureEd448().PublicKey.KeyValue,
+		) {
+			e.peerMapMx.Lock()
+			if _, ok := e.peerMap[string(peerId)]; ok {
+				e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
+				e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
+				delete(e.peerMap, string(peerId))
+			}
+			e.peerMapMx.Unlock()
+		}
+		if err := e.frameProver.VerifyDataClockFrame(
+			response.ClockFrame,
+		); err != nil {
+			// Was `return nil, ...`: leaked cc, and the caller's
+			// `latest, err = e.sync(...)` nil'd its frame → panic in collect.
+			if err := cc.Close(); err != nil {
+				e.logger.Error("error while closing connection", zap.Error(err))
+			}
+			return latest, errors.Wrap(err, "sync")
+		}
+		e.dataTimeReel.Insert(response.ClockFrame, true)
+		latest = response.ClockFrame
+		if latest.FrameNumber >= maxFrame {
+			break
+		}
+	}
+	if err := cc.Close(); err != nil {
+		e.logger.Error("error while closing connection", zap.Error(err))
+	}
+	return latest, nil
+}
diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go
index 3172e7f..316b39b 100644
--- a/node/consensus/data/data_clock_consensus_engine.go
+++ b/node/consensus/data/data_clock_consensus_engine.go
@@ -302,6 +302,21 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 	e.logger.Info("subscribing to pubsub messages")
 	e.pubSub.Subscribe(e.filter, e.handleMessage)
 
+	// Serve DataService over a direct p2p channel so peers can sync from us.
+	go func() {
+		server := grpc.NewServer(
+			grpc.MaxSendMsgSize(600*1024*1024),
+			grpc.MaxRecvMsgSize(600*1024*1024),
+		)
+		protobufs.RegisterDataServiceServer(server, e)
+		if err := e.pubSub.StartDirectChannelListener(
+			e.pubSub.GetPeerID(),
+			"sync",
+			server,
+		); err != nil {
+			panic(err)
+		}
+	}()
+
 	go func() {
 		if e.dataTimeReel.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go
index 57bb452..55064b1 100644
--- a/node/consensus/data/main_data_loop.go
+++ b/node/consensus/data/main_data_loop.go
@@ -61,6 +61,13 @@ func (e *DataClockConsensusEngine) runLoop() {
 				"current frame head",
 				zap.Uint64("frame_number", dataFrame.FrameNumber),
 			)
+			// Not yet in the prover trie: passively sync from ahead peers.
+			if !e.IsInProverTrie(e.provingKeyBytes) {
+				if latestFrame, err = e.collect(dataFrame); err != nil {
+					e.logger.Error("could not collect", zap.Error(err))
+				}
+			}
+
 			if latestFrame != nil &&
 				dataFrame.FrameNumber > latestFrame.FrameNumber {
 				latestFrame = dataFrame
@@ -172,6 +179,13 @@ func (e *DataClockConsensusEngine) runLoop() {
 				zap.Uint64("frame_number", dataFrame.FrameNumber),
 			)
 
+			// Not yet in the prover trie: passively sync from ahead peers.
+			if !e.IsInProverTrie(e.provingKeyBytes) {
+				if latestFrame, err = e.collect(dataFrame); err != nil {
+					e.logger.Error("could not collect", zap.Error(err))
+				}
+			}
+
 			if latestFrame == nil ||
 				latestFrame.FrameNumber < dataFrame.FrameNumber {
 				latestFrame, err = e.dataTimeReel.Head()
diff --git a/node/consensus/data/message_handler.go b/node/consensus/data/message_handler.go
index 21bd016..6fee6c0 100644
--- a/node/consensus/data/message_handler.go
+++ b/node/consensus/data/message_handler.go
@@ -192,13 +192,15 @@ func (e *DataClockConsensusEngine) handleRebroadcast(
 	}
 
 	for _, frame := range frames.ClockFrames {
-		if head.FrameNumber > frame.FrameNumber {
+		if head.FrameNumber >= frame.FrameNumber {
 			continue
 		}
 
 		e.logger.Info("receiving synchronization data")
 
 		if err := e.handleClockFrame(peerID, address, frame); err != nil {
+			// if they're sending invalid clock frames, nuke them.
+			e.pubSub.AddPeerScore(peerID, -100000)
 			return errors.Wrap(err, "handle rebroadcast")
 		}
 	}