reintroduce sync

This commit is contained in:
Cassandra Heart 2024-10-26 23:54:23 -05:00
parent 5e5c421ce9
commit 730e0b9f68
No known key found for this signature in database
GPG Key ID: 6352152859385958
4 changed files with 178 additions and 2 deletions

View File

@ -2,6 +2,7 @@ package data
import (
"bytes"
"context"
"time"
"golang.org/x/crypto/sha3"
@ -9,12 +10,51 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
// collect catches the engine up with the network before a leader frame is
// chosen. Starting from enqueuedFrame, it repeatedly asks GetMostAheadPeer for
// a peer that reports a higher frame number and pulls frames from that peer via
// sync, until no peer is ahead or a sync attempt completes without error.
//
// The returned frame is the most recent frame known after syncing; it is never
// replaced by a nil result from a failed sync attempt. The returned error is
// always nil.
func (e *DataClockConsensusEngine) collect(
	enqueuedFrame *protobufs.ClockFrame,
) (*protobufs.ClockFrame, error) {
	e.logger.Info("collecting vdf proofs")

	latest := enqueuedFrame

	for {
		peerId, maxFrame, err := e.GetMostAheadPeer(latest.FrameNumber)
		if maxFrame <= latest.FrameNumber {
			// Nobody reports a frame beyond ours: nothing to sync.
			break
		}

		e.syncingStatus = SyncStatusSynchronizing

		if err != nil {
			e.logger.Info("no peers available for sync, waiting")
			time.Sleep(5 * time.Second)
			continue
		}

		// Cap each sync attempt at 100 frames past the current head.
		if maxFrame-latest.FrameNumber > 100 {
			maxFrame = latest.FrameNumber + 100
		}

		next, err := e.sync(latest, maxFrame, peerId)
		if next != nil {
			// Only advance when sync handed back a frame; a failed attempt may
			// return nil, and dereferencing that on the next pass would panic.
			latest = next
		}
		if err == nil {
			break
		}
	}

	e.syncingStatus = SyncStatusNotSyncing

	e.logger.Info(
		"returning leader frame",
		zap.Uint64("frame_number", latest.FrameNumber),
	)

	return latest, nil
}
func (e *DataClockConsensusEngine) prove(
previousFrame *protobufs.ClockFrame,
) (*protobufs.ClockFrame, error) {
@ -167,7 +207,7 @@ func (e *DataClockConsensusEngine) GetMostAheadPeer(
uint64,
error,
) {
e.logger.Info(
e.logger.Debug(
"checking peer list",
zap.Int("peers", len(e.peerMap)),
zap.Int("uncooperative_peers", len(e.uncooperativePeersMap)),
@ -205,3 +245,111 @@ func (e *DataClockConsensusEngine) GetMostAheadPeer(
return peer, max, nil
}
// sync requests frames one at a time from peerId over a direct "sync" channel,
// beginning at currentLatest.FrameNumber+1, verifying each frame and inserting
// it into the data time reel, until a frame numbered maxFrame or higher has
// been accepted or the peer stops delivering usable frames.
//
// The returned frame is always the last frame accepted (currentLatest when
// none was), including on error paths. A non-nil error is returned when the
// channel cannot be opened, the request fails, or a frame fails verification.
// A nil or malformed response ends the sync early with a nil error.
//
// Peers that fail to connect, fail a request, send a malformed frame, send a
// frame from a key outside the prover trie, or send a frame that fails
// verification are moved from peerMap to uncooperativePeersMap.
func (e *DataClockConsensusEngine) sync(
	currentLatest *protobufs.ClockFrame,
	maxFrame uint64,
	peerId []byte,
) (*protobufs.ClockFrame, error) {
	latest := currentLatest
	e.logger.Info("polling peer for new frames", zap.Binary("peer_id", peerId))

	// markUncooperative moves the peer (if still tracked) from peerMap to
	// uncooperativePeersMap and stamps it with the current time.
	markUncooperative := func() {
		key := string(peerId)

		e.peerMapMx.Lock()
		defer e.peerMapMx.Unlock()

		if peer, ok := e.peerMap[key]; ok {
			peer.timestamp = time.Now().UnixMilli()
			e.uncooperativePeersMap[key] = peer
			delete(e.peerMap, key)
		}
	}

	cc, err := e.pubSub.GetDirectChannel(peerId, "sync")
	if err != nil {
		e.logger.Debug(
			"could not establish direct channel",
			zap.Error(err),
		)
		markUncooperative()
		return latest, errors.Wrap(err, "sync")
	}

	// Close the channel on every exit path from here on, including the
	// verification-failure path.
	defer func() {
		if err := cc.Close(); err != nil {
			e.logger.Error("error while closing connection", zap.Error(err))
		}
	}()

	client := protobufs.NewDataServiceClient(cc)

	for {
		response, err := client.GetDataFrame(
			context.TODO(),
			&protobufs.GetDataFrameRequest{
				FrameNumber: latest.FrameNumber + 1,
			},
			grpc.MaxCallRecvMsgSize(600*1024*1024),
		)
		if err != nil {
			e.logger.Debug(
				"could not get frame",
				zap.Error(err),
			)
			markUncooperative()
			return latest, errors.Wrap(err, "sync")
		}

		if response == nil {
			e.logger.Debug("received no response from peer")
			return latest, nil
		}

		// The frame must be the direct successor of our head and must not go
		// backwards in time.
		if response.ClockFrame == nil ||
			response.ClockFrame.FrameNumber != latest.FrameNumber+1 ||
			response.ClockFrame.Timestamp < latest.Timestamp {
			e.logger.Debug("received invalid response from peer")
			markUncooperative()
			return latest, nil
		}

		// The frame comes from a remote peer, so the signature and its public
		// key may be absent; treat that as an invalid response rather than
		// dereferencing nil.
		signature := response.ClockFrame.GetPublicKeySignatureEd448()
		if signature == nil || signature.PublicKey == nil {
			e.logger.Debug("received invalid response from peer")
			markUncooperative()
			return latest, nil
		}

		e.logger.Info(
			"received new leading frame",
			zap.Uint64("frame_number", response.ClockFrame.FrameNumber),
		)

		// A frame from a key outside the prover trie demotes the peer, but the
		// frame itself is still verified and inserted below.
		if !e.IsInProverTrie(signature.PublicKey.KeyValue) {
			markUncooperative()
		}

		if err := e.frameProver.VerifyDataClockFrame(
			response.ClockFrame,
		); err != nil {
			e.logger.Debug(
				"could not verify frame",
				zap.Error(err),
			)
			// Demote the peer so the caller does not immediately retry it, and
			// hand back the last good frame instead of nil.
			markUncooperative()
			return latest, errors.Wrap(err, "sync")
		}

		e.dataTimeReel.Insert(response.ClockFrame, true)
		latest = response.ClockFrame

		if latest.FrameNumber >= maxFrame {
			break
		}
	}

	return latest, nil
}

View File

@ -302,6 +302,20 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
e.logger.Info("subscribing to pubsub messages")
e.pubSub.Subscribe(e.filter, e.handleMessage)
go func() {
server := grpc.NewServer(
grpc.MaxSendMsgSize(600*1024*1024),
grpc.MaxRecvMsgSize(600*1024*1024),
)
protobufs.RegisterDataServiceServer(server, e)
if err := e.pubSub.StartDirectChannelListener(
e.pubSub.GetPeerID(),
"sync",
server,
); err != nil {
panic(err)
}
}()
go func() {
if e.dataTimeReel.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {

View File

@ -61,6 +61,12 @@ func (e *DataClockConsensusEngine) runLoop() {
"current frame head",
zap.Uint64("frame_number", dataFrame.FrameNumber),
)
if !e.IsInProverTrie(e.provingKeyBytes) {
if latestFrame, err = e.collect(dataFrame); err != nil {
e.logger.Error("could not collect", zap.Error(err))
}
}
if latestFrame != nil &&
dataFrame.FrameNumber > latestFrame.FrameNumber {
latestFrame = dataFrame
@ -172,6 +178,12 @@ func (e *DataClockConsensusEngine) runLoop() {
zap.Uint64("frame_number", dataFrame.FrameNumber),
)
if !e.IsInProverTrie(e.provingKeyBytes) {
if latestFrame, err = e.collect(dataFrame); err != nil {
e.logger.Error("could not collect", zap.Error(err))
}
}
if latestFrame == nil ||
latestFrame.FrameNumber < dataFrame.FrameNumber {
latestFrame, err = e.dataTimeReel.Head()

View File

@ -192,13 +192,15 @@ func (e *DataClockConsensusEngine) handleRebroadcast(
}
for _, frame := range frames.ClockFrames {
if head.FrameNumber > frame.FrameNumber {
if head.FrameNumber >= frame.FrameNumber {
continue
}
e.logger.Info("receiving synchronization data")
if err := e.handleClockFrame(peerID, address, frame); err != nil {
// if they're sending invalid clock frames, nuke them.
e.pubSub.AddPeerScore(peerID, -100000)
return errors.Wrap(err, "handle rebroadcast")
}
}