ceremonyclient/node/p2p/blossomsub.go
2025-02-13 12:24:51 -06:00

1078 lines
28 KiB
Go

package p2p
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"math"
"math/big"
"math/bits"
"net"
"sync"
"sync/atomic"
"time"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
libp2pconfig "github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/discovery/util"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/net/gostream"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/mr-tron/base58"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/wrapperspb"
blossomsub "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
"source.quilibrium.com/quilibrium/monorepo/node/config"
qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc"
"source.quilibrium.com/quilibrium/monorepo/node/internal/observability"
"source.quilibrium.com/quilibrium/monorepo/node/p2p/internal"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
const (
DecayInterval = 10 * time.Second
AppDecay = .9
)
type appScore struct {
expire time.Time
score float64
}
type BlossomSub struct {
ps *blossomsub.PubSub
ctx context.Context
logger *zap.Logger
peerID peer.ID
bitmaskMap map[string]*blossomsub.Bitmask
h host.Host
signKey crypto.PrivKey
peerScore map[string]*appScore
peerScoreMx sync.Mutex
bootstrap internal.PeerConnector
discovery internal.PeerConnector
reachability atomic.Pointer[network.Reachability]
p2pConfig config.P2PConfig
}
var _ PubSub = (*BlossomSub)(nil)
var ErrNoPeersAvailable = errors.New("no peers available")
var BITMASK_ALL = []byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
}
var ANNOUNCE_PREFIX = "quilibrium-2.0.2-dusk-"
func getPeerID(p2pConfig *config.P2PConfig) peer.ID {
peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey)
if err != nil {
panic(errors.Wrap(err, "error unmarshaling peerkey"))
}
privKey, err := crypto.UnmarshalEd448PrivateKey(peerPrivKey)
if err != nil {
panic(errors.Wrap(err, "error unmarshaling peerkey"))
}
pub := privKey.GetPublic()
id, err := peer.IDFromPublicKey(pub)
if err != nil {
panic(errors.Wrap(err, "error getting peer id"))
}
return id
}
func NewBlossomSubStreamer(
p2pConfig *config.P2PConfig,
logger *zap.Logger,
) *BlossomSub {
ctx := context.Background()
opts := []libp2pconfig.Option{
libp2p.ListenAddrStrings(p2pConfig.ListenMultiaddr),
}
bootstrappers := []peer.AddrInfo{}
peerinfo, err := peer.AddrInfoFromString("/ip4/185.209.178.191/udp/8336/quic-v1/p2p/QmcKQjpQmLpbDsiif2MuakhHFyxWvqYauPsJDaXnLav7PJ")
if err != nil {
panic(err)
}
bootstrappers = append(bootstrappers, *peerinfo)
var privKey crypto.PrivKey
if p2pConfig.PeerPrivKey != "" {
peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey)
if err != nil {
panic(errors.Wrap(err, "error unmarshaling peerkey"))
}
privKey, err = crypto.UnmarshalEd448PrivateKey(peerPrivKey)
if err != nil {
panic(errors.Wrap(err, "error unmarshaling peerkey"))
}
opts = append(opts, libp2p.Identity(privKey))
}
bs := &BlossomSub{
ctx: ctx,
logger: logger,
bitmaskMap: make(map[string]*blossomsub.Bitmask),
signKey: privKey,
peerScore: make(map[string]*appScore),
p2pConfig: *p2pConfig,
}
h, err := libp2p.New(opts...)
if err != nil {
panic(errors.Wrap(err, "error constructing p2p"))
}
logger.Info("established peer id", zap.String("peer_id", h.ID().String()))
_ = initDHT(
ctx,
logger,
h,
false,
bootstrappers,
p2pConfig.Network,
)
peerID := h.ID()
bs.peerID = peerID
bs.h = h
bs.signKey = privKey
return bs
}
func NewBlossomSub(
p2pConfig *config.P2PConfig,
logger *zap.Logger,
) *BlossomSub {
ctx := context.Background()
opts := []libp2pconfig.Option{
libp2p.ListenAddrStrings(p2pConfig.ListenMultiaddr),
libp2p.EnableNATService(),
libp2p.NATPortMap(),
}
isBootstrapPeer := false
peerId := getPeerID(p2pConfig)
if p2pConfig.Network == 0 {
for _, peerAddr := range config.BootstrapPeers {
peerinfo, err := peer.AddrInfoFromString(peerAddr)
if err != nil {
panic(err)
}
if bytes.Equal([]byte(peerinfo.ID), []byte(peerId)) {
isBootstrapPeer = true
break
}
}
} else {
for _, peerAddr := range p2pConfig.BootstrapPeers {
peerinfo, err := peer.AddrInfoFromString(peerAddr)
if err != nil {
panic(err)
}
if bytes.Equal([]byte(peerinfo.ID), []byte(peerId)) {
isBootstrapPeer = true
break
}
}
}
defaultBootstrapPeers := append([]string{}, p2pConfig.BootstrapPeers...)
if p2pConfig.Network == 0 {
defaultBootstrapPeers = config.BootstrapPeers
}
bootstrappers := []peer.AddrInfo{}
for _, peerAddr := range defaultBootstrapPeers {
peerinfo, err := peer.AddrInfoFromString(peerAddr)
if err != nil {
panic(err)
}
bootstrappers = append(bootstrappers, *peerinfo)
}
var privKey crypto.PrivKey
if p2pConfig.PeerPrivKey != "" {
peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey)
if err != nil {
panic(errors.Wrap(err, "error unmarshaling peerkey"))
}
privKey, err = crypto.UnmarshalEd448PrivateKey(peerPrivKey)
if err != nil {
panic(errors.Wrap(err, "error unmarshaling peerkey"))
}
opts = append(opts, libp2p.Identity(privKey))
}
allowedPeers := []peer.AddrInfo{}
allowedPeers = append(allowedPeers, bootstrappers...)
directPeers := []peer.AddrInfo{}
if len(p2pConfig.DirectPeers) > 0 {
logger.Info("found direct peers in config")
for _, peerAddr := range p2pConfig.DirectPeers {
peerinfo, err := peer.AddrInfoFromString(peerAddr)
if err != nil {
panic(err)
}
logger.Info("adding direct peer", zap.String("peer", peerinfo.ID.String()))
directPeers = append(directPeers, *peerinfo)
}
}
allowedPeers = append(allowedPeers, directPeers...)
if p2pConfig.LowWatermarkConnections != -1 &&
p2pConfig.HighWatermarkConnections != -1 {
cm, err := connmgr.NewConnManager(
p2pConfig.LowWatermarkConnections,
p2pConfig.HighWatermarkConnections,
connmgr.WithEmergencyTrim(true),
)
if err != nil {
panic(err)
}
rm, err := resourceManager(
p2pConfig.HighWatermarkConnections,
allowedPeers,
)
if err != nil {
panic(err)
}
opts = append(
opts,
libp2p.SwarmOpts(
swarm.WithIPv6BlackHoleConfig(false, 0, 0),
swarm.WithUDPBlackHoleConfig(false, 0, 0),
),
)
opts = append(opts, libp2p.ConnectionManager(cm))
opts = append(opts, libp2p.ResourceManager(rm))
}
bs := &BlossomSub{
ctx: ctx,
logger: logger,
bitmaskMap: make(map[string]*blossomsub.Bitmask),
signKey: privKey,
peerScore: make(map[string]*appScore),
p2pConfig: *p2pConfig,
}
h, err := libp2p.New(opts...)
if err != nil {
panic(errors.Wrap(err, "error constructing p2p"))
}
idService := internal.IDServiceFromHost(h)
logger.Info("established peer id", zap.String("peer_id", h.ID().String()))
reachabilitySub, err := h.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}, eventbus.Name("blossomsub"))
if err != nil {
panic(err)
}
go func() {
defer reachabilitySub.Close()
logger := logger.Named("reachability")
for {
select {
case <-ctx.Done():
return
case evt, ok := <-reachabilitySub.Out():
if !ok {
return
}
state := evt.(event.EvtLocalReachabilityChanged).Reachability
bs.reachability.Store(&state)
switch state {
case network.ReachabilityPublic:
logger.Info("node is externally reachable")
case network.ReachabilityPrivate:
logger.Info("node is not externally reachable")
case network.ReachabilityUnknown:
logger.Info("node reachability is unknown")
default:
logger.Debug("unknown reachability state", zap.Any("state", state))
}
}
}
}()
kademliaDHT := initDHT(
ctx,
logger,
h,
isBootstrapPeer,
bootstrappers,
p2pConfig.Network,
)
h = routedhost.Wrap(h, kademliaDHT)
routingDiscovery := routing.NewRoutingDiscovery(kademliaDHT)
util.Advertise(ctx, routingDiscovery, getNetworkNamespace(p2pConfig.Network))
minBootstrapPeers := min(len(bootstrappers), p2pConfig.MinBootstrapPeers)
bootstrap := internal.NewPeerConnector(
ctx,
logger.Named("bootstrap"),
h,
idService,
minBootstrapPeers,
p2pConfig.BootstrapParallelism,
internal.NewStaticPeerSource(bootstrappers, true),
)
if err := bootstrap.Connect(ctx); err != nil {
panic(err)
}
bootstrap = internal.NewConditionalPeerConnector(
ctx,
internal.NewNotEnoughPeersCondition(
h,
minBootstrapPeers,
internal.PeerAddrInfosToPeerIDMap(bootstrappers),
),
bootstrap,
)
bs.bootstrap = bootstrap
discovery := internal.NewPeerConnector(
ctx,
logger.Named("discovery"),
h,
idService,
p2pConfig.D,
p2pConfig.DiscoveryParallelism,
internal.NewRoutingDiscoveryPeerSource(
routingDiscovery,
getNetworkNamespace(p2pConfig.Network),
p2pConfig.DiscoveryPeerLookupLimit,
),
)
if err := discovery.Connect(ctx); err != nil {
panic(err)
}
discovery = internal.NewChainedPeerConnector(ctx, bootstrap, discovery)
bs.discovery = discovery
internal.MonitorPeers(
ctx,
logger.Named("peer-monitor"),
h,
p2pConfig.PingTimeout,
p2pConfig.PingPeriod,
p2pConfig.PingAttempts,
)
// TODO: turn into an option flag for console logging, this is too noisy for
// default logging behavior
var tracer *blossomsub.JSONTracer
if p2pConfig.TraceLogFile == "" {
// tracer, err = blossomsub.NewStdoutJSONTracer()
// if err != nil {
// panic(errors.Wrap(err, "error building stdout tracer"))
// }
} else {
tracer, err = blossomsub.NewJSONTracer(p2pConfig.TraceLogFile)
if err != nil {
panic(errors.Wrap(err, "error building file tracer"))
}
}
blossomOpts := []blossomsub.Option{
blossomsub.WithStrictSignatureVerification(true),
}
if len(directPeers) > 0 {
blossomOpts = append(blossomOpts, blossomsub.WithDirectPeers(directPeers))
}
if tracer != nil {
blossomOpts = append(blossomOpts, blossomsub.WithEventTracer(tracer))
}
blossomOpts = append(blossomOpts, blossomsub.WithPeerScore(
&blossomsub.PeerScoreParams{
SkipAtomicValidation: false,
BitmaskScoreCap: 0,
IPColocationFactorWeight: 0,
IPColocationFactorThreshold: 6,
BehaviourPenaltyWeight: -10,
BehaviourPenaltyThreshold: 100,
BehaviourPenaltyDecay: .5,
DecayInterval: DecayInterval,
DecayToZero: .1,
RetainScore: 60 * time.Minute,
AppSpecificScore: func(p peer.ID) float64 {
return float64(bs.GetPeerScore([]byte(p)))
},
AppSpecificWeight: 10.0,
},
&blossomsub.PeerScoreThresholds{
SkipAtomicValidation: false,
GossipThreshold: -2000,
PublishThreshold: -5000,
GraylistThreshold: -10000,
AcceptPXThreshold: 1,
OpportunisticGraftThreshold: 2,
},
))
blossomOpts = append(blossomOpts,
blossomsub.WithValidateQueueSize(p2pConfig.ValidateQueueSize),
blossomsub.WithValidateWorkers(p2pConfig.ValidateWorkers),
blossomsub.WithPeerOutboundQueueSize(p2pConfig.PeerOutboundQueueSize),
)
blossomOpts = append(blossomOpts, observability.WithPrometheusRawTracer())
blossomOpts = append(blossomOpts, blossomsub.WithPeerFilter(internal.NewStaticPeerFilter(
// We filter out the bootstrap peers explicitly from BlossomSub
// as they do not subscribe to relevant topics anymore.
// However, the beacon is one of the bootstrap peers usually
// and as such it gets special treatment - it is the only bootstrap
// peer which is engaged in the network.
[]peer.ID{internal.BeaconPeerID(uint(p2pConfig.Network))},
internal.PeerAddrInfosToPeerIDSlice(bootstrappers),
true,
)))
blossomOpts = append(blossomOpts, blossomsub.WithDiscovery(
internal.NewPeerConnectorDiscovery(discovery),
))
params := toBlossomSubParams(p2pConfig)
rt := blossomsub.NewBlossomSubRouter(h, params, bs.p2pConfig.Network)
blossomOpts = append(blossomOpts, rt.WithDefaultTagTracer())
pubsub, err := blossomsub.NewBlossomSubWithRouter(ctx, h, rt, blossomOpts...)
if err != nil {
panic(err)
}
peerID := h.ID()
bs.ps = pubsub
bs.peerID = peerID
bs.h = h
bs.signKey = privKey
go bs.background(ctx)
return bs
}
// adjusted from Lotus' reference implementation, addressing
// https://github.com/libp2p/go-libp2p/issues/1640
func resourceManager(highWatermark int, allowed []peer.AddrInfo) (
network.ResourceManager,
error,
) {
defaultLimits := rcmgr.DefaultLimits
libp2p.SetDefaultServiceLimits(&defaultLimits)
defaultLimits.SystemBaseLimit.Memory = 1 << 28
defaultLimits.SystemLimitIncrease.Memory = 1 << 28
defaultLimitConfig := defaultLimits.AutoScale()
changes := rcmgr.PartialLimitConfig{}
if defaultLimitConfig.ToPartialLimitConfig().System.Memory > 2<<30 {
changes.System.Memory = 2 << 30
}
maxconns := uint(highWatermark)
if rcmgr.LimitVal(3*maxconns) > defaultLimitConfig.
ToPartialLimitConfig().System.ConnsInbound {
changes.System.ConnsInbound = rcmgr.LimitVal(1 << bits.Len(3*maxconns))
changes.System.ConnsOutbound = rcmgr.LimitVal(1 << bits.Len(3*maxconns))
changes.System.Conns = rcmgr.LimitVal(1 << bits.Len(6*maxconns))
changes.System.StreamsInbound = rcmgr.LimitVal(1 << bits.Len(36*maxconns))
changes.System.StreamsOutbound = rcmgr.LimitVal(1 << bits.Len(216*maxconns))
changes.System.Streams = rcmgr.LimitVal(1 << bits.Len(216*maxconns))
if rcmgr.LimitVal(3*maxconns) > defaultLimitConfig.
ToPartialLimitConfig().System.FD {
changes.System.FD = rcmgr.LimitVal(1 << bits.Len(3*maxconns))
}
changes.ServiceDefault.StreamsInbound = rcmgr.LimitVal(
1 << bits.Len(12*maxconns),
)
changes.ServiceDefault.StreamsOutbound = rcmgr.LimitVal(
1 << bits.Len(48*maxconns),
)
changes.ServiceDefault.Streams = rcmgr.LimitVal(1 << bits.Len(48*maxconns))
changes.ProtocolDefault.StreamsInbound = rcmgr.LimitVal(
1 << bits.Len(12*maxconns),
)
changes.ProtocolDefault.StreamsOutbound = rcmgr.LimitVal(
1 << bits.Len(48*maxconns),
)
changes.ProtocolDefault.Streams = rcmgr.LimitVal(1 << bits.Len(48*maxconns))
}
changedLimitConfig := changes.Build(defaultLimitConfig)
limiter := rcmgr.NewFixedLimiter(changedLimitConfig)
str, err := rcmgr.NewStatsTraceReporter()
if err != nil {
return nil, errors.Wrap(err, "resource manager")
}
rcmgr.MustRegisterWith(prometheus.DefaultRegisterer)
// Metrics
opts := append(
[]rcmgr.Option{},
rcmgr.WithTraceReporter(str),
)
resolver := madns.DefaultResolver
var allowedMaddrs []ma.Multiaddr
for _, pi := range allowed {
for _, addr := range pi.Addrs {
resolved, err := resolver.Resolve(context.Background(), addr)
if err != nil {
continue
}
allowedMaddrs = append(allowedMaddrs, resolved...)
}
}
opts = append(opts, rcmgr.WithAllowlistedMultiaddrs(allowedMaddrs))
mgr, err := rcmgr.NewResourceManager(limiter, opts...)
if err != nil {
return nil, errors.Wrap(err, "resource manager")
}
return mgr, nil
}
func (b *BlossomSub) background(ctx context.Context) {
refreshScores := time.NewTicker(DecayInterval)
defer refreshScores.Stop()
for {
select {
case <-refreshScores.C:
b.refreshScores()
case <-ctx.Done():
return
}
}
}
func (b *BlossomSub) refreshScores() {
b.peerScoreMx.Lock()
now := time.Now()
for p, pstats := range b.peerScore {
if now.After(pstats.expire) {
delete(b.peerScore, p)
continue
}
pstats.score *= AppDecay
if math.Abs(pstats.score) < .1 {
pstats.score = 0
}
}
b.peerScoreMx.Unlock()
}
func (b *BlossomSub) PublishToBitmask(bitmask []byte, data []byte) error {
return b.ps.Publish(b.ctx, bitmask, data)
}
func (b *BlossomSub) Publish(address []byte, data []byte) error {
bitmask := GetBloomFilter(address, 256, 3)
return b.PublishToBitmask(bitmask, data)
}
func (b *BlossomSub) Subscribe(
bitmask []byte,
handler func(message *pb.Message) error,
) error {
b.logger.Info("joining broadcast")
bm, err := b.ps.Join(bitmask)
if err != nil {
b.logger.Error("join failed", zap.Error(err))
return errors.Wrap(err, "subscribe")
}
b.logger.Info("subscribe to bitmask", zap.Binary("bitmask", bitmask))
subs := []*blossomsub.Subscription{}
for _, bit := range bm {
sub, err := bit.Subscribe(blossomsub.WithBufferSize(b.p2pConfig.SubscriptionQueueSize))
if err != nil {
b.logger.Error("subscription failed", zap.Error(err))
return errors.Wrap(err, "subscribe")
}
_, ok := b.bitmaskMap[string(bit.Bitmask())]
if !ok {
b.bitmaskMap[string(bit.Bitmask())] = bit
}
subs = append(subs, sub)
}
b.logger.Info(
"begin streaming from bitmask",
zap.Binary("bitmask", bitmask),
)
for _, sub := range subs {
copiedBitmask := make([]byte, len(bitmask))
copy(copiedBitmask[:], bitmask[:])
sub := sub
go func() {
for {
m, err := sub.Next(b.ctx)
if err != nil {
b.logger.Error(
"got error when fetching the next message",
zap.Error(err),
)
}
if bytes.Equal(m.Bitmask, copiedBitmask) {
if err = handler(m.Message); err != nil {
b.logger.Debug("message handler returned error", zap.Error(err))
}
}
}
}()
}
return nil
}
func (b *BlossomSub) Unsubscribe(bitmask []byte, raw bool) {
// TODO: Fix this, it is broken - the bitmask parameter is not sliced, and the
// network is not pre-pended to the bitmask.
networkBitmask := append([]byte{b.p2pConfig.Network}, bitmask...)
bm, ok := b.bitmaskMap[string(networkBitmask)]
if !ok {
return
}
bm.Close()
}
func (b *BlossomSub) RegisterValidator(
bitmask []byte, validator func(peerID peer.ID, message *pb.Message) ValidationResult, sync bool,
) error {
validatorEx := func(
ctx context.Context, peerID peer.ID, message *blossomsub.Message,
) blossomsub.ValidationResult {
switch v := validator(peerID, message.Message); v {
case ValidationResultAccept:
return blossomsub.ValidationAccept
case ValidationResultReject:
return blossomsub.ValidationReject
case ValidationResultIgnore:
return blossomsub.ValidationIgnore
default:
panic("unreachable")
}
}
var _ blossomsub.ValidatorEx = validatorEx
return b.ps.RegisterBitmaskValidator(bitmask, validatorEx, blossomsub.WithValidatorInline(sync))
}
func (b *BlossomSub) UnregisterValidator(bitmask []byte) error {
return b.ps.UnregisterBitmaskValidator(bitmask)
}
func (b *BlossomSub) GetPeerID() []byte {
return []byte(b.peerID)
}
func (b *BlossomSub) GetRandomPeer(bitmask []byte) ([]byte, error) {
peers := b.ps.ListPeers(bitmask)
if len(peers) == 0 {
return nil, errors.Wrap(
ErrNoPeersAvailable,
"get random peer",
)
}
b.logger.Debug("selecting from peers", zap.Any("peer_ids", peers))
sel, err := rand.Int(rand.Reader, big.NewInt(int64(len(peers))))
if err != nil {
return nil, errors.Wrap(err, "get random peer")
}
return []byte(peers[sel.Int64()]), nil
}
func (b *BlossomSub) IsPeerConnected(peerId []byte) bool {
peerID := peer.ID(peerId)
connectedness := b.h.Network().Connectedness(peerID)
return connectedness == network.Connected || connectedness == network.Limited
}
func (b *BlossomSub) Reachability() *wrapperspb.BoolValue {
reachability := b.reachability.Load()
if reachability == nil {
return nil
}
switch *reachability {
case network.ReachabilityPublic:
return wrapperspb.Bool(true)
case network.ReachabilityPrivate:
return wrapperspb.Bool(false)
default:
return nil
}
}
func initDHT(
ctx context.Context,
logger *zap.Logger,
h host.Host,
isBootstrapPeer bool,
bootstrappers []peer.AddrInfo,
network uint8,
) *dht.IpfsDHT {
logger.Info("establishing dht")
var mode dht.ModeOpt
if isBootstrapPeer || network != 0 {
mode = dht.ModeServer
} else {
mode = dht.ModeClient
}
opts := []dht.Option{
dht.Mode(mode),
dht.BootstrapPeers(bootstrappers...),
}
if network != 0 {
opts = append(opts, dht.ProtocolPrefix(protocol.ID("/testnet")))
}
kademliaDHT, err := dht.New(
ctx,
h,
opts...,
)
if err != nil {
panic(err)
}
if err := kademliaDHT.Bootstrap(ctx); err != nil {
panic(err)
}
return kademliaDHT
}
func (b *BlossomSub) Reconnect(peerId []byte) error {
peer := peer.ID(peerId)
info := b.h.Peerstore().PeerInfo(peer)
b.h.ConnManager().Unprotect(info.ID, "bootstrap")
time.Sleep(10 * time.Second)
if err := b.h.Connect(b.ctx, info); err != nil {
return errors.Wrap(err, "reconnect")
}
b.h.ConnManager().Protect(info.ID, "bootstrap")
return nil
}
func (b *BlossomSub) Bootstrap(ctx context.Context) error {
return b.bootstrap.Connect(ctx)
}
func (b *BlossomSub) DiscoverPeers(ctx context.Context) error {
return b.discovery.Connect(ctx)
}
func (b *BlossomSub) GetPeerScore(peerId []byte) int64 {
b.peerScoreMx.Lock()
peerScore, ok := b.peerScore[string(peerId)]
if !ok {
b.peerScoreMx.Unlock()
return 0
}
score := peerScore.score
b.peerScoreMx.Unlock()
return int64(score)
}
func (b *BlossomSub) SetPeerScore(peerId []byte, score int64) {
b.peerScoreMx.Lock()
b.peerScore[string(peerId)] = &appScore{
score: float64(score),
expire: time.Now().Add(1 * time.Hour),
}
b.peerScoreMx.Unlock()
}
func (b *BlossomSub) AddPeerScore(peerId []byte, scoreDelta int64) {
b.peerScoreMx.Lock()
if _, ok := b.peerScore[string(peerId)]; !ok {
b.peerScore[string(peerId)] = &appScore{
score: float64(scoreDelta),
expire: time.Now().Add(1 * time.Hour),
}
} else {
b.peerScore[string(peerId)] = &appScore{
score: b.peerScore[string(peerId)].score + float64(scoreDelta),
expire: time.Now().Add(1 * time.Hour),
}
}
b.peerScoreMx.Unlock()
}
func (b *BlossomSub) GetBitmaskPeers() map[string][]string {
peers := map[string][]string{}
// TODO: Fix this, it is broken - the bitmask parameter is not sliced, and the
// network is not pre-pended to the bitmask.
for _, k := range b.bitmaskMap {
peers[fmt.Sprintf("%+x", k.Bitmask()[1:])] = []string{}
for _, p := range k.ListPeers() {
peers[fmt.Sprintf("%+x", k.Bitmask()[1:])] = append(
peers[fmt.Sprintf("%+x", k.Bitmask()[1:])],
p.String(),
)
}
}
return peers
}
func (b *BlossomSub) GetPeerstoreCount() int {
return len(b.h.Peerstore().Peers())
}
func (b *BlossomSub) GetNetworkInfo() *protobufs.NetworkInfoResponse {
resp := &protobufs.NetworkInfoResponse{}
for _, p := range b.h.Network().Peers() {
addrs := b.h.Peerstore().Addrs(p)
multiaddrs := []string{}
for _, a := range addrs {
multiaddrs = append(multiaddrs, a.String())
}
resp.NetworkInfo = append(resp.NetworkInfo, &protobufs.NetworkInfo{
PeerId: []byte(p),
Multiaddrs: multiaddrs,
PeerScore: b.ps.PeerScore(p),
})
}
return resp
}
func (b *BlossomSub) GetNetworkPeersCount() int {
return len(b.h.Network().Peers())
}
func (b *BlossomSub) GetMultiaddrOfPeerStream(
ctx context.Context,
peerId []byte,
) <-chan ma.Multiaddr {
return b.h.Peerstore().AddrStream(ctx, peer.ID(peerId))
}
func (b *BlossomSub) GetMultiaddrOfPeer(peerId []byte) string {
addrs := b.h.Peerstore().Addrs(peer.ID(peerId))
if len(addrs) == 0 {
return ""
}
return addrs[0].String()
}
func (b *BlossomSub) GetNetwork() uint {
return uint(b.p2pConfig.Network)
}
func (b *BlossomSub) StartDirectChannelListener(
key []byte,
purpose string,
server *grpc.Server,
) error {
bind, err := gostream.Listen(
b.h,
protocol.ID(
"/p2p/direct-channel/"+base58.Encode(key)+purpose,
),
)
if err != nil {
return errors.Wrap(err, "start direct channel listener")
}
return errors.Wrap(server.Serve(bind), "start direct channel listener")
}
type extraCloseConn struct {
net.Conn
extraClose func()
}
func (c *extraCloseConn) Close() error {
err := c.Conn.Close()
c.extraClose()
return err
}
func (b *BlossomSub) GetDirectChannel(ctx context.Context, peerID []byte, purpose string) (
cc *grpc.ClientConn, err error,
) {
// Kind of a weird hack, but gostream can induce panics if the peer drops at
// the time of connection, this avoids the problem.
defer func() {
if r := recover(); r != nil {
cc = nil
err = errors.New("connection failed")
}
}()
id := peer.ID(peerID)
// Open question: should we prefix this so a node can run both in mainnet and
// testnet? Feels like a bad idea and would be preferable to discourage.
cc, err = qgrpc.DialContext(
ctx,
"passthrough:///",
grpc.WithContextDialer(
func(ctx context.Context, _ string) (net.Conn, error) {
// If we are not already connected to the peer, we will manually dial it
// before opening the direct channel. We will close the peer connection
// when the direct channel is closed.
alreadyConnected := false
switch connectedness := b.h.Network().Connectedness(id); connectedness {
case network.Connected, network.Limited:
alreadyConnected = true
default:
if err := b.h.Connect(ctx, peer.AddrInfo{ID: id}); err != nil {
return nil, errors.Wrap(err, "connect")
}
}
c, err := gostream.Dial(
network.WithNoDial(ctx, "direct-channel"),
b.h,
id,
protocol.ID(
"/p2p/direct-channel/"+id.String()+purpose,
),
)
if err != nil {
return nil, errors.Wrap(err, "dial direct channel")
}
if alreadyConnected {
return c, nil
}
return &extraCloseConn{
Conn: c,
extraClose: func() { _ = b.h.Network().ClosePeer(id) },
}, nil
},
),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {
return nil, errors.Wrap(err, "dial context")
}
return cc, nil
}
func (b *BlossomSub) GetPublicKey() []byte {
pub, _ := b.signKey.GetPublic().Raw()
return pub
}
func (b *BlossomSub) SignMessage(msg []byte) ([]byte, error) {
sig, err := b.signKey.Sign(msg)
return sig, errors.Wrap(err, "sign message")
}
func toBlossomSubParams(p2pConfig *config.P2PConfig) blossomsub.BlossomSubParams {
return blossomsub.BlossomSubParams{
D: p2pConfig.D,
Dlo: p2pConfig.DLo,
Dhi: p2pConfig.DHi,
Dscore: p2pConfig.DScore,
Dout: p2pConfig.DOut,
HistoryLength: p2pConfig.HistoryLength,
HistoryGossip: p2pConfig.HistoryGossip,
Dlazy: p2pConfig.DLazy,
GossipFactor: p2pConfig.GossipFactor,
GossipRetransmission: p2pConfig.GossipRetransmission,
HeartbeatInitialDelay: p2pConfig.HeartbeatInitialDelay,
HeartbeatInterval: p2pConfig.HeartbeatInterval,
FanoutTTL: p2pConfig.FanoutTTL,
PrunePeers: p2pConfig.PrunePeers,
PruneBackoff: p2pConfig.PruneBackoff,
UnsubscribeBackoff: p2pConfig.UnsubscribeBackoff,
Connectors: p2pConfig.Connectors,
MaxPendingConnections: p2pConfig.MaxPendingConnections,
ConnectionTimeout: p2pConfig.ConnectionTimeout,
DirectConnectTicks: p2pConfig.DirectConnectTicks,
DirectConnectInitialDelay: p2pConfig.DirectConnectInitialDelay,
OpportunisticGraftTicks: p2pConfig.OpportunisticGraftTicks,
OpportunisticGraftPeers: p2pConfig.OpportunisticGraftPeers,
GraftFloodThreshold: p2pConfig.GraftFloodThreshold,
MaxIHaveLength: p2pConfig.MaxIHaveLength,
MaxIHaveMessages: p2pConfig.MaxIHaveMessages,
MaxIDontWantMessages: p2pConfig.MaxIDontWantMessages,
IWantFollowupTime: p2pConfig.IWantFollowupTime,
IDontWantMessageThreshold: p2pConfig.IDontWantMessageThreshold,
IDontWantMessageTTL: p2pConfig.IDontWantMessageTTL,
SlowHeartbeatWarning: 0.1,
}
}
func getNetworkNamespace(network uint8) string {
var network_name string
switch network {
case 0:
network_name = "mainnet"
case 1:
network_name = "testnet-primary"
default:
network_name = fmt.Sprintf("network-%d", network)
}
return ANNOUNCE_PREFIX + network_name
}