Sample peers which are ahead during sync (#337)

This commit is contained in:
petricadaipegsp 2024-11-13 17:43:28 +01:00 committed by GitHub
parent f06d2c0ff2
commit 5333b4a8cc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 198 additions and 41 deletions

View File

@ -8,13 +8,13 @@ import (
"golang.org/x/crypto/sha3"
"source.quilibrium.com/quilibrium/monorepo/node/config"
"source.quilibrium.com/quilibrium/monorepo/node/consensus"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/data/internal"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
@ -24,30 +24,24 @@ func (e *DataClockConsensusEngine) collect(
e.logger.Info("collecting vdf proofs")
latest := enqueuedFrame
for {
peerId, maxFrame, err := e.GetMostAheadPeer(e.latestFrameReceived)
if maxFrame > e.latestFrameReceived {
e.syncingStatus = SyncStatusSynchronizing
if err != nil {
e.logger.Info("no peers available for sync, waiting")
time.Sleep(5 * time.Second)
} else if maxFrame > e.latestFrameReceived {
if maxFrame-e.latestFrameReceived > 100 {
maxFrame = e.latestFrameReceived + 100
}
latest, err = e.sync(latest, maxFrame, peerId)
if err == nil {
break
}
}
} else {
candidates := e.GetAheadPeers(max(latest.FrameNumber, e.latestFrameReceived))
if len(candidates) == 0 {
break
}
for _, candidate := range candidates {
if candidate.MaxFrame <= max(latest.FrameNumber, e.latestFrameReceived) {
continue
}
var err error
latest, err = e.sync(latest, candidate.MaxFrame, candidate.PeerID)
if err != nil {
e.logger.Debug("error syncing frame", zap.Error(err))
continue
}
}
}
e.syncingStatus = SyncStatusNotSyncing
e.logger.Info(
"returning leader frame",
zap.Uint64("frame_number", latest.FrameNumber),
@ -213,13 +207,11 @@ func (e *DataClockConsensusEngine) prove(
return frame, nil
}
func (e *DataClockConsensusEngine) GetMostAheadPeer(
frameNumber uint64,
) (
[]byte,
uint64,
error,
) {
func (e *DataClockConsensusEngine) GetAheadPeers(frameNumber uint64) []internal.PeerCandidate {
if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
return nil
}
e.logger.Debug(
"checking peer list",
zap.Int("peers", len(e.peerMap)),
@ -227,12 +219,9 @@ func (e *DataClockConsensusEngine) GetMostAheadPeer(
zap.Uint64("current_head_frame", frameNumber),
)
if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
return e.pubSub.GetPeerID(), frameNumber, nil
}
candidates := make([]internal.WeightedPeerCandidate, 0, len(e.peerMap))
maxDiff := uint64(0)
max := frameNumber
var peer []byte = nil
e.peerMapMx.RLock()
for _, v := range e.peerMap {
e.logger.Debug(
@ -242,21 +231,36 @@ func (e *DataClockConsensusEngine) GetMostAheadPeer(
zap.Int64("timestamp", v.timestamp),
zap.Binary("version", v.version),
)
_, ok := e.uncooperativePeersMap[string(v.peerId)]
if v.maxFrame > max &&
v.timestamp > config.GetMinimumVersionCutoff().UnixMilli() &&
bytes.Compare(v.version, config.GetMinimumVersion()) >= 0 && !ok {
peer = v.peerId
max = v.maxFrame
if v.maxFrame <= frameNumber {
continue
}
if _, ok := e.uncooperativePeersMap[string(v.peerId)]; ok {
continue
}
if v.timestamp <= config.GetMinimumVersionCutoff().UnixMilli() {
continue
}
if bytes.Compare(v.version, config.GetMinimumVersion()) < 0 {
continue
}
maxDiff = max(maxDiff, v.maxFrame-frameNumber)
candidates = append(candidates, internal.WeightedPeerCandidate{
PeerCandidate: internal.PeerCandidate{
PeerID: v.peerId,
MaxFrame: v.maxFrame,
},
})
}
e.peerMapMx.RUnlock()
if peer == nil {
return nil, 0, p2p.ErrNoPeersAvailable
if len(candidates) == 0 {
return nil
}
return peer, max, nil
for i := range candidates {
candidates[i].Weight = float64(candidates[i].MaxFrame-frameNumber) / float64(maxDiff)
}
return internal.WeightedSampleWithoutReplacement(candidates, len(candidates))
}
func (e *DataClockConsensusEngine) sync(
@ -264,6 +268,8 @@ func (e *DataClockConsensusEngine) sync(
maxFrame uint64,
peerId []byte,
) (*protobufs.ClockFrame, error) {
e.syncingStatus = SyncStatusSynchronizing
defer func() { e.syncingStatus = SyncStatusNotSyncing }()
latest := currentLatest
e.logger.Info("polling peer for new frames", zap.Binary("peer_id", peerId))
cc, err := e.pubSub.GetDirectChannel(peerId, "sync")

View File

@ -0,0 +1,25 @@
package internal
// PeerCandidate is a candidate for a peer to be used for syncing.
type PeerCandidate struct {
PeerID []byte
MaxFrame uint64
}
// WeightedPeerCandidate is a weighted peer candidate.
type WeightedPeerCandidate struct {
PeerCandidate
Weight float64
}
var _ Weighted[PeerCandidate] = (*WeightedPeerCandidate)(nil)
// GetItem implements Weighted[PeerCandidate].
func (p WeightedPeerCandidate) GetItem() PeerCandidate {
return p.PeerCandidate
}
// GetWeight implements Weighted[PeerCandidate].
func (p WeightedPeerCandidate) GetWeight() float64 {
return p.Weight
}

View File

@ -0,0 +1,69 @@
package internal
import (
"math"
"math/rand"
"sort"
)
// Weighted is an interface for items that have a weight.
type Weighted[T any] interface {
GetItem() T
GetWeight() float64
}
type weightedSort[T any] struct {
items []T
weights []float64
}
var _ sort.Interface = (*weightedSort[any])(nil)
// Len implements sort.Interface.
func (w weightedSort[T]) Len() int {
return len(w.items)
}
// Less implements sort.Interface.
func (w weightedSort[T]) Less(i, j int) bool {
return w.weights[i] >= w.weights[j]
}
// Swap implements sort.Interface.
func (w weightedSort[T]) Swap(i, j int) {
w.items[i], w.items[j] = w.items[j], w.items[i]
w.weights[i], w.weights[j] = w.weights[j], w.weights[i]
}
// WeightedSampleWithoutReplacementWithSource samples without replacement
// from a list of weighted items using a given random source.
// Based on work by Efraimidis and Spirakis.
func WeightedSampleWithoutReplacementWithSource[T any, W Weighted[T]](
items []W,
sampleSize int,
random *rand.Rand,
) []T {
ws := weightedSort[T]{
items: make([]T, len(items)),
weights: make([]float64, len(items)),
}
for i, item := range items {
ws.items[i] = item.GetItem()
ws.weights[i] = math.Pow(random.Float64(), 1.0/item.GetWeight())
}
sort.Sort(ws)
return ws.items[:sampleSize]
}
// WeightedSampleWithoutReplacement samples without replacement from a list
// of weighted items.
func WeightedSampleWithoutReplacement[T any, W Weighted[T]](
items []W,
sampleSize int,
) []T {
return WeightedSampleWithoutReplacementWithSource(
items,
sampleSize,
rand.New(rand.NewSource(rand.Int63())),
)
}

View File

@ -0,0 +1,57 @@
package internal_test
import (
"math/rand"
"testing"
"github.com/stretchr/testify/assert"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/data/internal"
)
type mockWeighted struct {
item int64
weight float64
}
var _ internal.Weighted[int64] = (*mockWeighted)(nil)
// GetWeight implements Weighted[int64].
func (m mockWeighted) GetItem() int64 {
return m.item
}
// GetWeight implements Weighted[int64].
func (m mockWeighted) GetWeight() float64 {
return m.weight
}
func TestWeightedSampleWithoutReplacementWithSource(t *testing.T) {
items := []mockWeighted{
{item: 0, weight: 0.1},
{item: 1, weight: 0.2},
{item: 2, weight: 0.4},
{item: 3, weight: 0.6},
{item: 4, weight: 0.8},
{item: 5, weight: 1.0},
}
frequencies := [6]int{}
random := rand.New(rand.NewSource(0))
for i := 0; i < 10_000; i++ {
sample := internal.WeightedSampleWithoutReplacementWithSource(items, 3, random)
seen := [6]bool{}
for _, item := range sample {
assert.False(t, seen[item])
frequencies[item]++
seen[item] = true
}
}
for i := 0; i < 6; i++ {
assert.Greater(t, frequencies[i], 0)
if i > 0 {
assert.Greater(t, frequencies[i], frequencies[i-1])
}
t.Logf("item %d: %d", i, frequencies[i])
}
}