From 0242eafa3ec27fa0f14791d2ef9c04cd1a8adfa5 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Tue, 26 Nov 2024 23:45:20 -0600 Subject: [PATCH] add decay, make validation check a little smarter --- node/consensus/data/message_handler.go | 4 +- node/consensus/data/message_validators.go | 3 +- node/p2p/blossomsub.go | 70 ++++++++++++++++++++--- 3 files changed, 65 insertions(+), 12 deletions(-) diff --git a/node/consensus/data/message_handler.go b/node/consensus/data/message_handler.go index 970dd03..d8355ac 100644 --- a/node/consensus/data/message_handler.go +++ b/node/consensus/data/message_handler.go @@ -321,7 +321,7 @@ func (e *DataClockConsensusEngine) handleDataPeerListAnnounce( "peer provided outdated version, penalizing app score", zap.String("peer_id", peer.ID(peerID).String()), ) - e.pubSub.SetPeerScore(peerID, -1000000) + e.pubSub.AddPeerScore(peerID, -1000) return nil } @@ -332,7 +332,7 @@ func (e *DataClockConsensusEngine) handleDataPeerListAnnounce( } e.peerMapMx.RUnlock() - e.pubSub.SetPeerScore(peerID, 10) + e.pubSub.AddPeerScore(peerID, 10) e.peerMapMx.RLock() existing, ok := e.peerMap[string(peerID)] diff --git a/node/consensus/data/message_validators.go b/node/consensus/data/message_validators.go index cbde617..e54c149 100644 --- a/node/consensus/data/message_validators.go +++ b/node/consensus/data/message_validators.go @@ -64,7 +64,8 @@ func (e *DataClockConsensusEngine) validateTxMessage(peerID peer.ID, message *pb } if mint.Signature == nil || mint.Signature.PublicKey == nil || - mint.Signature.PublicKey.KeyValue == nil { + mint.Signature.PublicKey.KeyValue == nil || + len(mint.Signature.PublicKey.KeyValue) > 114 { return p2p.ValidationResultReject } head, err := e.dataTimeReel.Head() diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index 561f7a3..0bda834 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -60,8 +60,15 @@ const ( defaultPingTimeout = 5 * time.Second defaultPingPeriod = 30 * time.Second defaultPingAttempts = 3 + DecayInterval = 10 * time.Second + AppDecay = .9 ) +type appScore struct { + expire time.Time + score float64 +} + type BlossomSub struct { ps *blossomsub.PubSub ctx context.Context @@ -70,7 +77,7 @@ type BlossomSub struct { bitmaskMap map[string]*blossomsub.Bitmask h host.Host signKey crypto.PrivKey - peerScore map[string]int64 + peerScore map[string]*appScore peerScoreMx sync.Mutex network uint8 bootstrap internal.PeerConnector @@ -148,7 +155,7 @@ func NewBlossomSubStreamer( logger: logger, bitmaskMap: make(map[string]*blossomsub.Bitmask), signKey: privKey, - peerScore: make(map[string]int64), + peerScore: make(map[string]*appScore), network: p2pConfig.Network, } @@ -302,7 +309,7 @@ func NewBlossomSub( logger: logger, bitmaskMap: make(map[string]*blossomsub.Bitmask), signKey: privKey, - peerScore: make(map[string]int64), + peerScore: make(map[string]*appScore), network: p2pConfig.Network, } @@ -443,7 +450,7 @@ func NewBlossomSub( BehaviourPenaltyWeight: -10, BehaviourPenaltyThreshold: 100, BehaviourPenaltyDecay: .5, - DecayInterval: 10 * time.Second, + DecayInterval: DecayInterval, DecayToZero: .1, RetainScore: 60 * time.Minute, AppSpecificScore: func(p peer.ID) float64 { @@ -586,6 +593,39 @@ func resourceManager(highWatermark int, allowed []peer.AddrInfo) ( return mgr, nil } +func (b *BlossomSub) background(ctx context.Context) { + refreshScores := time.NewTicker(DecayInterval) + defer refreshScores.Stop() + + for { + select { + case <-refreshScores.C: + b.refreshScores() + case <-ctx.Done(): + return + } + } +} + +func (b *BlossomSub) refreshScores() { + b.peerScoreMx.Lock() + + now := time.Now() + for p, pstats := range b.peerScore { + if now.After(pstats.expire) { + delete(b.peerScore, p) + continue + } + + pstats.score *= AppDecay + if pstats.score < .1 { + pstats.score = 0 + } + } + + b.peerScoreMx.Unlock() +} + func (b *BlossomSub) PublishToBitmask(bitmask []byte, data []byte) error { return b.ps.Publish(b.ctx, bitmask, data) } @@ -768,23 +808,35 @@ func (b *BlossomSub) DiscoverPeers(ctx context.Context) error { func (b *BlossomSub) GetPeerScore(peerId []byte) int64 { b.peerScoreMx.Lock() - score := b.peerScore[string(peerId)] + score, ok := b.peerScore[string(peerId)] + if !ok { + return 0 + } b.peerScoreMx.Unlock() - return score + return int64(score.score) } func (b *BlossomSub) SetPeerScore(peerId []byte, score int64) { b.peerScoreMx.Lock() - b.peerScore[string(peerId)] = score + b.peerScore[string(peerId)] = &appScore{ + score: float64(score), + expire: time.Now().Add(1 * time.Hour), + } b.peerScoreMx.Unlock() } func (b *BlossomSub) AddPeerScore(peerId []byte, scoreDelta int64) { b.peerScoreMx.Lock() if _, ok := b.peerScore[string(peerId)]; !ok { - b.peerScore[string(peerId)] = scoreDelta + b.peerScore[string(peerId)] = &appScore{ + score: float64(scoreDelta), + expire: time.Now().Add(1 * time.Hour), + } } else { - b.peerScore[string(peerId)] = b.peerScore[string(peerId)] + scoreDelta + b.peerScore[string(peerId)] = &appScore{ + score: b.peerScore[string(peerId)].score + float64(scoreDelta), + expire: time.Now().Add(1 * time.Hour), + } } b.peerScoreMx.Unlock() }