From 5333b4a8cceea6c089fa2efa33af09ce41d9575e Mon Sep 17 00:00:00 2001
From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>
Date: Wed, 13 Nov 2024 17:43:28 +0100
Subject: [PATCH] Sample peers which are ahead during sync (#337)

---
 node/consensus/data/consensus_frames.go       | 88 ++++++++++---------
 .../consensus/data/internal/peer_candidate.go | 25 ++++++
 node/consensus/data/internal/random.go        | 69 +++++++++++++++
 node/consensus/data/internal/random_test.go   | 57 ++++++++++++
 4 files changed, 198 insertions(+), 41 deletions(-)
 create mode 100644 node/consensus/data/internal/peer_candidate.go
 create mode 100644 node/consensus/data/internal/random.go
 create mode 100644 node/consensus/data/internal/random_test.go

diff --git a/node/consensus/data/consensus_frames.go b/node/consensus/data/consensus_frames.go
index 1688702..bb71b06 100644
--- a/node/consensus/data/consensus_frames.go
+++ b/node/consensus/data/consensus_frames.go
@@ -8,13 +8,13 @@ import (
 	"golang.org/x/crypto/sha3"
 	"source.quilibrium.com/quilibrium/monorepo/node/config"
 	"source.quilibrium.com/quilibrium/monorepo/node/consensus"
+	"source.quilibrium.com/quilibrium/monorepo/node/consensus/data/internal"
 
 	"github.com/pkg/errors"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 	"google.golang.org/protobuf/proto"
 	"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application"
-	"source.quilibrium.com/quilibrium/monorepo/node/p2p"
 	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
 )
 
@@ -24,30 +24,24 @@ func (e *DataClockConsensusEngine) collect(
 	e.logger.Info("collecting vdf proofs")
 
 	latest := enqueuedFrame
-
 	for {
-		peerId, maxFrame, err := e.GetMostAheadPeer(e.latestFrameReceived)
-		if maxFrame > e.latestFrameReceived {
-			e.syncingStatus = SyncStatusSynchronizing
-			if err != nil {
-				e.logger.Info("no peers available for sync, waiting")
-				time.Sleep(5 * time.Second)
-			} else if maxFrame > e.latestFrameReceived {
-				if maxFrame-e.latestFrameReceived > 100 {
-					maxFrame = e.latestFrameReceived + 100
-				}
-				latest, err = e.sync(latest, maxFrame, peerId)
-				if err == nil {
-					break
-				}
-			}
-		} else {
+		candidates := e.GetAheadPeers(max(latest.FrameNumber, e.latestFrameReceived))
+		if len(candidates) == 0 {
 			break
 		}
+		for _, candidate := range candidates {
+			if candidate.MaxFrame <= max(latest.FrameNumber, e.latestFrameReceived) {
+				continue
+			}
+			var err error
+			latest, err = e.sync(latest, candidate.MaxFrame, candidate.PeerID)
+			if err != nil {
+				e.logger.Debug("error syncing frame", zap.Error(err))
+				continue
+			}
+		}
 	}
 
-	e.syncingStatus = SyncStatusNotSyncing
-
 	e.logger.Info(
 		"returning leader frame",
 		zap.Uint64("frame_number", latest.FrameNumber),
@@ -213,13 +207,11 @@ func (e *DataClockConsensusEngine) prove(
 	return frame, nil
 }
 
-func (e *DataClockConsensusEngine) GetMostAheadPeer(
-	frameNumber uint64,
-) (
-	[]byte,
-	uint64,
-	error,
-) {
+func (e *DataClockConsensusEngine) GetAheadPeers(frameNumber uint64) []internal.PeerCandidate {
+	if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
+		return nil
+	}
+
 	e.logger.Debug(
 		"checking peer list",
 		zap.Int("peers", len(e.peerMap)),
@@ -227,12 +219,9 @@
 		zap.Uint64("current_head_frame", frameNumber),
 	)
 
-	if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
-		return e.pubSub.GetPeerID(), frameNumber, nil
-	}
+	candidates := make([]internal.WeightedPeerCandidate, 0, len(e.peerMap))
+	maxDiff := uint64(0)
 
-	max := frameNumber
-	var peer []byte = nil
 	e.peerMapMx.RLock()
 	for _, v := range e.peerMap {
 		e.logger.Debug(
@@ -242,21 +231,36 @@
 			zap.Int64("timestamp", v.timestamp),
 			zap.Binary("version", v.version),
 		)
-		_, ok := e.uncooperativePeersMap[string(v.peerId)]
-		if v.maxFrame > max &&
-			v.timestamp > config.GetMinimumVersionCutoff().UnixMilli() &&
-			bytes.Compare(v.version, config.GetMinimumVersion()) >= 0 && !ok {
-			peer = v.peerId
-			max = v.maxFrame
+		if v.maxFrame <= frameNumber {
+			continue
 		}
+		if _, ok := e.uncooperativePeersMap[string(v.peerId)]; ok {
+			continue
+		}
+		if v.timestamp <= config.GetMinimumVersionCutoff().UnixMilli() {
+			continue
+		}
+		if bytes.Compare(v.version, config.GetMinimumVersion()) < 0 {
+			continue
+		}
+		maxDiff = max(maxDiff, v.maxFrame-frameNumber)
+		candidates = append(candidates, internal.WeightedPeerCandidate{
+			PeerCandidate: internal.PeerCandidate{
+				PeerID:   v.peerId,
+				MaxFrame: v.maxFrame,
+			},
+		})
 	}
 	e.peerMapMx.RUnlock()
 
-	if peer == nil {
-		return nil, 0, p2p.ErrNoPeersAvailable
+	if len(candidates) == 0 {
+		return nil
 	}
 
-	return peer, max, nil
+	for i := range candidates {
+		candidates[i].Weight = float64(candidates[i].MaxFrame-frameNumber) / float64(maxDiff)
+	}
+	return internal.WeightedSampleWithoutReplacement(candidates, len(candidates))
 }
 
 func (e *DataClockConsensusEngine) sync(
@@ -264,6 +268,8 @@
 	maxFrame uint64,
 	peerId []byte,
 ) (*protobufs.ClockFrame, error) {
+	e.syncingStatus = SyncStatusSynchronizing
+	defer func() { e.syncingStatus = SyncStatusNotSyncing }()
 	latest := currentLatest
 	e.logger.Info("polling peer for new frames", zap.Binary("peer_id", peerId))
 	cc, err := e.pubSub.GetDirectChannel(peerId, "sync")
diff --git a/node/consensus/data/internal/peer_candidate.go b/node/consensus/data/internal/peer_candidate.go
new file mode 100644
index 0000000..ea88e1f
--- /dev/null
+++ b/node/consensus/data/internal/peer_candidate.go
@@ -0,0 +1,25 @@
+package internal
+
+// PeerCandidate is a candidate for a peer to be used for syncing.
+type PeerCandidate struct {
+	PeerID   []byte
+	MaxFrame uint64
+}
+
+// WeightedPeerCandidate is a weighted peer candidate.
+type WeightedPeerCandidate struct {
+	PeerCandidate
+	Weight float64
+}
+
+var _ Weighted[PeerCandidate] = (*WeightedPeerCandidate)(nil)
+
+// GetItem implements Weighted[PeerCandidate].
+func (p WeightedPeerCandidate) GetItem() PeerCandidate {
+	return p.PeerCandidate
+}
+
+// GetWeight implements Weighted[PeerCandidate].
+func (p WeightedPeerCandidate) GetWeight() float64 {
+	return p.Weight
+}
diff --git a/node/consensus/data/internal/random.go b/node/consensus/data/internal/random.go
new file mode 100644
index 0000000..79045f8
--- /dev/null
+++ b/node/consensus/data/internal/random.go
@@ -0,0 +1,69 @@
+package internal
+
+import (
+	"math"
+	"math/rand"
+	"sort"
+)
+
+// Weighted is an interface for items that have a weight.
+type Weighted[T any] interface {
+	GetItem() T
+	GetWeight() float64
+}
+
+type weightedSort[T any] struct {
+	items   []T
+	weights []float64
+}
+
+var _ sort.Interface = (*weightedSort[any])(nil)
+
+// Len implements sort.Interface.
+func (w weightedSort[T]) Len() int {
+	return len(w.items)
+}
+
+// Less implements sort.Interface.
+func (w weightedSort[T]) Less(i, j int) bool {
+	return w.weights[i] >= w.weights[j]
+}
+
+// Swap implements sort.Interface.
+func (w weightedSort[T]) Swap(i, j int) {
+	w.items[i], w.items[j] = w.items[j], w.items[i]
+	w.weights[i], w.weights[j] = w.weights[j], w.weights[i]
+}
+
+// WeightedSampleWithoutReplacementWithSource samples without replacement
+// from a list of weighted items using a given random source.
+// Based on work by Efraimidis and Spirakis.
+func WeightedSampleWithoutReplacementWithSource[T any, W Weighted[T]](
+	items []W,
+	sampleSize int,
+	random *rand.Rand,
+) []T {
+	ws := weightedSort[T]{
+		items:   make([]T, len(items)),
+		weights: make([]float64, len(items)),
+	}
+	for i, item := range items {
+		ws.items[i] = item.GetItem()
+		ws.weights[i] = math.Pow(random.Float64(), 1.0/item.GetWeight())
+	}
+	sort.Sort(ws)
+	return ws.items[:sampleSize]
+}
+
+// WeightedSampleWithoutReplacement samples without replacement from a list
+// of weighted items.
+func WeightedSampleWithoutReplacement[T any, W Weighted[T]](
+	items []W,
+	sampleSize int,
+) []T {
+	return WeightedSampleWithoutReplacementWithSource(
+		items,
+		sampleSize,
+		rand.New(rand.NewSource(rand.Int63())),
+	)
+}
diff --git a/node/consensus/data/internal/random_test.go b/node/consensus/data/internal/random_test.go
new file mode 100644
index 0000000..8287975
--- /dev/null
+++ b/node/consensus/data/internal/random_test.go
@@ -0,0 +1,57 @@
+package internal_test
+
+import (
+	"math/rand"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"source.quilibrium.com/quilibrium/monorepo/node/consensus/data/internal"
+)
+
+type mockWeighted struct {
+	item   int64
+	weight float64
+}
+
+var _ internal.Weighted[int64] = (*mockWeighted)(nil)
+
+// GetItem implements Weighted[int64].
+func (m mockWeighted) GetItem() int64 {
+	return m.item
+}
+
+// GetWeight implements Weighted[int64].
+func (m mockWeighted) GetWeight() float64 {
+	return m.weight
+}
+
+func TestWeightedSampleWithoutReplacementWithSource(t *testing.T) {
+	items := []mockWeighted{
+		{item: 0, weight: 0.1},
+		{item: 1, weight: 0.2},
+		{item: 2, weight: 0.4},
+		{item: 3, weight: 0.6},
+		{item: 4, weight: 0.8},
+		{item: 5, weight: 1.0},
+	}
+
+	frequencies := [6]int{}
+	random := rand.New(rand.NewSource(0))
+	for i := 0; i < 10_000; i++ {
+		sample := internal.WeightedSampleWithoutReplacementWithSource(items, 3, random)
+		seen := [6]bool{}
+		for _, item := range sample {
+			assert.False(t, seen[item])
+			frequencies[item]++
+			seen[item] = true
+		}
+	}
+
+	for i := 0; i < 6; i++ {
+		assert.Greater(t, frequencies[i], 0)
+		if i > 0 {
+			assert.Greater(t, frequencies[i], frequencies[i-1])
+		}
+		t.Logf("item %d: %d", i, frequencies[i])
+	}
+}
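
Usage sketch (not part of the patch): the standalone example below shows how the new internal helpers fit together, mirroring what GetAheadPeers now does: candidates are weighted by how far ahead of the local head they report being, normalized by the largest gap, and then sampled without replacement so peers that are further ahead tend to be tried first while sync attempts still spread across several peers. The peer IDs and frame numbers are made up for illustration, and the import only resolves from inside the node module because the package is internal.

package main

import (
	"fmt"

	"source.quilibrium.com/quilibrium/monorepo/node/consensus/data/internal"
)

func main() {
	// Hypothetical local head frame.
	frameNumber := uint64(1000)

	// Hypothetical peers reporting frames ahead of ours.
	peers := []internal.PeerCandidate{
		{PeerID: []byte("peer-a"), MaxFrame: 1005},
		{PeerID: []byte("peer-b"), MaxFrame: 1100},
		{PeerID: []byte("peer-c"), MaxFrame: 1010},
	}

	// Weight each candidate by how far ahead it is, normalized by the
	// largest difference, as GetAheadPeers does.
	maxDiff := uint64(0)
	for _, p := range peers {
		if diff := p.MaxFrame - frameNumber; diff > maxDiff {
			maxDiff = diff
		}
	}
	candidates := make([]internal.WeightedPeerCandidate, 0, len(peers))
	for _, p := range peers {
		candidates = append(candidates, internal.WeightedPeerCandidate{
			PeerCandidate: p,
			Weight:        float64(p.MaxFrame-frameNumber) / float64(maxDiff),
		})
	}

	// Sampling without replacement returns a randomized order that favors
	// larger weights, so the furthest-ahead peer is usually, but not
	// always, first.
	for _, c := range internal.WeightedSampleWithoutReplacement(candidates, len(candidates)) {
		fmt.Printf("try %s (frame %d)\n", c.PeerID, c.MaxFrame)
	}
}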