ceremonyclient/node/p2p/blossomsub.go

package p2p
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"fmt"
"log"
"math"
"math/big"
"math/bits"
"net"
"runtime/debug"
"sync"
"sync/atomic"
"time"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
libp2pconfig "github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/discovery/util"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/net/gostream"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/mr-tron/base58"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
mn "github.com/multiformats/go-multiaddr/net"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/wrapperspb"
"source.quilibrium.com/quilibrium/monorepo/config"
blossomsub "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
"source.quilibrium.com/quilibrium/monorepo/node/internal/observability"
"source.quilibrium.com/quilibrium/monorepo/node/p2p/internal"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/channel"
"source.quilibrium.com/quilibrium/monorepo/types/p2p"
up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p"
)
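// DecayInterval controls how often app-specific peer scores are refreshed;
// AppDecay is the multiplicative decay applied on each refresh. With
// AppDecay = 0.9 every 10s, a score halves roughly every 66s, since
// 0.9^n = 0.5 at n = ln(0.5)/ln(0.9) ≈ 6.6 intervals.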
const (
DecayInterval = 10 * time.Second
AppDecay = .9
)
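// appScore pairs an application-assigned peer score with its expiry; expired
// entries are dropped on the next refresh pass.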
type appScore struct {
expire time.Time
score float64
}
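// BlossomSub implements p2p.PubSub on top of go-libp2p-blossomsub, tying
// together the libp2p host, Kademlia DHT, peer discovery, and app-specific
// peer scoring.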
type BlossomSub struct {
ps *blossomsub.PubSub
ctx context.Context
logger *zap.Logger
peerID peer.ID
derivedPeerID peer.ID
bitmaskMap map[string]*blossomsub.Bitmask
// Tracks which bit slices belong to which original bitmasks; used to
// reference-count bit slices so they are closed only when no subscription
// still needs them
subscriptionTracker map[string][][]byte
subscriptionMutex sync.RWMutex
h host.Host
signKey crypto.PrivKey
peerScore map[string]*appScore
peerScoreMx sync.Mutex
bootstrap internal.PeerConnector
discovery internal.PeerConnector
reachability atomic.Pointer[network.Reachability]
p2pConfig config.P2PConfig
dht *dht.IpfsDHT
}
var _ p2p.PubSub = (*BlossomSub)(nil)
var ErrNoPeersAvailable = errors.New("no peers available")
var ANNOUNCE_PREFIX = "quilibrium-2.0.2-dusk-"
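// getPeerID derives the libp2p peer ID from the hex-encoded Ed448 private
// key in the p2p config, panicking on any malformed key.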
func getPeerID(p2pConfig *config.P2PConfig) peer.ID {
peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey)
if err != nil {
log.Panicf("error decoding peerkey: %v", err)
}
privKey, err := crypto.UnmarshalEd448PrivateKey(peerPrivKey)
if err != nil {
log.Panicf("error unmarshaling peerkey: %v", err)
}
pub := privKey.GetPublic()
id, err := peer.IDFromPublicKey(pub)
if err != nil {
log.Panicf("error getting peer id: %v", err)
}
return id
}
// NewBlossomSubWithHost creates a new blossomsub instance with a pre-defined
// host. This method is intended for integration tests that need something
// more realistic for pubsub purposes, with a supplied simulator (see
// node/tests/simnet.go for utility methods to construct a flaky host)
func NewBlossomSubWithHost(
p2pConfig *config.P2PConfig,
engineConfig *config.EngineConfig,
logger *zap.Logger,
coreId uint,
isBootstrapPeer bool,
host host.Host,
privKey crypto.PrivKey,
bootstrapHosts []host.Host,
) *BlossomSub {
ctx := context.Background()
if coreId == 0 {
logger = logger.With(zap.String("process", "master"))
} else {
logger = logger.With(zap.String(
"process",
fmt.Sprintf("worker %d", coreId),
))
}
bs := &BlossomSub{
ctx: ctx,
logger: logger,
bitmaskMap: make(map[string]*blossomsub.Bitmask),
subscriptionTracker: make(map[string][][]byte),
signKey: privKey,
peerScore: make(map[string]*appScore),
p2pConfig: *p2pConfig,
}
idService := internal.IDServiceFromHost(host)
logger.Info("established peer id", zap.String("peer_id", host.ID().String()))
reachabilitySub, err := host.EventBus().Subscribe(
&event.EvtLocalReachabilityChanged{},
eventbus.Name("blossomsub"),
)
if err != nil {
logger.Panic("error subscribing to reachability events", zap.Error(err))
}
go func() {
defer reachabilitySub.Close()
instance := "node"
if coreId != 0 {
instance = "worker"
}
logger := logger.Named("reachability")
for {
select {
case <-ctx.Done():
return
case evt, ok := <-reachabilitySub.Out():
if !ok {
return
}
state := evt.(event.EvtLocalReachabilityChanged).Reachability
bs.reachability.Store(&state)
switch state {
case network.ReachabilityPublic:
logger.Info(
instance+" is externally reachable",
zap.Uint("core_id", coreId),
)
case network.ReachabilityPrivate:
logger.Error(
instance+" is not externally reachable",
zap.Uint("core_id", coreId),
)
case network.ReachabilityUnknown:
logger.Info(
instance+" reachability is unknown",
zap.Uint("core_id", coreId),
)
default:
logger.Debug("unknown reachability state", zap.Any("state", state))
}
}
}
}()
bootstrappers := []peer.AddrInfo{}
for _, bh := range bootstrapHosts {
// manually construct the p2p string, kind of kludgy, but this is intended
// for use with tests
ai, err := peer.AddrInfoFromString(
bh.Addrs()[0].String() + "/p2p/" + bh.ID().String(),
)
if err != nil {
panic(fmt.Sprintf("error for addr %v, %+v:", bh.Addrs()[0], err))
}
bootstrappers = append(bootstrappers, *ai)
}
kademliaDHT := initDHT(
ctx,
logger,
host,
isBootstrapPeer,
bootstrappers,
p2pConfig.Network,
)
host = routedhost.Wrap(host, kademliaDHT)
routingDiscovery := routing.NewRoutingDiscovery(kademliaDHT)
util.Advertise(ctx, routingDiscovery, getNetworkNamespace(p2pConfig.Network))
minBootstrapPeers := min(len(bootstrappers), p2pConfig.MinBootstrapPeers)
bootstrap := internal.NewPeerConnector(
ctx,
zap.NewNop(),
host,
idService,
minBootstrapPeers,
p2pConfig.BootstrapParallelism,
internal.NewStaticPeerSource(bootstrappers, true),
)
if err := bootstrap.Connect(ctx); err != nil {
logger.Panic("error connecting to bootstrap peers", zap.Error(err))
}
bs.bootstrap = bootstrap
discovery := internal.NewPeerConnector(
ctx,
zap.NewNop(),
host,
idService,
p2pConfig.D,
p2pConfig.DiscoveryParallelism,
internal.NewRoutingDiscoveryPeerSource(
routingDiscovery,
getNetworkNamespace(p2pConfig.Network),
p2pConfig.DiscoveryPeerLookupLimit,
),
)
if err := discovery.Connect(ctx); err != nil {
logger.Panic("error connecting to discovery peers", zap.Error(err))
}
discovery = internal.NewChainedPeerConnector(ctx, bootstrap, discovery)
bs.discovery = discovery
internal.MonitorPeers(
ctx,
logger.Named("peer-monitor"),
host,
p2pConfig.PingTimeout,
p2pConfig.PingPeriod,
p2pConfig.PingAttempts,
nil,
)
var tracer *blossomsub.JSONTracer
if p2pConfig.TraceLogStdout {
tracer, err = blossomsub.NewStdoutJSONTracer()
if err != nil {
panic(errors.Wrap(err, "error building stdout tracer"))
}
} else if p2pConfig.TraceLogFile != "" {
tracer, err = blossomsub.NewJSONTracer(p2pConfig.TraceLogFile)
if err != nil {
logger.Panic("error building file tracer", zap.Error(err))
}
}
blossomOpts := []blossomsub.Option{
blossomsub.WithStrictSignatureVerification(true),
blossomsub.WithValidateQueueSize(blossomsub.DefaultValidateQueueSize),
blossomsub.WithValidateWorkers(1),
blossomsub.WithPeerOutboundQueueSize(
blossomsub.DefaultPeerOutboundQueueSize,
),
}
if tracer != nil {
blossomOpts = append(blossomOpts, blossomsub.WithEventTracer(tracer))
}
blossomOpts = append(blossomOpts, blossomsub.WithPeerScore(
&blossomsub.PeerScoreParams{
SkipAtomicValidation: false,
BitmaskScoreCap: 0,
IPColocationFactorWeight: 0,
IPColocationFactorThreshold: 6,
BehaviourPenaltyWeight: -10,
BehaviourPenaltyThreshold: 100,
BehaviourPenaltyDecay: .5,
DecayInterval: DecayInterval,
DecayToZero: .1,
RetainScore: 60 * time.Minute,
AppSpecificScore: func(p peer.ID) float64 {
return float64(bs.GetPeerScore([]byte(p)))
},
AppSpecificWeight: 10.0,
},
&blossomsub.PeerScoreThresholds{
SkipAtomicValidation: false,
GossipThreshold: -2000,
PublishThreshold: -5000,
GraylistThreshold: -10000,
AcceptPXThreshold: 1,
OpportunisticGraftThreshold: 2,
},
))
blossomOpts = append(blossomOpts, observability.WithPrometheusRawTracer())
if p2pConfig.Network == 0 {
logger.Info("enabling blacklist for bootstrappers for blossomsub")
blossomOpts = append(blossomOpts, blossomsub.WithPeerFilter(
internal.NewStaticPeerFilter(
[]peer.ID{},
internal.PeerAddrInfosToPeerIDSlice(bootstrappers),
true,
),
))
}
blossomOpts = append(blossomOpts, blossomsub.WithDiscovery(
internal.NewPeerConnectorDiscovery(discovery),
))
blossomOpts = append(blossomOpts, blossomsub.WithMessageIdFn(
func(pmsg *pb.Message) []byte {
id := sha256.Sum256(pmsg.Data)
return id[:]
}),
)
params := toBlossomSubParams(p2pConfig)
rt := blossomsub.NewBlossomSubRouter(host, params, bs.p2pConfig.Network)
blossomOpts = append(blossomOpts, rt.WithDefaultTagTracer())
pubsub, err := blossomsub.NewBlossomSubWithRouter(ctx, host, rt, blossomOpts...)
if err != nil {
logger.Panic("error creating pubsub", zap.Error(err))
}
peerID := host.ID()
bs.dht = kademliaDHT
bs.ps = pubsub
bs.peerID = peerID
bs.h = host
bs.signKey = privKey
bs.derivedPeerID = peerID
go bs.background(ctx)
return bs
}
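// NewBlossomSub constructs the production BlossomSub instance: it builds a
// libp2p host from config (including per-worker listen addresses when
// coreId > 0), wires in the DHT, peer discovery, scoring, and the blossomsub
// router, and starts the background score-refresh loop.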
func NewBlossomSub(
p2pConfig *config.P2PConfig,
engineConfig *config.EngineConfig,
logger *zap.Logger,
coreId uint,
) *BlossomSub {
ctx := context.Background()
// Determine the appropriate listen address based on coreId
var listenAddr string
if coreId == 0 {
logger = logger.With(zap.String("process", "master"))
// For main node (coreId == 0), use the standard p2pConfig.ListenMultiaddr
listenAddr = p2pConfig.ListenMultiaddr
} else {
logger = logger.With(zap.String(
"process",
fmt.Sprintf("worker %d", coreId),
))
// For data workers (coreId > 0), check if DataWorkerP2PMultiaddrs is
// provided
if engineConfig != nil && len(engineConfig.DataWorkerP2PMultiaddrs) > 0 &&
int(coreId-1) < len(engineConfig.DataWorkerP2PMultiaddrs) {
listenAddr = engineConfig.DataWorkerP2PMultiaddrs[coreId-1]
logger.Info(
"Using configured data worker P2P multiaddr",
zap.String("multiaddr", listenAddr),
zap.Uint("core_id", coreId),
)
} else if engineConfig != nil && engineConfig.DataWorkerBaseP2PPort > 0 {
port := engineConfig.DataWorkerBaseP2PPort + uint16(coreId-1)
listenAddr = fmt.Sprintf(engineConfig.DataWorkerBaseListenMultiaddr, port)
logger.Info(
"worker p2p listen address calculated",
zap.String("multiaddr", listenAddr),
zap.Uint("core_id", coreId),
zap.Uint16("port", port),
)
} else {
logger.Error(
"no data worker configuration found",
zap.Uint("core_id", coreId),
)
time.Sleep(120 * time.Second)
panic("no data worker configuration found")
}
}
opts := []libp2pconfig.Option{
libp2p.ListenAddrStrings(listenAddr),
libp2p.EnableNATService(),
libp2p.NATPortMap(),
}
isBootstrapPeer := false
if coreId == 0 {
peerId := getPeerID(p2pConfig)
if p2pConfig.Network == 0 {
for _, peerAddr := range config.BootstrapPeers {
peerinfo, err := peer.AddrInfoFromString(peerAddr)
if err != nil {
logger.Panic("error getting peer info", zap.Error(err))
}
if bytes.Equal([]byte(peerinfo.ID), []byte(peerId)) {
isBootstrapPeer = true
break
}
}
} else {
for _, peerAddr := range p2pConfig.BootstrapPeers {
peerinfo, err := peer.AddrInfoFromString(peerAddr)
if err != nil {
logger.Panic("error getting peer info", zap.Error(err))
}
if bytes.Equal([]byte(peerinfo.ID), []byte(peerId)) {
isBootstrapPeer = true
break
}
}
}
}
defaultBootstrapPeers := append([]string{}, p2pConfig.BootstrapPeers...)
if p2pConfig.Network == 0 {
defaultBootstrapPeers = config.BootstrapPeers
}
bootstrappers := []peer.AddrInfo{}
for _, peerAddr := range defaultBootstrapPeers {
peerinfo, err := peer.AddrInfoFromString(peerAddr)
if err != nil {
logger.Panic("error getting peer info", zap.Error(err))
}
bootstrappers = append(bootstrappers, *peerinfo)
}
var privKey crypto.PrivKey
var derivedPeerId peer.ID
if p2pConfig.PeerPrivKey != "" {
peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey)
if err != nil {
logger.Panic("error unmarshaling peerkey", zap.Error(err))
}
privKey, err = crypto.UnmarshalEd448PrivateKey(peerPrivKey)
if err != nil {
logger.Panic("error unmarshaling peerkey", zap.Error(err))
}
derivedPeerId, err = peer.IDFromPrivateKey(privKey)
if err != nil {
logger.Panic("error deriving peer id", zap.Error(err))
}
if coreId == 0 {
opts = append(opts, libp2p.Identity(privKey))
} else {
workerKey, _, err := crypto.GenerateEd448Key(rand.Reader)
if err != nil {
logger.Panic("error generating worker peerkey", zap.Error(err))
}
opts = append(opts, libp2p.Identity(workerKey))
}
}
allowedPeers := []peer.AddrInfo{}
allowedPeers = append(allowedPeers, bootstrappers...)
directPeers := []peer.AddrInfo{}
if len(p2pConfig.DirectPeers) > 0 {
logger.Info("found direct peers in config")
for _, peerAddr := range p2pConfig.DirectPeers {
peerinfo, err := peer.AddrInfoFromString(peerAddr)
if err != nil {
logger.Panic("error getting peer info", zap.Error(err))
}
logger.Info(
"adding direct peer",
zap.String("peer", peerinfo.ID.String()),
)
directPeers = append(directPeers, *peerinfo)
}
}
allowedPeers = append(allowedPeers, directPeers...)
opts = append(
opts,
libp2p.SwarmOpts(
swarm.WithUDPBlackHoleSuccessCounter(
&swarm.BlackHoleSuccessCounter{
N: 8000,
MinSuccesses: 1,
Name: "permissive-udp",
},
),
swarm.WithIPv6BlackHoleSuccessCounter(
&swarm.BlackHoleSuccessCounter{
N: 8000,
MinSuccesses: 1,
Name: "permissive-ip6",
},
),
),
)
if p2pConfig.LowWatermarkConnections != -1 &&
p2pConfig.HighWatermarkConnections != -1 {
cm, err := connmgr.NewConnManager(
p2pConfig.LowWatermarkConnections,
p2pConfig.HighWatermarkConnections,
)
if err != nil {
logger.Panic("error creating connection manager", zap.Error(err))
}
rm, err := resourceManager(
p2pConfig.HighWatermarkConnections,
allowedPeers,
)
if err != nil {
logger.Panic("error creating resource manager", zap.Error(err))
}
opts = append(opts, libp2p.ConnectionManager(cm))
opts = append(opts, libp2p.ResourceManager(rm))
}
bs := &BlossomSub{
ctx: ctx,
logger: logger,
bitmaskMap: make(map[string]*blossomsub.Bitmask),
subscriptionTracker: make(map[string][][]byte),
signKey: privKey,
peerScore: make(map[string]*appScore),
p2pConfig: *p2pConfig,
derivedPeerID: derivedPeerId,
}
h, err := libp2p.New(opts...)
if err != nil {
logger.Panic("error constructing p2p", zap.Error(err))
}
idService := internal.IDServiceFromHost(h)
logger.Info("established peer id", zap.String("peer_id", h.ID().String()))
reachabilitySub, err := h.EventBus().Subscribe(
&event.EvtLocalReachabilityChanged{},
eventbus.Name("blossomsub"),
)
if err != nil {
logger.Panic("error subscribing to reachability events", zap.Error(err))
}
go func() {
defer reachabilitySub.Close()
instance := "node"
if coreId != 0 {
instance = "worker"
}
logger := logger.Named("reachability")
for {
select {
case <-ctx.Done():
return
case evt, ok := <-reachabilitySub.Out():
if !ok {
return
}
state := evt.(event.EvtLocalReachabilityChanged).Reachability
bs.reachability.Store(&state)
switch state {
case network.ReachabilityPublic:
logger.Info(
instance+" is externally reachable",
zap.Uint("core_id", coreId),
)
case network.ReachabilityPrivate:
logger.Error(
instance+" is not externally reachable",
zap.Uint("core_id", coreId),
)
case network.ReachabilityUnknown:
logger.Info(
instance+" reachability is unknown",
zap.Uint("core_id", coreId),
)
default:
logger.Debug("unknown reachability state", zap.Any("state", state))
}
}
}
}()
kademliaDHT := initDHT(
ctx,
logger,
h,
isBootstrapPeer,
bootstrappers,
p2pConfig.Network,
)
h = routedhost.Wrap(h, kademliaDHT)
routingDiscovery := routing.NewRoutingDiscovery(kademliaDHT)
util.Advertise(ctx, routingDiscovery, getNetworkNamespace(p2pConfig.Network))
minBootstrapPeers := min(len(bootstrappers), p2pConfig.MinBootstrapPeers)
bootstrap := internal.NewPeerConnector(
ctx,
logger.Named("bootstrap"),
h,
idService,
minBootstrapPeers,
p2pConfig.BootstrapParallelism,
internal.NewStaticPeerSource(bootstrappers, true),
)
if err := bootstrap.Connect(ctx); err != nil {
logger.Panic("error connecting to bootstrap peers", zap.Error(err))
}
bootstrap = internal.NewConditionalPeerConnector(
ctx,
internal.NewNotEnoughPeersCondition(
h,
minBootstrapPeers,
internal.PeerAddrInfosToPeerIDMap(bootstrappers),
),
bootstrap,
)
bs.bootstrap = bootstrap
discovery := internal.NewPeerConnector(
ctx,
logger.Named("discovery"),
h,
idService,
p2pConfig.D,
p2pConfig.DiscoveryParallelism,
internal.NewRoutingDiscoveryPeerSource(
routingDiscovery,
getNetworkNamespace(p2pConfig.Network),
p2pConfig.DiscoveryPeerLookupLimit,
),
)
if err := discovery.Connect(ctx); err != nil {
logger.Panic("error connecting to discovery peers", zap.Error(err))
}
discovery = internal.NewChainedPeerConnector(ctx, bootstrap, discovery)
bs.discovery = discovery
internal.MonitorPeers(
ctx,
logger.Named("peer-monitor"),
h,
p2pConfig.PingTimeout,
p2pConfig.PingPeriod,
p2pConfig.PingAttempts,
directPeers,
)
var tracer *blossomsub.JSONTracer
if p2pConfig.TraceLogStdout {
tracer, err = blossomsub.NewStdoutJSONTracer()
if err != nil {
panic(errors.Wrap(err, "error building stdout tracer"))
}
} else if p2pConfig.TraceLogFile != "" {
tracer, err = blossomsub.NewJSONTracer(p2pConfig.TraceLogFile)
if err != nil {
logger.Panic("error building file tracer", zap.Error(err))
}
}
blossomOpts := []blossomsub.Option{
blossomsub.WithStrictSignatureVerification(true),
}
if len(directPeers) > 0 {
blossomOpts = append(blossomOpts, blossomsub.WithDirectPeers(directPeers))
}
if tracer != nil {
blossomOpts = append(blossomOpts, blossomsub.WithEventTracer(tracer))
}
blossomOpts = append(blossomOpts, blossomsub.WithPeerScore(
&blossomsub.PeerScoreParams{
SkipAtomicValidation: false,
BitmaskScoreCap: 0,
IPColocationFactorWeight: 0,
IPColocationFactorThreshold: 6,
BehaviourPenaltyWeight: -10,
BehaviourPenaltyThreshold: 100,
BehaviourPenaltyDecay: .5,
DecayInterval: DecayInterval,
DecayToZero: .1,
RetainScore: 60 * time.Minute,
AppSpecificScore: func(p peer.ID) float64 {
return float64(bs.GetPeerScore([]byte(p)))
},
AppSpecificWeight: 10.0,
},
&blossomsub.PeerScoreThresholds{
SkipAtomicValidation: false,
GossipThreshold: -2000,
PublishThreshold: -5000,
GraylistThreshold: -10000,
AcceptPXThreshold: 1,
OpportunisticGraftThreshold: 2,
},
))
blossomOpts = append(blossomOpts,
blossomsub.WithValidateQueueSize(p2pConfig.ValidateQueueSize),
blossomsub.WithValidateWorkers(p2pConfig.ValidateWorkers),
blossomsub.WithPeerOutboundQueueSize(p2pConfig.PeerOutboundQueueSize),
)
blossomOpts = append(blossomOpts, observability.WithPrometheusRawTracer())
if p2pConfig.Network == 0 {
logger.Info("enabling blacklist for bootstrappers for blossomsub")
blossomOpts = append(blossomOpts, blossomsub.WithPeerFilter(
internal.NewStaticPeerFilter(
[]peer.ID{},
internal.PeerAddrInfosToPeerIDSlice(bootstrappers),
true,
),
))
}
blossomOpts = append(blossomOpts, blossomsub.WithDiscovery(
internal.NewPeerConnectorDiscovery(discovery),
))
blossomOpts = append(blossomOpts, blossomsub.WithMessageIdFn(
func(pmsg *pb.Message) []byte {
id := sha256.Sum256(pmsg.Data)
return id[:]
}),
)
params := toBlossomSubParams(p2pConfig)
rt := blossomsub.NewBlossomSubRouter(h, params, bs.p2pConfig.Network)
blossomOpts = append(blossomOpts, rt.WithDefaultTagTracer())
pubsub, err := blossomsub.NewBlossomSubWithRouter(ctx, h, rt, blossomOpts...)
if err != nil {
logger.Panic("error creating pubsub", zap.Error(err))
}
peerID := h.ID()
bs.dht = kademliaDHT
bs.ps = pubsub
bs.peerID = peerID
bs.h = h
bs.signKey = privKey
go bs.background(ctx)
return bs
}
// adjusted from Lotus' reference implementation, addressing
// https://github.com/libp2p/go-libp2p/issues/1640
func resourceManager(highWatermark int, allowed []peer.AddrInfo) (
network.ResourceManager,
error,
) {
defaultLimits := rcmgr.DefaultLimits
libp2p.SetDefaultServiceLimits(&defaultLimits)
defaultLimits.SystemBaseLimit.Memory = 1 << 28
defaultLimits.SystemLimitIncrease.Memory = 1 << 28
defaultLimitConfig := defaultLimits.AutoScale()
changes := rcmgr.PartialLimitConfig{}
if defaultLimitConfig.ToPartialLimitConfig().System.Memory > 2<<30 {
changes.System.Memory = 2 << 30
}
maxconns := uint(highWatermark)
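// 1 << bits.Len(x) rounds x up to the next power of two (e.g. with
// maxconns = 100, 3*maxconns = 300 and 1 << bits.Len(300) = 512), keeping
// the scaled limits comfortably above the connection watermark.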
if rcmgr.LimitVal(3*maxconns) > defaultLimitConfig.
ToPartialLimitConfig().System.ConnsInbound {
changes.System.ConnsInbound = rcmgr.LimitVal(1 << bits.Len(3*maxconns))
changes.System.ConnsOutbound = rcmgr.LimitVal(1 << bits.Len(3*maxconns))
changes.System.Conns = rcmgr.LimitVal(1 << bits.Len(6*maxconns))
changes.System.StreamsInbound = rcmgr.LimitVal(1 << bits.Len(36*maxconns))
changes.System.StreamsOutbound = rcmgr.LimitVal(1 << bits.Len(216*maxconns))
changes.System.Streams = rcmgr.LimitVal(1 << bits.Len(216*maxconns))
if rcmgr.LimitVal(3*maxconns) > defaultLimitConfig.
ToPartialLimitConfig().System.FD {
changes.System.FD = rcmgr.LimitVal(1 << bits.Len(3*maxconns))
}
changes.ServiceDefault.StreamsInbound = rcmgr.LimitVal(
1 << bits.Len(12*maxconns),
)
changes.ServiceDefault.StreamsOutbound = rcmgr.LimitVal(
1 << bits.Len(48*maxconns),
)
changes.ServiceDefault.Streams = rcmgr.LimitVal(1 << bits.Len(48*maxconns))
changes.ProtocolDefault.StreamsInbound = rcmgr.LimitVal(
1 << bits.Len(12*maxconns),
)
changes.ProtocolDefault.StreamsOutbound = rcmgr.LimitVal(
1 << bits.Len(48*maxconns),
)
changes.ProtocolDefault.Streams = rcmgr.LimitVal(1 << bits.Len(48*maxconns))
}
changedLimitConfig := changes.Build(defaultLimitConfig)
limiter := rcmgr.NewFixedLimiter(changedLimitConfig)
str, err := rcmgr.NewStatsTraceReporter()
if err != nil {
return nil, errors.Wrap(err, "resource manager")
}
rcmgr.MustRegisterWith(prometheus.DefaultRegisterer)
// report resource manager metrics through the stats trace reporter
opts := append(
[]rcmgr.Option{},
rcmgr.WithTraceReporter(str),
)
resolver := madns.DefaultResolver
var allowedMaddrs []ma.Multiaddr
for _, pi := range allowed {
for _, addr := range pi.Addrs {
resolved, err := resolver.Resolve(context.Background(), addr)
if err != nil {
continue
}
allowedMaddrs = append(allowedMaddrs, resolved...)
}
}
opts = append(opts, rcmgr.WithAllowlistedMultiaddrs(allowedMaddrs))
mgr, err := rcmgr.NewResourceManager(limiter, opts...)
if err != nil {
return nil, errors.Wrap(err, "resource manager")
}
return mgr, nil
}
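// background decays app-specific peer scores on each DecayInterval tick
// until the context is cancelled.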
func (b *BlossomSub) background(ctx context.Context) {
refreshScores := time.NewTicker(DecayInterval)
defer refreshScores.Stop()
for {
select {
case <-refreshScores.C:
b.refreshScores()
case <-ctx.Done():
return
}
}
}
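// refreshScores drops expired score entries and applies one AppDecay step to
// the rest, snapping scores with magnitude below 0.1 to zero so fully
// decayed entries do not linger as tiny floats.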
func (b *BlossomSub) refreshScores() {
b.peerScoreMx.Lock()
now := time.Now()
for p, pstats := range b.peerScore {
if now.After(pstats.expire) {
delete(b.peerScore, p)
continue
}
pstats.score *= AppDecay
if math.Abs(pstats.score) < .1 {
pstats.score = 0
}
}
b.peerScoreMx.Unlock()
}
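// PublishToBitmask publishes data to an explicit bitmask, signed with the
// node's key and derived peer ID. Bitmask-closed errors are deliberately
// ignored on devnet (network 99).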
func (b *BlossomSub) PublishToBitmask(bitmask []byte, data []byte) error {
err := b.ps.Publish(
b.ctx,
bitmask,
data,
blossomsub.WithSecretKeyAndPeerId(b.signKey, b.derivedPeerID),
)
if err != nil && errors.Is(err, blossomsub.ErrBitmaskClosed) &&
b.p2pConfig.Network == 99 {
// Ignore bitmask closed errors for devnet
return nil
}
return errors.Wrap(
errors.Wrapf(err, "bitmask: %x", bitmask),
"publish to bitmask",
)
}
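// Publish maps an address onto a bitmask via a bloom filter (256 bits with 3
// probe indices, per the GetBloomFilter arguments) and publishes to it.
//
// A minimal usage sketch, assuming a constructed *BlossomSub bs and an
// application-defined address and payload:
//
//	if err := bs.Publish(address, payload); err != nil {
//		// handle the publish failure
//	}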
func (b *BlossomSub) Publish(address []byte, data []byte) error {
bitmask := up2p.GetBloomFilter(address, 256, 3)
return b.PublishToBitmask(bitmask, data)
}
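// Subscribe joins every bit slice of the bitmask, records the slices for
// reference counting, and streams incoming messages to handler on dedicated
// goroutines. For wide bitmasks (more than three bit slices) delivery is
// non-exact and the caller must disambiguate addresses itself.
//
// A minimal handler sketch, assuming a constructed *BlossomSub bs:
//
//	err := bs.Subscribe(bitmask, func(m *pb.Message) error {
//		// process m.Data; a returned error is logged, not fatal
//		return nil
//	})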
func (b *BlossomSub) Subscribe(
bitmask []byte,
handler func(message *pb.Message) error,
) error {
b.logger.Info("joining broadcast")
bm, err := b.ps.Join(bitmask)
if err != nil {
b.logger.Error("join failed", zap.Error(err))
return errors.Wrap(err, "subscribe")
}
b.logger.Info(
"subscribe to bitmask",
zap.String("bitmask", hex.EncodeToString(bitmask)),
)
// Track the bit slices for this subscription
b.subscriptionMutex.Lock()
bitSlices := make([][]byte, 0, len(bm))
for _, bit := range bm {
sliceCopy := make([]byte, len(bit.Bitmask()))
copy(sliceCopy, bit.Bitmask())
bitSlices = append(bitSlices, sliceCopy)
}
b.subscriptionTracker[string(bitmask)] = bitSlices
b.subscriptionMutex.Unlock()
// If the bitmask count is greater than three, this is a broad subscribe
// and the caller is expected to handle disambiguation of addresses
exact := len(bm) <= 3
subs := []*blossomsub.Subscription{}
for _, bit := range bm {
sub, err := bit.Subscribe(
blossomsub.WithBufferSize(b.p2pConfig.SubscriptionQueueSize),
)
if err != nil {
b.logger.Error("subscription failed", zap.Error(err))
// Clean up on failure
b.subscriptionMutex.Lock()
delete(b.subscriptionTracker, string(bitmask))
b.subscriptionMutex.Unlock()
return errors.Wrap(err, "subscribe")
}
b.subscriptionMutex.Lock()
_, ok := b.bitmaskMap[string(bit.Bitmask())]
if !ok {
b.bitmaskMap[string(bit.Bitmask())] = bit
}
b.subscriptionMutex.Unlock()
subs = append(subs, sub)
}
b.logger.Info(
"begin streaming from bitmask",
zap.String("bitmask", hex.EncodeToString(bitmask)),
)
for _, sub := range subs {
copiedBitmask := make([]byte, len(bitmask))
copy(copiedBitmask, bitmask)
sub := sub
go func() {
// loop until the subscription errors out or is cancelled
for b.subscribeHandler(sub, copiedBitmask, exact, handler) {
}
}()
}
b.logger.Info(
"successfully subscribed to bitmask",
zap.String("bitmask", hex.EncodeToString(bitmask)),
)
return nil
}
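// subscribeHandler pulls a single message from the subscription and invokes
// handler when the message's bitmask matches (or the subscription is
// non-exact). It reports whether the caller should keep looping: false once
// Next fails, true otherwise, including after a recovered handler panic.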
func (b *BlossomSub) subscribeHandler(
sub *blossomsub.Subscription,
copiedBitmask []byte,
exact bool,
handler func(message *pb.Message) error,
) (next bool) {
defer func() {
if r := recover(); r != nil {
b.logger.Error(
"message handler panicked, recovering",
zap.Any("panic", r),
zap.String("stack", string(debug.Stack())),
)
// keep consuming messages after a recovered handler panic
next = true
}
}()
m, err := sub.Next(b.ctx)
if err != nil {
b.logger.Error(
"got error when fetching the next message",
zap.Error(err),
)
// stop the subscription loop; previously a failed Next left m nil and
// the bitmask check below dereferenced it
return false
}
if bytes.Equal(m.Bitmask, copiedBitmask) || !exact {
if err = handler(m.Message); err != nil {
b.logger.Debug("message handler returned error", zap.Error(err))
}
}
return true
}
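// Unsubscribe reference-counts the bit slices backing a subscription and
// closes only those slices no other tracked subscription still uses. The raw
// flag is unused here.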
func (b *BlossomSub) Unsubscribe(bitmask []byte, raw bool) {
b.subscriptionMutex.Lock()
defer b.subscriptionMutex.Unlock()
bitmaskKey := string(bitmask)
bitSlices, exists := b.subscriptionTracker[bitmaskKey]
if !exists {
b.logger.Warn(
"attempted to unsubscribe from unknown bitmask",
zap.String("bitmask", hex.EncodeToString(bitmask)),
)
return
}
b.logger.Info(
"unsubscribing from bitmask",
zap.String("bitmask", hex.EncodeToString(bitmask)),
)
// Check each bit slice to see if it's still needed by other subscriptions
for _, bitSlice := range bitSlices {
bitSliceKey := string(bitSlice)
// Check if any other subscription is using this bit slice
stillNeeded := false
for otherKey, otherSlices := range b.subscriptionTracker {
if otherKey == bitmaskKey {
continue // Skip the subscription we're removing
}
for _, otherSlice := range otherSlices {
if bytes.Equal(otherSlice, bitSlice) {
stillNeeded = true
break
}
}
if stillNeeded {
break
}
}
// Only close the bitmask if no other subscription needs it
if !stillNeeded {
if bm, ok := b.bitmaskMap[bitSliceKey]; ok {
b.logger.Debug(
"closing bit slice",
zap.String("bit_slice", hex.EncodeToString(bitSlice)),
)
bm.Close()
delete(b.bitmaskMap, bitSliceKey)
}
} else {
b.logger.Debug(
"bit slice still needed by other subscription",
zap.String("bit_slice", hex.EncodeToString(bitSlice)),
)
}
}
// Remove the subscription from tracker
delete(b.subscriptionTracker, bitmaskKey)
}
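// RegisterValidator adapts a p2p.ValidationResult validator to blossomsub's
// ValidatorEx and registers it for the bitmask; sync selects inline
// (synchronous) validation.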
func (b *BlossomSub) RegisterValidator(
bitmask []byte,
validator func(peerID peer.ID, message *pb.Message) p2p.ValidationResult,
sync bool,
) error {
validatorEx := func(
ctx context.Context, peerID peer.ID, message *blossomsub.Message,
) blossomsub.ValidationResult {
switch v := validator(peerID, message.Message); v {
case p2p.ValidationResultAccept:
return blossomsub.ValidationAccept
case p2p.ValidationResultReject:
return blossomsub.ValidationReject
case p2p.ValidationResultIgnore:
return blossomsub.ValidationIgnore
default:
panic("unreachable")
}
}
var _ blossomsub.ValidatorEx = validatorEx
return b.ps.RegisterBitmaskValidator(
bitmask,
validatorEx,
blossomsub.WithValidatorInline(sync),
)
}
func (b *BlossomSub) UnregisterValidator(bitmask []byte) error {
return b.ps.UnregisterBitmaskValidator(bitmask)
}
func (b *BlossomSub) GetPeerID() []byte {
return []byte(b.derivedPeerID)
}
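// GetRandomPeer picks a uniformly random peer currently subscribed to the
// bitmask, using crypto/rand for the selection.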
func (b *BlossomSub) GetRandomPeer(bitmask []byte) ([]byte, error) {
peers := b.ps.ListPeers(bitmask)
if len(peers) == 0 {
return nil, errors.Wrap(
ErrNoPeersAvailable,
"get random peer",
)
}
b.logger.Debug("selecting from peers", zap.Any("peer_ids", peers))
sel, err := rand.Int(rand.Reader, big.NewInt(int64(len(peers))))
if err != nil {
return nil, errors.Wrap(err, "get random peer")
}
return []byte(peers[sel.Int64()]), nil
}
func (b *BlossomSub) IsPeerConnected(peerId []byte) bool {
peerID := peer.ID(peerId)
connectedness := b.h.Network().Connectedness(peerID)
return connectedness == network.Connected || connectedness == network.Limited
}
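// Reachability reports the most recently observed local reachability: true
// when publicly reachable, false when private, nil while still unknown.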
func (b *BlossomSub) Reachability() *wrapperspb.BoolValue {
reachability := b.reachability.Load()
if reachability == nil {
return nil
}
switch *reachability {
case network.ReachabilityPublic:
return wrapperspb.Bool(true)
case network.ReachabilityPrivate:
return wrapperspb.Bool(false)
default:
return nil
}
}
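// initDHT creates and bootstraps the Kademlia DHT, running in server mode
// for bootstrap peers and non-mainnet networks and in client mode otherwise;
// non-mainnet networks also get a distinct protocol prefix.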
func initDHT(
ctx context.Context,
logger *zap.Logger,
h host.Host,
isBootstrapPeer bool,
bootstrappers []peer.AddrInfo,
network uint8,
) *dht.IpfsDHT {
logger.Info("establishing dht")
var mode dht.ModeOpt
if isBootstrapPeer || network != 0 {
logger.Warn("BOOTSTRAP PEER")
mode = dht.ModeServer
} else {
mode = dht.ModeClient
}
opts := []dht.Option{
dht.Mode(mode),
dht.BootstrapPeers(bootstrappers...),
}
if network != 0 {
opts = append(opts, dht.ProtocolPrefix(protocol.ID("/testnet")))
}
kademliaDHT, err := dht.New(
ctx,
h,
opts...,
)
if err != nil {
logger.Panic("error creating dht", zap.Error(err))
}
if err := kademliaDHT.Bootstrap(ctx); err != nil {
logger.Panic("error bootstrapping dht", zap.Error(err))
}
return kademliaDHT
}
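// Reconnect unprotects the peer, waits ten seconds, then redials it and
// restores its bootstrap protection.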
func (b *BlossomSub) Reconnect(peerId []byte) error {
pid := peer.ID(peerId)
info := b.h.Peerstore().PeerInfo(pid)
b.h.ConnManager().Unprotect(info.ID, "bootstrap")
time.Sleep(10 * time.Second)
if err := b.h.Connect(b.ctx, info); err != nil {
return errors.Wrap(err, "reconnect")
}
b.h.ConnManager().Protect(info.ID, "bootstrap")
return nil
}
func (b *BlossomSub) Bootstrap(ctx context.Context) error {
return b.bootstrap.Connect(ctx)
}
func (b *BlossomSub) DiscoverPeers(ctx context.Context) error {
return b.discovery.Connect(ctx)
}
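// GetPeerScore returns the tracked app-specific score for a peer, or zero if
// none exists. Scores set via SetPeerScore and AddPeerScore feed the
// AppSpecificScore hook in the blossomsub peer-score parameters.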
func (b *BlossomSub) GetPeerScore(peerId []byte) int64 {
b.peerScoreMx.Lock()
peerScore, ok := b.peerScore[string(peerId)]
if !ok {
b.peerScoreMx.Unlock()
return 0
}
score := peerScore.score
b.peerScoreMx.Unlock()
return int64(score)
}
func (b *BlossomSub) SetPeerScore(peerId []byte, score int64) {
b.peerScoreMx.Lock()
b.peerScore[string(peerId)] = &appScore{
score: float64(score),
expire: time.Now().Add(1 * time.Hour),
}
b.peerScoreMx.Unlock()
}
func (b *BlossomSub) AddPeerScore(peerId []byte, scoreDelta int64) {
b.peerScoreMx.Lock()
if _, ok := b.peerScore[string(peerId)]; !ok {
b.peerScore[string(peerId)] = &appScore{
score: float64(scoreDelta),
expire: time.Now().Add(1 * time.Hour),
}
} else {
b.peerScore[string(peerId)] = &appScore{
score: b.peerScore[string(peerId)].score + float64(scoreDelta),
expire: time.Now().Add(1 * time.Hour),
}
}
b.peerScoreMx.Unlock()
}
func (b *BlossomSub) GetPeerstoreCount() int {
return len(b.h.Peerstore().Peers())
}
func (b *BlossomSub) GetNetworkInfo() *protobufs.NetworkInfoResponse {
resp := &protobufs.NetworkInfoResponse{}
for _, p := range b.h.Network().Peers() {
addrs := b.h.Peerstore().Addrs(p)
multiaddrs := []string{}
for _, a := range addrs {
multiaddrs = append(multiaddrs, a.String())
}
resp.NetworkInfo = append(resp.NetworkInfo, &protobufs.NetworkInfo{
PeerId: []byte(p),
Multiaddrs: multiaddrs,
PeerScore: b.ps.PeerScore(p),
})
}
return resp
}
func (b *BlossomSub) GetNetworkPeersCount() int {
return len(b.h.Network().Peers())
}
func (b *BlossomSub) GetMultiaddrOfPeerStream(
ctx context.Context,
peerId []byte,
) <-chan ma.Multiaddr {
return b.h.Peerstore().AddrStream(ctx, peer.ID(peerId))
}
func (b *BlossomSub) GetMultiaddrOfPeer(peerId []byte) string {
addrs := b.h.Peerstore().Addrs(peer.ID(peerId))
if len(addrs) == 0 {
return ""
}
return addrs[0].String()
}
func (b *BlossomSub) GetNetwork() uint {
return uint(b.p2pConfig.Network)
}
// GetOwnMultiaddrs returns our own multiaddresses as seen by the network
func (b *BlossomSub) GetOwnMultiaddrs() []ma.Multiaddr {
allAddrs := make([]ma.Multiaddr, 0)
// 1. Get listen addresses
listenAddrs := b.h.Network().ListenAddresses()
// 2. Get addresses from our own peerstore (observed addresses)
selfAddrs := b.h.Peerstore().Addrs(b.h.ID())
// 3. Get all addresses including identified ones
identifiedAddrs := b.h.Addrs()
// Combine and deduplicate
addrMap := make(map[string]ma.Multiaddr)
for _, addr := range listenAddrs {
addrMap[addr.String()] = addr
}
for _, addr := range selfAddrs {
addrMap[addr.String()] = addr
}
for _, addr := range identifiedAddrs {
addrMap[addr.String()] = addr
}
// Convert back to slice
for _, addr := range addrMap {
allAddrs = append(allAddrs, addr)
}
return b.filterAndPrioritizeAddrs(allAddrs)
}
// filterAndPrioritizeAddrs filters and prioritizes addresses for external
// visibility
func (b *BlossomSub) filterAndPrioritizeAddrs(
addrs []ma.Multiaddr,
) []ma.Multiaddr {
var public, private, relay []ma.Multiaddr
for _, addr := range addrs {
// Skip localhost and unspecified addresses
if b.isLocalOnlyAddr(addr) {
continue
}
// Check if it's a relay address
if b.isRelayAddr(addr) {
relay = append(relay, addr)
} else if b.isPublicAddr(addr) {
public = append(public, addr)
} else if b.isPrivateButRoutable(addr) {
private = append(private, addr)
}
}
// Return in priority order: public, private, relay
result := make([]ma.Multiaddr, 0, len(public)+len(private)+len(relay))
result = append(result, public...)
result = append(result, private...)
result = append(result, relay...)
return result
}
// isLocalOnlyAddr checks if the address is localhost or unspecified
func (b *BlossomSub) isLocalOnlyAddr(addr ma.Multiaddr) bool {
// Get the IP component
ipComponent, err := addr.ValueForProtocol(ma.P_IP4)
if err != nil {
ipComponent, err = addr.ValueForProtocol(ma.P_IP6)
if err != nil {
return false
}
}
ip := net.ParseIP(ipComponent)
if ip == nil {
return false
}
return ip.IsLoopback() || ip.IsUnspecified()
}
// isRelayAddr checks if this is a relay address
func (b *BlossomSub) isRelayAddr(addr ma.Multiaddr) bool {
protocols := addr.Protocols()
for _, p := range protocols {
if p.Code == ma.P_CIRCUIT {
return true
}
}
return false
}
// isPublicAddr checks if this is a public/external address
func (b *BlossomSub) isPublicAddr(addr ma.Multiaddr) bool {
ipComponent, err := addr.ValueForProtocol(ma.P_IP4)
if err != nil {
ipComponent, err = addr.ValueForProtocol(ma.P_IP6)
if err != nil {
return false
}
}
ip := net.ParseIP(ipComponent)
if ip == nil {
return false
}
// Check if it's a globally routable IP
return !ip.IsPrivate() && !ip.IsLoopback() && !ip.IsUnspecified()
}
// isPrivateButRoutable checks if this is a private but potentially routable
// address
func (b *BlossomSub) isPrivateButRoutable(addr ma.Multiaddr) bool {
ipComponent, err := addr.ValueForProtocol(ma.P_IP4)
if err != nil {
ipComponent, err = addr.ValueForProtocol(ma.P_IP6)
if err != nil {
return false
}
}
ip := net.ParseIP(ipComponent)
if ip == nil {
return false
}
// Private but not localhost
return ip.IsPrivate() && !ip.IsLoopback() && !ip.IsUnspecified()
}
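// StartDirectChannelListener serves the provided gRPC server over libp2p
// streams bound to a key- and purpose-specific protocol ID.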
func (b *BlossomSub) StartDirectChannelListener(
key []byte,
purpose string,
server *grpc.Server,
) error {
bind, err := gostream.Listen(
b.h,
protocol.ID(
"/p2p/direct-channel/"+base58.Encode(key)+purpose,
),
)
if err != nil {
return errors.Wrap(err, "start direct channel listener")
}
return errors.Wrap(server.Serve(bind), "start direct channel listener")
}
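// extraCloseConn wraps a net.Conn and invokes an extra callback once the
// underlying connection has been closed.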
type extraCloseConn struct {
net.Conn
extraClose func()
}
func (c *extraCloseConn) Close() error {
err := c.Conn.Close()
c.extraClose()
return err
}
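// GetDirectChannel locates the peer via the DHT and tries each advertised
// address in turn, returning the first gRPC client connection that can be
// built with peer-authenticated TLS credentials.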
func (b *BlossomSub) GetDirectChannel(
ctx context.Context,
peerID []byte,
purpose string,
) (
cc *grpc.ClientConn, err error,
) {
pi, err := b.dht.FindPeer(ctx, peer.ID(peerID))
if err != nil {
return nil, errors.Wrap(err, "get direct channel")
}
creds, err := NewPeerAuthenticator(
b.logger,
&b.p2pConfig,
nil,
nil,
nil,
nil,
[][]byte{peerID},
map[string]channel.AllowedPeerPolicyType{
"quilibrium.node.proxy.pb.PubSubProxy": channel.OnlySelfPeer,
},
map[string]channel.AllowedPeerPolicyType{},
).CreateClientTLSCredentials(peerID)
if err != nil {
return nil, errors.Wrap(err, "get direct channel")
}
var lastError error
for _, addr := range pi.Addrs {
var mga net.Addr
b.logger.Debug(
"attempting to get direct channel with peer",
zap.String("peer", peer.ID(peerID).String()),
zap.String("addr", addr.String()),
)
mga, lastError = mn.ToNetAddr(addr)
if lastError != nil {
b.logger.Debug(
"skipping address",
zap.String("addr", addr.String()),
zap.Error(lastError),
)
continue
}
var cc *grpc.ClientConn
cc, lastError = grpc.NewClient(
mga.String(),
grpc.WithTransportCredentials(creds),
)
if lastError != nil {
b.logger.Debug(
"could not connect",
zap.String("addr", addr.String()),
zap.Error(lastError),
)
continue
}
return cc, nil
}
return nil, errors.Wrap(lastError, "get direct channel")
}
func (b *BlossomSub) GetPublicKey() []byte {
pub, _ := b.signKey.GetPublic().Raw()
return pub
}
func (b *BlossomSub) SignMessage(msg []byte) ([]byte, error) {
sig, err := b.signKey.Sign(msg)
return sig, errors.Wrap(err, "sign message")
}
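// toBlossomSubParams maps the node's P2P config onto the blossomsub router
// parameters one-for-one, fixing only SlowHeartbeatWarning at 0.1.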
func toBlossomSubParams(
p2pConfig *config.P2PConfig,
) blossomsub.BlossomSubParams {
return blossomsub.BlossomSubParams{
D: p2pConfig.D,
Dlo: p2pConfig.DLo,
Dhi: p2pConfig.DHi,
Dscore: p2pConfig.DScore,
Dout: p2pConfig.DOut,
HistoryLength: p2pConfig.HistoryLength,
HistoryGossip: p2pConfig.HistoryGossip,
Dlazy: p2pConfig.DLazy,
GossipFactor: p2pConfig.GossipFactor,
GossipRetransmission: p2pConfig.GossipRetransmission,
HeartbeatInitialDelay: p2pConfig.HeartbeatInitialDelay,
HeartbeatInterval: p2pConfig.HeartbeatInterval,
FanoutTTL: p2pConfig.FanoutTTL,
PrunePeers: p2pConfig.PrunePeers,
PruneBackoff: p2pConfig.PruneBackoff,
UnsubscribeBackoff: p2pConfig.UnsubscribeBackoff,
Connectors: p2pConfig.Connectors,
MaxPendingConnections: p2pConfig.MaxPendingConnections,
ConnectionTimeout: p2pConfig.ConnectionTimeout,
DirectConnectTicks: p2pConfig.DirectConnectTicks,
DirectConnectInitialDelay: p2pConfig.DirectConnectInitialDelay,
OpportunisticGraftTicks: p2pConfig.OpportunisticGraftTicks,
OpportunisticGraftPeers: p2pConfig.OpportunisticGraftPeers,
GraftFloodThreshold: p2pConfig.GraftFloodThreshold,
MaxIHaveLength: p2pConfig.MaxIHaveLength,
MaxIHaveMessages: p2pConfig.MaxIHaveMessages,
MaxIDontWantMessages: p2pConfig.MaxIDontWantMessages,
IWantFollowupTime: p2pConfig.IWantFollowupTime,
IDontWantMessageThreshold: p2pConfig.IDontWantMessageThreshold,
IDontWantMessageTTL: p2pConfig.IDontWantMessageTTL,
SlowHeartbeatWarning: 0.1,
}
}
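// getNetworkNamespace builds the DHT advertisement namespace by appending a
// per-network suffix to ANNOUNCE_PREFIX.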
func getNetworkNamespace(network uint8) string {
var networkName string
switch network {
case 0:
networkName = "mainnet"
case 1:
networkName = "testnet-primary"
default:
networkName = fmt.Sprintf("network-%d", network)
}
return ANNOUNCE_PREFIX + networkName
}