ceremonyclient/node/p2p/blossomsub.go
package p2p
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"fmt"
"log"
"math"
"math/big"
"math/bits"
"net"
"os"
"path/filepath"
"runtime/debug"
"slices"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
libp2pconfig "github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/discovery/util"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/net/gostream"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/mr-tron/base58"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
mn "github.com/multiformats/go-multiaddr/net"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
grpcpeer "google.golang.org/grpc/peer"
"google.golang.org/protobuf/types/known/wrapperspb"
"source.quilibrium.com/quilibrium/monorepo/config"
blossomsub "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
"source.quilibrium.com/quilibrium/monorepo/node/internal/observability"
"source.quilibrium.com/quilibrium/monorepo/node/p2p/internal"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/channel"
"source.quilibrium.com/quilibrium/monorepo/types/p2p"
up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p"
)
const (
DecayInterval = 10 * time.Minute
AppDecay = .9
)
// ConfigDir is a distinct type for the configuration directory path
// Used by Wire for dependency injection
type ConfigDir string
type appScore struct {
expire time.Time
score float64
}
type BlossomSub struct {
ps *blossomsub.PubSub
ctx context.Context
cancel context.CancelFunc
logger *zap.Logger
peerID peer.ID
derivedPeerID peer.ID
bitmaskMap map[string]*blossomsub.Bitmask
// Track which bit slices belong to which original bitmasks; used to
// reference-count bitmasks so they are only closed when no subscription
// still needs them
subscriptionTracker map[string][][]byte
subscriptions []*blossomsub.Subscription
subscriptionMutex sync.RWMutex
h host.Host
signKey crypto.PrivKey
peerScore map[string]*appScore
peerScoreMx sync.Mutex
bootstrap internal.PeerConnector
discovery internal.PeerConnector
manualReachability atomic.Pointer[bool]
p2pConfig config.P2PConfig
dht *dht.IpfsDHT
coreId uint
configDir ConfigDir
}
var _ p2p.PubSub = (*BlossomSub)(nil)
var ErrNoPeersAvailable = errors.New("no peers available")
var ANNOUNCE_PREFIX = "quilibrium-2.0.2-dusk-"
var connectivityServiceProtocolID = protocol.ID("/quilibrium/connectivity/1.0.0")
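// getPeerID decodes the hex-encoded Ed448 peer key from the P2P config and
// derives the corresponding libp2p peer ID, panicking if the key is malformed.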
func getPeerID(p2pConfig *config.P2PConfig) peer.ID {
peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey)
if err != nil {
log.Panic("error unmarshaling peerkey", zap.Error(err))
}
privKey, err := crypto.UnmarshalEd448PrivateKey(peerPrivKey)
if err != nil {
log.Panic("error unmarshaling peerkey", zap.Error(err))
}
pub := privKey.GetPublic()
id, err := peer.IDFromPublicKey(pub)
if err != nil {
log.Panic("error getting peer id", zap.Error(err))
}
return id
}
// NewBlossomSubWithHost creates a new blossomsub instance with a pre-supplied
// host. It is intended for integration tests that need more realistic pubsub
// behavior against a simulated network (see node/tests/simnet.go for utility
// methods to construct a flaky host).
func NewBlossomSubWithHost(
p2pConfig *config.P2PConfig,
engineConfig *config.EngineConfig,
logger *zap.Logger,
coreId uint,
isBootstrapPeer bool,
host host.Host,
privKey crypto.PrivKey,
bootstrapHosts []host.Host,
) *BlossomSub {
ctx, cancel := context.WithCancel(context.Background())
if coreId == 0 {
logger = logger.With(zap.String("process", "master"))
} else {
logger = logger.With(zap.String(
"process",
fmt.Sprintf("worker %d", coreId),
))
}
bs := &BlossomSub{
ctx: ctx,
cancel: cancel,
logger: logger,
bitmaskMap: make(map[string]*blossomsub.Bitmask),
subscriptionTracker: make(map[string][][]byte),
signKey: privKey,
peerScore: make(map[string]*appScore),
p2pConfig: *p2pConfig,
coreId: coreId,
}
idService := internal.IDServiceFromHost(host)
logger.Info("established peer id", zap.String("peer_id", host.ID().String()))
bootstrappers := []peer.AddrInfo{}
for _, bh := range bootstrapHosts {
// manually construct the p2p string, kind of kludgy, but this is intended
// for use with tests
ai, err := peer.AddrInfoFromString(
bh.Addrs()[0].String() + "/p2p/" + bh.ID().String(),
)
if err != nil {
panic(fmt.Sprintf("error for addr %v, %+v:", bh.Addrs()[0], err))
}
bootstrappers = append(bootstrappers, *ai)
}
kademliaDHT := initDHT(
ctx,
logger,
host,
isBootstrapPeer,
bootstrappers,
p2pConfig.Network,
)
host = routedhost.Wrap(host, kademliaDHT)
routingDiscovery := routing.NewRoutingDiscovery(kademliaDHT)
util.Advertise(ctx, routingDiscovery, getNetworkNamespace(p2pConfig.Network))
minBootstrapPeers := min(len(bootstrappers), p2pConfig.MinBootstrapPeers)
bootstrap := internal.NewPeerConnector(
ctx,
zap.NewNop(),
host,
idService,
minBootstrapPeers,
p2pConfig.BootstrapParallelism,
internal.NewStaticPeerSource(bootstrappers, true),
)
if err := bootstrap.Connect(ctx); err != nil {
logger.Panic("error connecting to bootstrap peers", zap.Error(err))
}
bs.bootstrap = bootstrap
discovery := internal.NewPeerConnector(
ctx,
zap.NewNop(),
host,
idService,
p2pConfig.D,
p2pConfig.DiscoveryParallelism,
internal.NewRoutingDiscoveryPeerSource(
routingDiscovery,
getNetworkNamespace(p2pConfig.Network),
p2pConfig.DiscoveryPeerLookupLimit,
),
)
if err := discovery.Connect(ctx); err != nil {
logger.Panic("error connecting to discovery peers", zap.Error(err))
}
discovery = internal.NewChainedPeerConnector(ctx, bootstrap, discovery)
bs.discovery = discovery
internal.MonitorPeers(
ctx,
logger.Named("peer-monitor"),
host,
p2pConfig.PingTimeout,
p2pConfig.PingPeriod,
p2pConfig.PingAttempts,
nil,
)
var tracer *blossomsub.JSONTracer
var err error
if p2pConfig.TraceLogStdout {
tracer, err = blossomsub.NewStdoutJSONTracer()
if err != nil {
panic(errors.Wrap(err, "error building stdout tracer"))
}
} else if p2pConfig.TraceLogFile != "" {
tracer, err = blossomsub.NewJSONTracer(p2pConfig.TraceLogFile)
if err != nil {
logger.Panic("error building file tracer", zap.Error(err))
}
}
blossomOpts := []blossomsub.Option{
blossomsub.WithStrictSignatureVerification(true),
blossomsub.WithValidateQueueSize(blossomsub.DefaultValidateQueueSize),
blossomsub.WithValidateWorkers(1),
blossomsub.WithPeerOutboundQueueSize(
blossomsub.DefaultPeerOutboundQueueSize,
),
}
if tracer != nil {
blossomOpts = append(blossomOpts, blossomsub.WithEventTracer(tracer))
}
blossomOpts = append(blossomOpts, blossomsub.WithPeerScore(
&blossomsub.PeerScoreParams{
SkipAtomicValidation: false,
BitmaskScoreCap: 0,
IPColocationFactorWeight: 0,
IPColocationFactorThreshold: 6,
BehaviourPenaltyWeight: -10,
BehaviourPenaltyThreshold: 100,
BehaviourPenaltyDecay: .5,
DecayInterval: DecayInterval,
DecayToZero: .1,
RetainScore: 60 * time.Minute,
AppSpecificScore: func(p peer.ID) float64 {
return float64(bs.GetPeerScore([]byte(p)))
},
AppSpecificWeight: 10.0,
},
&blossomsub.PeerScoreThresholds{
SkipAtomicValidation: false,
GossipThreshold: -500,
PublishThreshold: -1000,
GraylistThreshold: -2500,
AcceptPXThreshold: 1000,
OpportunisticGraftThreshold: 3.5,
},
))
blossomOpts = append(blossomOpts, observability.WithPrometheusRawTracer())
if p2pConfig.Network == 0 {
logger.Info("enabling blacklist for bootstrappers for blossomsub")
blossomOpts = append(blossomOpts, blossomsub.WithPeerFilter(
internal.NewStaticPeerFilter(
[]peer.ID{},
internal.PeerAddrInfosToPeerIDSlice(bootstrappers),
true,
),
))
}
blossomOpts = append(blossomOpts, blossomsub.WithDiscovery(
internal.NewPeerConnectorDiscovery(discovery),
))
blossomOpts = append(blossomOpts, blossomsub.WithMessageIdFn(
func(pmsg *pb.Message) []byte {
id := sha256.Sum256(pmsg.Data)
return id[:]
}),
)
params := toBlossomSubParams(p2pConfig)
rt := blossomsub.NewBlossomSubRouter(host, params, bs.p2pConfig.Network)
blossomOpts = append(blossomOpts, rt.WithDefaultTagTracer())
pubsub, err := blossomsub.NewBlossomSubWithRouter(ctx, host, rt, blossomOpts...)
if err != nil {
logger.Panic("error creating pubsub", zap.Error(err))
}
peerID := host.ID()
bs.dht = kademliaDHT
bs.ps = pubsub
bs.peerID = peerID
bs.h = host
bs.signKey = privKey
bs.derivedPeerID = peerID
go bs.background(ctx)
bs.initConnectivityServices(isBootstrapPeer, bootstrappers)
return bs
}
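// NewBlossomSub constructs the production BlossomSub instance: it selects a
// listen address based on coreId, builds the libp2p host, DHT, and routed
// host, connects to bootstrap and discovery peers, configures peer scoring
// and the blossomsub router, and starts the background score-decay loop and,
// on mainnet, the connectivity check.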
func NewBlossomSub(
p2pConfig *config.P2PConfig,
engineConfig *config.EngineConfig,
logger *zap.Logger,
coreId uint,
configDir ConfigDir,
) *BlossomSub {
ctx := context.Background()
// Determine the appropriate listen address based on coreId
var listenAddr string
if coreId == 0 {
logger = logger.With(zap.String("process", "master"))
// For main node (coreId == 0), use the standard p2pConfig.ListenMultiaddr
listenAddr = p2pConfig.ListenMultiaddr
} else {
logger = logger.With(zap.String(
"process",
fmt.Sprintf("worker %d", coreId),
))
// For data workers (coreId > 0), check if DataWorkerP2PMultiaddrs is
// provided
if engineConfig != nil && len(engineConfig.DataWorkerP2PMultiaddrs) > 0 &&
int(coreId-1) < len(engineConfig.DataWorkerP2PMultiaddrs) {
listenAddr = engineConfig.DataWorkerP2PMultiaddrs[coreId-1]
logger.Info(
"Using configured data worker P2P multiaddr",
zap.String("multiaddr", listenAddr),
zap.Uint("core_id", coreId),
)
} else if engineConfig != nil && engineConfig.DataWorkerBaseP2PPort > 0 {
port := engineConfig.DataWorkerBaseP2PPort + uint16(coreId-1)
listenAddr = fmt.Sprintf(engineConfig.DataWorkerBaseListenMultiaddr, port)
logger.Info(
"worker p2p listen address calculated",
zap.String("multiaddr", listenAddr),
zap.Uint("core_id", coreId),
zap.Uint16("port", port),
)
} else {
logger.Error(
"no data worker configuration found",
zap.Uint("core_id", coreId),
)
time.Sleep(120 * time.Second)
panic("no data worker configuration found")
}
}
opts := []libp2pconfig.Option{
libp2p.ListenAddrStrings(listenAddr),
libp2p.EnableNATService(),
libp2p.NATPortMap(),
}
isBootstrapPeer := false
if coreId == 0 {
peerId := getPeerID(p2pConfig)
if p2pConfig.Network == 0 {
for _, peerAddr := range config.BootstrapPeers {
peerinfo, err := peer.AddrInfoFromString(peerAddr)
if err != nil {
logger.Panic("error getting peer info", zap.Error(err))
}
if bytes.Equal([]byte(peerinfo.ID), []byte(peerId)) {
isBootstrapPeer = true
break
}
}
} else {
for _, peerAddr := range p2pConfig.BootstrapPeers {
peerinfo, err := peer.AddrInfoFromString(peerAddr)
if err != nil {
logger.Panic("error getting peer info", zap.Error(err))
}
if bytes.Equal([]byte(peerinfo.ID), []byte(peerId)) {
isBootstrapPeer = true
break
}
}
}
}
defaultBootstrapPeers := append([]string{}, p2pConfig.BootstrapPeers...)
if p2pConfig.Network == 0 {
defaultBootstrapPeers = config.BootstrapPeers
}
bootstrappers := []peer.AddrInfo{}
for _, peerAddr := range defaultBootstrapPeers {
peerinfo, err := peer.AddrInfoFromString(peerAddr)
if err != nil {
logger.Panic("error getting peer info", zap.Error(err))
}
bootstrappers = append(bootstrappers, *peerinfo)
}
var privKey crypto.PrivKey
var derivedPeerId peer.ID
if p2pConfig.PeerPrivKey != "" {
peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey)
if err != nil {
logger.Panic("error unmarshaling peerkey", zap.Error(err))
}
privKey, err = crypto.UnmarshalEd448PrivateKey(peerPrivKey)
if err != nil {
logger.Panic("error unmarshaling peerkey", zap.Error(err))
}
derivedPeerId, err = peer.IDFromPrivateKey(privKey)
if err != nil {
logger.Panic("error deriving peer id", zap.Error(err))
}
if coreId == 0 {
opts = append(opts, libp2p.Identity(privKey))
} else {
workerKey, _, err := crypto.GenerateEd448Key(rand.Reader)
if err != nil {
logger.Panic("error generating worker peerkey", zap.Error(err))
}
opts = append(opts, libp2p.Identity(workerKey))
}
}
allowedPeers := []peer.AddrInfo{}
allowedPeers = append(allowedPeers, bootstrappers...)
directPeers := []peer.AddrInfo{}
if len(p2pConfig.DirectPeers) > 0 {
logger.Info("found direct peers in config")
for _, peerAddr := range p2pConfig.DirectPeers {
peerinfo, err := peer.AddrInfoFromString(peerAddr)
if err != nil {
logger.Panic("error getting peer info", zap.Error(err))
}
logger.Info(
"adding direct peer",
zap.String("peer", peerinfo.ID.String()),
)
directPeers = append(directPeers, *peerinfo)
}
}
allowedPeers = append(allowedPeers, directPeers...)
opts = append(
opts,
libp2p.SwarmOpts(),
)
if p2pConfig.LowWatermarkConnections != -1 &&
p2pConfig.HighWatermarkConnections != -1 {
cm, err := connmgr.NewConnManager(
p2pConfig.LowWatermarkConnections,
p2pConfig.HighWatermarkConnections,
)
if err != nil {
logger.Panic("error creating connection manager", zap.Error(err))
}
rm, err := resourceManager(
p2pConfig.HighWatermarkConnections,
allowedPeers,
)
if err != nil {
logger.Panic("error creating resource manager", zap.Error(err))
}
opts = append(opts, libp2p.ConnectionManager(cm))
opts = append(opts, libp2p.ResourceManager(rm))
}
ctx, cancel := context.WithCancel(ctx)
bs := &BlossomSub{
ctx: ctx,
cancel: cancel,
logger: logger,
bitmaskMap: make(map[string]*blossomsub.Bitmask),
subscriptionTracker: make(map[string][][]byte),
signKey: privKey,
peerScore: make(map[string]*appScore),
p2pConfig: *p2pConfig,
derivedPeerID: derivedPeerId,
coreId: coreId,
configDir: configDir,
}
h, err := libp2p.New(opts...)
if err != nil {
logger.Panic("error constructing p2p", zap.Error(err))
}
idService := internal.IDServiceFromHost(h)
logger.Info("established peer id", zap.String("peer_id", h.ID().String()))
kademliaDHT := initDHT(
ctx,
logger,
h,
isBootstrapPeer,
bootstrappers,
p2pConfig.Network,
)
h = routedhost.Wrap(h, kademliaDHT)
routingDiscovery := routing.NewRoutingDiscovery(kademliaDHT)
util.Advertise(ctx, routingDiscovery, getNetworkNamespace(p2pConfig.Network))
minBootstrapPeers := min(len(bootstrappers), p2pConfig.MinBootstrapPeers)
bootstrap := internal.NewPeerConnector(
ctx,
logger.Named("bootstrap"),
h,
idService,
minBootstrapPeers,
p2pConfig.BootstrapParallelism,
internal.NewStaticPeerSource(bootstrappers, true),
)
if err := bootstrap.Connect(ctx); err != nil {
logger.Panic("error connecting to bootstrap peers", zap.Error(err))
}
bootstrap = internal.NewConditionalPeerConnector(
ctx,
internal.NewNotEnoughPeersCondition(
h,
minBootstrapPeers,
internal.PeerAddrInfosToPeerIDMap(bootstrappers),
),
bootstrap,
)
bs.bootstrap = bootstrap
discovery := internal.NewPeerConnector(
ctx,
logger.Named("discovery"),
h,
idService,
p2pConfig.D,
p2pConfig.DiscoveryParallelism,
internal.NewRoutingDiscoveryPeerSource(
routingDiscovery,
getNetworkNamespace(p2pConfig.Network),
p2pConfig.DiscoveryPeerLookupLimit,
),
)
if err := discovery.Connect(ctx); err != nil {
logger.Panic("error connecting to discovery peers", zap.Error(err))
}
discovery = internal.NewChainedPeerConnector(ctx, bootstrap, discovery)
bs.discovery = discovery
internal.MonitorPeers(
ctx,
logger.Named("peer-monitor"),
h,
p2pConfig.PingTimeout,
p2pConfig.PingPeriod,
p2pConfig.PingAttempts,
directPeers,
)
var tracer *blossomsub.JSONTracer
if p2pConfig.TraceLogStdout {
tracer, err = blossomsub.NewStdoutJSONTracer()
if err != nil {
panic(errors.Wrap(err, "error building stdout tracer"))
}
} else if p2pConfig.TraceLogFile != "" {
tracer, err = blossomsub.NewJSONTracer(p2pConfig.TraceLogFile)
if err != nil {
logger.Panic("error building file tracer", zap.Error(err))
}
}
blossomOpts := []blossomsub.Option{
blossomsub.WithStrictSignatureVerification(true),
}
if len(directPeers) > 0 {
blossomOpts = append(blossomOpts, blossomsub.WithDirectPeers(directPeers))
}
if tracer != nil {
blossomOpts = append(blossomOpts, blossomsub.WithEventTracer(tracer))
}
GLOBAL_CONSENSUS_BITMASK := []byte{0x00}
GLOBAL_FRAME_BITMASK := []byte{0x00, 0x00}
GLOBAL_PROVER_BITMASK := []byte{0x00, 0x00, 0x00}
GLOBAL_PEER_INFO_BITMASK := []byte{0x00, 0x00, 0x00, 0x00}
GLOBAL_ALERT_BITMASK := bytes.Repeat([]byte{0x00}, 16)
sets := getBitmaskSets(bytes.Repeat([]byte{0xff}, 32))
sets = slices.Concat([][]byte{
GLOBAL_CONSENSUS_BITMASK,
GLOBAL_FRAME_BITMASK,
GLOBAL_PROVER_BITMASK,
GLOBAL_PEER_INFO_BITMASK,
GLOBAL_ALERT_BITMASK,
}, sets)
bitmasksScoring := map[string]*blossomsub.BitmaskScoreParams{}
for _, set := range sets {
bitmasksScoring[string(set)] = &blossomsub.BitmaskScoreParams{
SkipAtomicValidation: false,
BitmaskWeight: 0.1,
TimeInMeshWeight: 0.00027,
TimeInMeshQuantum: time.Second,
TimeInMeshCap: 1,
FirstMessageDeliveriesWeight: 5,
FirstMessageDeliveriesDecay: blossomsub.ScoreParameterDecay(
10 * time.Minute,
),
FirstMessageDeliveriesCap: 10000,
InvalidMessageDeliveriesWeight: -1000,
InvalidMessageDeliveriesDecay: blossomsub.ScoreParameterDecay(time.Hour),
}
}
if p2pConfig.Network != 0 {
blossomOpts = append(blossomOpts, blossomsub.WithPeerScore(
&blossomsub.PeerScoreParams{
SkipAtomicValidation: false,
Bitmasks: bitmasksScoring,
BitmaskScoreCap: 0,
IPColocationFactorWeight: 0,
IPColocationFactorThreshold: 6,
BehaviourPenaltyWeight: -10,
BehaviourPenaltyThreshold: 6,
BehaviourPenaltyDecay: .5,
DecayInterval: DecayInterval,
DecayToZero: .1,
RetainScore: 60 * time.Minute,
AppSpecificScore: func(p peer.ID) float64 {
return float64(bs.GetPeerScore([]byte(p)))
},
AppSpecificWeight: 10.0,
},
&blossomsub.PeerScoreThresholds{
SkipAtomicValidation: false,
GossipThreshold: -500,
PublishThreshold: -1000,
GraylistThreshold: -2500,
AcceptPXThreshold: 1000,
OpportunisticGraftThreshold: 3.5,
},
))
} else {
whitelist := []*net.IPNet{}
for _, p := range directPeers {
for _, i := range p.Addrs {
ipnet, err := MultiaddrToIPNet(i)
if err != nil {
logger.Error(
"could not convert direct peer for ip colocation whitelist",
zap.String("peer_addr", i.String()),
zap.Error(err),
)
}
whitelist = append(whitelist, ipnet)
}
}
blossomOpts = append(blossomOpts, blossomsub.WithPeerScore(
&blossomsub.PeerScoreParams{
SkipAtomicValidation: false,
Bitmasks: bitmasksScoring,
BitmaskScoreCap: 0,
IPColocationFactorWeight: -100,
IPColocationFactorThreshold: 6,
IPColocationFactorWhitelist: whitelist,
BehaviourPenaltyWeight: -10,
BehaviourPenaltyThreshold: 6,
BehaviourPenaltyDecay: .5,
DecayInterval: DecayInterval,
DecayToZero: .1,
RetainScore: 60 * time.Minute,
AppSpecificScore: func(p peer.ID) float64 {
return float64(bs.GetPeerScore([]byte(p)))
},
AppSpecificWeight: 10.0,
},
&blossomsub.PeerScoreThresholds{
SkipAtomicValidation: false,
GossipThreshold: -500,
PublishThreshold: -1000,
GraylistThreshold: -2500,
AcceptPXThreshold: 1000,
OpportunisticGraftThreshold: 3.5,
},
))
}
blossomOpts = append(blossomOpts,
blossomsub.WithValidateQueueSize(p2pConfig.ValidateQueueSize),
blossomsub.WithValidateWorkers(p2pConfig.ValidateWorkers),
blossomsub.WithPeerOutboundQueueSize(p2pConfig.PeerOutboundQueueSize),
)
blossomOpts = append(blossomOpts, observability.WithPrometheusRawTracer())
if p2pConfig.Network == 0 {
logger.Info("enabling blacklist for bootstrappers for blossomsub")
blossomOpts = append(blossomOpts, blossomsub.WithPeerFilter(
internal.NewStaticPeerFilter(
[]peer.ID{},
internal.PeerAddrInfosToPeerIDSlice(bootstrappers),
true,
),
))
}
blossomOpts = append(blossomOpts, blossomsub.WithDiscovery(
internal.NewPeerConnectorDiscovery(discovery),
))
blossomOpts = append(blossomOpts, blossomsub.WithMessageIdFn(
func(pmsg *pb.Message) []byte {
id := sha256.Sum256(pmsg.Data)
return id[:]
}),
)
params := toBlossomSubParams(p2pConfig)
rt := blossomsub.NewBlossomSubRouter(h, params, bs.p2pConfig.Network)
blossomOpts = append(blossomOpts, rt.WithDefaultTagTracer())
pubsub, err := blossomsub.NewBlossomSubWithRouter(ctx, h, rt, blossomOpts...)
if err != nil {
logger.Panic("error creating pubsub", zap.Error(err))
}
peerID := h.ID()
bs.dht = kademliaDHT
bs.ps = pubsub
bs.peerID = peerID
bs.h = h
bs.signKey = privKey
bs.initConnectivityServices(isBootstrapPeer, bootstrappers)
go bs.background(ctx)
return bs
}
// adjusted from Lotus' reference implementation, addressing
// https://github.com/libp2p/go-libp2p/issues/1640
func resourceManager(highWatermark int, allowed []peer.AddrInfo) (
network.ResourceManager,
error,
) {
defaultLimits := rcmgr.DefaultLimits
libp2p.SetDefaultServiceLimits(&defaultLimits)
defaultLimits.SystemBaseLimit.Memory = 1 << 28
defaultLimits.SystemLimitIncrease.Memory = 1 << 28
defaultLimitConfig := defaultLimits.AutoScale()
changes := rcmgr.PartialLimitConfig{}
if defaultLimitConfig.ToPartialLimitConfig().System.Memory > 2<<30 {
changes.System.Memory = 2 << 30
}
maxconns := uint(highWatermark)
if rcmgr.LimitVal(3*maxconns) > defaultLimitConfig.
ToPartialLimitConfig().System.ConnsInbound {
changes.System.ConnsInbound = rcmgr.LimitVal(1 << bits.Len(3*maxconns))
changes.System.ConnsOutbound = rcmgr.LimitVal(1 << bits.Len(3*maxconns))
changes.System.Conns = rcmgr.LimitVal(1 << bits.Len(6*maxconns))
changes.System.StreamsInbound = rcmgr.LimitVal(1 << bits.Len(36*maxconns))
changes.System.StreamsOutbound = rcmgr.LimitVal(1 << bits.Len(216*maxconns))
changes.System.Streams = rcmgr.LimitVal(1 << bits.Len(216*maxconns))
if rcmgr.LimitVal(3*maxconns) > defaultLimitConfig.
ToPartialLimitConfig().System.FD {
changes.System.FD = rcmgr.LimitVal(1 << bits.Len(3*maxconns))
}
changes.ServiceDefault.StreamsInbound = rcmgr.LimitVal(
1 << bits.Len(12*maxconns),
)
changes.ServiceDefault.StreamsOutbound = rcmgr.LimitVal(
1 << bits.Len(48*maxconns),
)
changes.ServiceDefault.Streams = rcmgr.LimitVal(1 << bits.Len(48*maxconns))
changes.ProtocolDefault.StreamsInbound = rcmgr.LimitVal(
1 << bits.Len(12*maxconns),
)
changes.ProtocolDefault.StreamsOutbound = rcmgr.LimitVal(
1 << bits.Len(48*maxconns),
)
changes.ProtocolDefault.Streams = rcmgr.LimitVal(1 << bits.Len(48*maxconns))
}
changedLimitConfig := changes.Build(defaultLimitConfig)
limiter := rcmgr.NewFixedLimiter(changedLimitConfig)
str, err := rcmgr.NewStatsTraceReporter()
if err != nil {
return nil, errors.Wrap(err, "resource manager")
}
rcmgr.MustRegisterWith(prometheus.DefaultRegisterer)
// Metrics
opts := append(
[]rcmgr.Option{},
rcmgr.WithTraceReporter(str),
)
resolver := madns.DefaultResolver
var allowedMaddrs []ma.Multiaddr
for _, pi := range allowed {
for _, addr := range pi.Addrs {
resolved, err := resolver.Resolve(context.Background(), addr)
if err != nil {
continue
}
allowedMaddrs = append(allowedMaddrs, resolved...)
}
}
opts = append(opts, rcmgr.WithAllowlistedMultiaddrs(allowedMaddrs))
mgr, err := rcmgr.NewResourceManager(limiter, opts...)
if err != nil {
return nil, errors.Wrap(err, "resource manager")
}
return mgr, nil
}
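// background decays application-level peer scores on a DecayInterval ticker
// until the context is cancelled.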
func (b *BlossomSub) background(ctx context.Context) {
refreshScores := time.NewTicker(DecayInterval)
defer refreshScores.Stop()
for {
select {
case <-refreshScores.C:
b.refreshScores()
case <-ctx.Done():
return
}
}
}
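// refreshScores multiplies every tracked application score by AppDecay,
// removes entries whose expiry has passed, and snaps scores with magnitude
// below 0.1 to zero.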
func (b *BlossomSub) refreshScores() {
b.peerScoreMx.Lock()
now := time.Now()
for p, pstats := range b.peerScore {
if now.After(pstats.expire) {
delete(b.peerScore, p)
continue
}
pstats.score *= AppDecay
if math.Abs(pstats.score) < .1 {
pstats.score = 0
}
}
b.peerScoreMx.Unlock()
}
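// PublishToBitmask publishes data to the given bitmask, signing with the
// node's key and derived peer ID. Bitmask-closed errors are ignored on the
// devnet (network 99).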
func (b *BlossomSub) PublishToBitmask(bitmask []byte, data []byte) error {
err := b.ps.Publish(
b.ctx,
bitmask,
data,
blossomsub.WithSecretKeyAndPeerId(b.signKey, b.derivedPeerID),
)
if err != nil && errors.Is(err, blossomsub.ErrBitmaskClosed) &&
b.p2pConfig.Network == 99 {
// Ignore bitmask closed errors for devnet
return nil
}
return errors.Wrap(
errors.Wrapf(err, "bitmask: %x", bitmask),
"publish to bitmask",
)
}
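// Publish derives a bloom filter bitmask from the address via
// up2p.GetBloomFilter(address, 256, 3) and publishes the data to it through
// PublishToBitmask.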
func (b *BlossomSub) Publish(address []byte, data []byte) error {
bitmask := up2p.GetBloomFilter(address, 256, 3)
return b.PublishToBitmask(bitmask, data)
}
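// Subscribe joins the given bitmask, subscribes to each of its bit slices,
// and starts one goroutine per slice that delivers incoming messages to
// handler. If the bitmask expands to more than three slices the subscription
// is treated as broad and messages are delivered without exact bitmask
// matching.
//
// A minimal usage sketch (ps, someBitmask, and the handler body are
// illustrative, not part of this file):
//
//	err := ps.Subscribe(someBitmask, func(m *pb.Message) error {
//		// process m.Data here
//		return nil
//	})
//	if err != nil {
//		// handle subscription failure
//	}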
func (b *BlossomSub) Subscribe(
bitmask []byte,
handler func(message *pb.Message) error,
) error {
b.logger.Info("joining broadcast")
bm, err := b.ps.Join(bitmask)
if err != nil {
b.logger.Error("join failed", zap.Error(err))
return errors.Wrap(err, "subscribe")
}
b.logger.Info(
"subscribe to bitmask",
zap.String("bitmask", hex.EncodeToString(bitmask)),
)
// Track the bit slices for this subscription
b.subscriptionMutex.Lock()
bitSlices := make([][]byte, 0, len(bm))
for _, bit := range bm {
sliceCopy := make([]byte, len(bit.Bitmask()))
copy(sliceCopy, bit.Bitmask())
bitSlices = append(bitSlices, sliceCopy)
}
b.subscriptionTracker[string(bitmask)] = bitSlices
b.subscriptionMutex.Unlock()
// If the bitmask count is greater than three, this is a broad subscribe
// and the caller is expected to handle disambiguation of addresses
exact := len(bm) <= 3
subs := []*blossomsub.Subscription{}
for _, bit := range bm {
sub, err := bit.Subscribe(
blossomsub.WithBufferSize(b.p2pConfig.SubscriptionQueueSize),
)
if err != nil {
b.logger.Error("subscription failed", zap.Error(err))
// Clean up on failure
b.subscriptionMutex.Lock()
delete(b.subscriptionTracker, string(bitmask))
b.subscriptionMutex.Unlock()
return errors.Wrap(err, "subscribe")
}
b.subscriptionMutex.Lock()
_, ok := b.bitmaskMap[string(bit.Bitmask())]
if !ok {
b.bitmaskMap[string(bit.Bitmask())] = bit
}
b.subscriptionMutex.Unlock()
subs = append(subs, sub)
}
b.logger.Info(
"begin streaming from bitmask",
zap.String("bitmask", hex.EncodeToString(bitmask)),
)
// Track subscriptions for cleanup on Close
b.subscriptionMutex.Lock()
b.subscriptions = append(b.subscriptions, subs...)
b.subscriptionMutex.Unlock()
for _, sub := range subs {
copiedBitmask := make([]byte, len(bitmask))
copy(copiedBitmask[:], bitmask[:])
sub := sub
go func() {
for {
if !b.subscribeHandler(sub, copiedBitmask, exact, handler) {
return
}
}
}()
}
b.logger.Info(
"successfully subscribed to bitmask",
zap.String("bitmask", hex.EncodeToString(bitmask)),
)
return nil
}
// subscribeHandler processes a single message from the subscription.
// Returns true if the loop should continue, false if it should exit.
func (b *BlossomSub) subscribeHandler(
sub *blossomsub.Subscription,
copiedBitmask []byte,
exact bool,
handler func(message *pb.Message) error,
) bool {
defer func() {
if r := recover(); r != nil {
b.logger.Error(
"message handler panicked, recovering",
zap.Any("panic", r),
zap.String("stack", string(debug.Stack())),
)
}
}()
m, err := sub.Next(b.ctx)
if err != nil {
// Context cancelled or subscription closed - exit the loop
b.logger.Debug(
"subscription exiting",
zap.Error(err),
)
return false
}
if m == nil {
// Subscription closed
return false
}
if bytes.Equal(m.Bitmask, copiedBitmask) || !exact {
if err = handler(m.Message); err != nil {
b.logger.Debug("message handler returned error", zap.Error(err))
}
}
return true
}
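// Unsubscribe removes the tracked subscription for the bitmask and closes any
// of its bit slices that no other subscription still references. The raw flag
// is accepted for interface compatibility and is not used here.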
func (b *BlossomSub) Unsubscribe(bitmask []byte, raw bool) {
b.subscriptionMutex.Lock()
defer b.subscriptionMutex.Unlock()
bitmaskKey := string(bitmask)
bitSlices, exists := b.subscriptionTracker[bitmaskKey]
if !exists {
b.logger.Warn(
"attempted to unsubscribe from unknown bitmask",
zap.String("bitmask", hex.EncodeToString(bitmask)),
)
return
}
b.logger.Info(
"unsubscribing from bitmask",
zap.String("bitmask", hex.EncodeToString(bitmask)),
)
// Check each bit slice to see if it's still needed by other subscriptions
for _, bitSlice := range bitSlices {
bitSliceKey := string(bitSlice)
// Check if any other subscription is using this bit slice
stillNeeded := false
for otherKey, otherSlices := range b.subscriptionTracker {
if otherKey == bitmaskKey {
continue // Skip the subscription we're removing
}
for _, otherSlice := range otherSlices {
if bytes.Equal(otherSlice, bitSlice) {
stillNeeded = true
break
}
}
if stillNeeded {
break
}
}
// Only close the bitmask if no other subscription needs it
if !stillNeeded {
if bm, ok := b.bitmaskMap[bitSliceKey]; ok {
b.logger.Debug(
"closing bit slice",
zap.String("bit_slice", hex.EncodeToString(bitSlice)),
)
bm.Close()
delete(b.bitmaskMap, bitSliceKey)
}
} else {
b.logger.Debug(
"bit slice still needed by other subscription",
zap.String("bit_slice", hex.EncodeToString(bitSlice)),
)
}
}
// Remove the subscription from tracker
delete(b.subscriptionTracker, bitmaskKey)
}
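// RegisterValidator wraps the supplied validator in a blossomsub ValidatorEx
// and registers it for the bitmask; sync selects inline (synchronous)
// validation.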
func (b *BlossomSub) RegisterValidator(
bitmask []byte,
validator func(peerID peer.ID, message *pb.Message) p2p.ValidationResult,
sync bool,
) error {
validatorEx := func(
ctx context.Context, peerID peer.ID, message *blossomsub.Message,
) blossomsub.ValidationResult {
switch v := validator(peerID, message.Message); v {
case p2p.ValidationResultAccept:
return blossomsub.ValidationAccept
case p2p.ValidationResultReject:
return blossomsub.ValidationReject
case p2p.ValidationResultIgnore:
return blossomsub.ValidationIgnore
default:
panic("unreachable")
}
}
var _ blossomsub.ValidatorEx = validatorEx
return b.ps.RegisterBitmaskValidator(
bitmask,
validatorEx,
blossomsub.WithValidatorInline(sync),
)
}
func (b *BlossomSub) UnregisterValidator(bitmask []byte) error {
return b.ps.UnregisterBitmaskValidator(bitmask)
}
func (b *BlossomSub) GetPeerID() []byte {
return []byte(b.derivedPeerID)
}
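// GetRandomPeer returns a uniformly random peer currently subscribed to the
// bitmask, or ErrNoPeersAvailable if none are known.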
func (b *BlossomSub) GetRandomPeer(bitmask []byte) ([]byte, error) {
peers := b.ps.ListPeers(bitmask)
if len(peers) == 0 {
return nil, errors.Wrap(
ErrNoPeersAvailable,
"get random peer",
)
}
b.logger.Debug("selecting from peers", zap.Any("peer_ids", peers))
sel, err := rand.Int(rand.Reader, big.NewInt(int64(len(peers))))
if err != nil {
return nil, errors.Wrap(err, "get random peer")
}
return []byte(peers[sel.Int64()]), nil
}
func (b *BlossomSub) IsPeerConnected(peerId []byte) bool {
peerID := peer.ID(peerId)
connectedness := b.h.Network().Connectedness(peerID)
return connectedness == network.Connected || connectedness == network.Limited
}
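// Reachability reports the result of the most recent manual connectivity
// test, or nil if no test has completed yet.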
func (b *BlossomSub) Reachability() *wrapperspb.BoolValue {
if manual := b.manualReachability.Load(); manual != nil {
return wrapperspb.Bool(*manual)
}
return nil
}
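// initConnectivityServices wires up the mainnet connectivity check: bootstrap
// peers serve the connectivity-test endpoint, while regular peers block until
// a reachability test against a bootstrap peer succeeds, a cached result is
// found, or the context is cancelled. It is a no-op on non-mainnet networks.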
func (b *BlossomSub) initConnectivityServices(
isBootstrapPeer bool,
bootstrappers []peer.AddrInfo,
) {
if b.p2pConfig.Network != 0 {
return
}
if b.h == nil {
return
}
if isBootstrapPeer {
b.startConnectivityService()
return
}
clone := make([]peer.AddrInfo, len(bootstrappers))
copy(clone, bootstrappers)
b.blockUntilConnectivityTest(clone)
}
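// startConnectivityService exposes the gRPC connectivity-test service on raw
// TCP port 8340 and shuts it down when the BlossomSub context is cancelled.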
func (b *BlossomSub) startConnectivityService() {
// Use raw TCP listener on port 8340
listenAddr := "0.0.0.0:8340"
listener, err := net.Listen("tcp", listenAddr)
if err != nil {
b.logger.Error("failed to start connectivity service", zap.Error(err))
return
}
b.logger.Info("started connectivity service", zap.String("addr", listenAddr))
server := grpc.NewServer()
protobufs.RegisterConnectivityServiceServer(
server,
newConnectivityService(b.logger.Named("connectivity-service"), b.h),
)
go func() {
if err := server.Serve(listener); err != nil &&
!errors.Is(err, net.ErrClosed) {
b.logger.Error("connectivity service exited", zap.Error(err))
}
}()
go func() {
<-b.ctx.Done()
server.GracefulStop()
_ = listener.Close()
}()
}
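// blockUntilConnectivityTest waits ten seconds, then repeatedly runs the
// connectivity test against a random bootstrap peer (retrying every ten
// seconds) until it succeeds or the context is cancelled. A successful result
// is cached on disk and reused for up to a week.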
func (b *BlossomSub) blockUntilConnectivityTest(bootstrappers []peer.AddrInfo) {
if len(bootstrappers) == 0 {
b.logger.Warn("connectivity test skipped, no bootstrap peers available")
return
}
// Check if we have a recent successful connectivity check cached
if b.isConnectivityCacheValid() {
b.logger.Info("skipping connectivity test, recent successful check cached",
zap.Uint("core_id", b.coreId))
b.recordManualReachability(true)
return
}
delay := time.NewTimer(10 * time.Second)
defer delay.Stop()
select {
case <-delay.C:
case <-b.ctx.Done():
b.logger.Info("connectivity test cancelled before start, context done")
return
}
backoff := 10 * time.Second
for {
if err := b.runConnectivityTest(b.ctx, bootstrappers); err == nil {
// Write the cache on successful connectivity test
b.writeConnectivityCache()
return
} else {
b.logger.Warn("connectivity test failed, retrying", zap.Error(err))
}
wait := time.NewTimer(backoff)
select {
case <-wait.C:
wait.Stop()
case <-b.ctx.Done():
wait.Stop()
b.logger.Info("connectivity test cancelled, context done")
return
}
}
}
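// runConnectivityTest picks a random dnsaddr bootstrap peer, excluding this
// node, and invokes the connectivity test against it.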
func (b *BlossomSub) runConnectivityTest(
ctx context.Context,
bootstrappers []peer.AddrInfo,
) error {
candidates := make([]peer.AddrInfo, 0, len(bootstrappers))
for _, info := range bootstrappers {
if info.ID == b.h.ID() {
continue
}
if strings.Contains(info.Addrs[0].String(), "dnsaddr") {
candidates = append(candidates, info)
}
}
if len(candidates) == 0 {
return errors.New("connectivity test: no bootstrap peers available")
}
selection, err := rand.Int(rand.Reader, big.NewInt(int64(len(candidates))))
if err != nil {
return errors.Wrap(err, "connectivity test peer selection")
}
target := candidates[selection.Int64()]
return b.invokeConnectivityTest(ctx, target)
}
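// invokeConnectivityTest dials the target bootstrap peer's connectivity
// service on port 8340 over plaintext gRPC, asks it to dial this node back on
// its advertised multiaddrs, and records the reported reachability.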
func (b *BlossomSub) invokeConnectivityTest(
ctx context.Context,
target peer.AddrInfo,
) error {
dialCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
var targetAddr string
for _, addr := range target.Addrs {
host, err := addr.ValueForProtocol(ma.P_IP4)
if err != nil {
host, err = addr.ValueForProtocol(ma.P_IP6)
if err != nil {
host, err = addr.ValueForProtocol(ma.P_DNSADDR)
if err != nil {
continue
}
}
}
targetAddr = fmt.Sprintf("%s:8340", host)
break
}
if targetAddr == "" {
b.recordManualReachability(false)
return errors.New(
"connectivity test: no valid address found for bootstrap peer",
)
}
b.logger.Debug(
"connecting to bootstrap connectivity service",
zap.String("target", targetAddr),
)
conn, err := grpc.NewClient(
targetAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
b.recordManualReachability(false)
return errors.Wrap(err, "connectivity test dial")
}
defer conn.Close()
client := protobufs.NewConnectivityServiceClient(conn)
req := &protobufs.ConnectivityTestRequest{
PeerId: []byte(b.h.ID()),
Multiaddrs: b.collectConnectivityMultiaddrs(),
}
resp, err := client.TestConnectivity(dialCtx, req)
if err != nil {
b.recordManualReachability(false)
return errors.Wrap(err, "connectivity test rpc")
}
b.recordManualReachability(resp.GetSuccess())
if resp.GetSuccess() {
b.logger.Info(
"your node is reachable",
zap.String("bootstrap_peer", target.ID.String()),
)
return nil
}
b.logger.Warn(
"YOUR NODE IS NOT REACHABLE. CHECK YOUR FIREWALL AND PORT FORWARDING CONFIGURATION",
zap.String("bootstrap_peer", target.ID.String()),
zap.String("error", resp.GetErrorMessage()),
)
if resp.GetErrorMessage() != "" {
return errors.New(resp.GetErrorMessage())
}
return errors.New("connectivity test failed")
}
func (b *BlossomSub) collectConnectivityMultiaddrs() []string {
addrs := b.GetOwnMultiaddrs()
out := make([]string, 0, len(addrs))
for _, addr := range addrs {
out = append(out, addr.String())
}
return out
}
func (b *BlossomSub) recordManualReachability(success bool) {
state := new(bool)
*state = success
b.manualReachability.Store(state)
}
const connectivityCacheValidity = 7 * 24 * time.Hour // 1 week
// connectivityCachePath returns the path to the connectivity check cache file
// for this core. The file is stored in <configDir>/connectivity-check-<coreId>
func (b *BlossomSub) connectivityCachePath() string {
return filepath.Join(
string(b.configDir),
fmt.Sprintf("connectivity-check-%d", b.coreId),
)
}
// isConnectivityCacheValid checks if there's a valid (< 1 week old) connectivity
// cache file indicating a previous successful check
func (b *BlossomSub) isConnectivityCacheValid() bool {
cachePath := b.connectivityCachePath()
info, err := os.Stat(cachePath)
if err != nil {
// File doesn't exist or error accessing it
return false
}
// Check if the file is less than 1 week old
age := time.Since(info.ModTime())
if age < connectivityCacheValidity {
b.logger.Debug("connectivity cache is valid",
zap.String("path", cachePath),
zap.Duration("age", age))
return true
}
b.logger.Debug("connectivity cache is stale",
zap.String("path", cachePath),
zap.Duration("age", age))
return false
}
// writeConnectivityCache writes the connectivity cache file to indicate
// a successful connectivity check
func (b *BlossomSub) writeConnectivityCache() {
cachePath := b.connectivityCachePath()
// Ensure the directory exists
if err := os.MkdirAll(filepath.Dir(cachePath), 0755); err != nil {
b.logger.Warn("failed to create connectivity cache directory",
zap.Error(err))
return
}
// Write the cache file with the current timestamp
timestamp := time.Now().Format(time.RFC3339)
if err := os.WriteFile(cachePath, []byte(timestamp), 0644); err != nil {
b.logger.Warn("failed to write connectivity cache",
zap.String("path", cachePath),
zap.Error(err))
return
}
b.logger.Debug("wrote connectivity cache",
zap.String("path", cachePath))
}
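// connectivityService implements the ConnectivityService gRPC server;
// bootstrap peers use it to dial back nodes that request a reachability
// check.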
type connectivityService struct {
protobufs.UnimplementedConnectivityServiceServer
logger *zap.Logger
host host.Host
ping *ping.PingService
}
func newConnectivityService(
logger *zap.Logger,
h host.Host,
) *connectivityService {
return &connectivityService{
logger: logger,
host: h,
ping: ping.NewPingService(h),
}
}
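// TestConnectivity dials the requesting peer back on its advertised ports,
// substituting the IP observed on the gRPC connection, and reports whether a
// libp2p connection and ping succeed.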
func (s *connectivityService) TestConnectivity(
ctx context.Context,
req *protobufs.ConnectivityTestRequest,
) (*protobufs.ConnectivityTestResponse, error) {
resp := &protobufs.ConnectivityTestResponse{}
peerID := peer.ID(req.GetPeerId())
if peerID == "" {
resp.ErrorMessage = "peer id required"
return resp, nil
}
// Get the actual IP address from the gRPC peer context
pr, ok := grpcpeer.FromContext(ctx)
if !ok || pr.Addr == nil {
resp.ErrorMessage = "unable to determine peer address from context"
return resp, nil
}
// Extract the IP from the remote address
remoteAddr := pr.Addr.String()
host, _, err := net.SplitHostPort(remoteAddr)
if err != nil {
resp.ErrorMessage = fmt.Sprintf("invalid remote address: %v", err)
return resp, nil
}
s.logger.Debug(
"connectivity test from peer",
zap.String("peer_id", peerID.String()),
zap.String("remote_ip", host),
)
addrs := make([]ma.Multiaddr, 0, len(req.GetMultiaddrs()))
for _, addrStr := range req.GetMultiaddrs() {
maddr, err := ma.NewMultiaddr(addrStr)
if err != nil {
s.logger.Debug(
"invalid multiaddr in connectivity request",
zap.String("peer_id", peerID.String()),
zap.String("multiaddr", addrStr),
zap.Error(err),
)
continue
}
// Extract the port from the multiaddr but use the actual IP from the
// connection
port, err := maddr.ValueForProtocol(ma.P_TCP)
if err != nil {
// If it's not TCP, try UDP
port, err = maddr.ValueForProtocol(ma.P_UDP)
if err != nil {
continue
}
// Build UDP multiaddr with actual IP
newAddr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/udp/%s/quic-v1", host, port))
if err != nil {
continue
}
addrs = append(addrs, newAddr)
continue
}
// Build TCP multiaddr with actual IP
newAddr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", host, port))
if err != nil {
continue
}
addrs = append(addrs, newAddr)
}
if len(addrs) == 0 {
resp.ErrorMessage = "no valid multiaddrs to test"
return resp, nil
}
s.logger.Debug(
"attempting to connect to peer",
zap.String("peer_id", peerID.String()),
zap.Any("addrs", addrs),
)
s.host.Peerstore().AddAddrs(peerID, addrs, peerstore.TempAddrTTL)
connectCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
err = s.host.Connect(connectCtx, peer.AddrInfo{
ID: peerID,
Addrs: addrs,
})
if err != nil {
resp.ErrorMessage = err.Error()
return resp, nil
}
defer s.host.Network().ClosePeer(peerID)
pingCtx, cancelPing := context.WithTimeout(ctx, 10*time.Second)
defer cancelPing()
select {
case <-pingCtx.Done():
resp.ErrorMessage = pingCtx.Err().Error()
return resp, nil
case result := <-s.ping.Ping(pingCtx, peerID):
if result.Error != nil {
resp.ErrorMessage = result.Error.Error()
return resp, nil
}
}
resp.Success = true
return resp, nil
}
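// initDHT creates and bootstraps a Kademlia DHT, running in server mode for
// bootstrap peers and non-mainnet networks and in client mode otherwise;
// non-mainnet networks use the /testnet protocol prefix.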
func initDHT(
ctx context.Context,
logger *zap.Logger,
h host.Host,
isBootstrapPeer bool,
bootstrappers []peer.AddrInfo,
network uint8,
) *dht.IpfsDHT {
logger.Info("establishing dht")
var mode dht.ModeOpt
if isBootstrapPeer || network != 0 {
logger.Warn("BOOTSTRAP PEER")
mode = dht.ModeServer
} else {
mode = dht.ModeClient
}
opts := []dht.Option{
dht.Mode(mode),
dht.BootstrapPeers(bootstrappers...),
}
if network != 0 {
opts = append(opts, dht.ProtocolPrefix(protocol.ID("/testnet")))
}
kademliaDHT, err := dht.New(
ctx,
h,
opts...,
)
if err != nil {
logger.Panic("error creating dht", zap.Error(err))
}
if err := kademliaDHT.Bootstrap(ctx); err != nil {
logger.Panic("error bootstrapping dht", zap.Error(err))
}
return kademliaDHT
}
func (b *BlossomSub) Reconnect(peerId []byte) error {
pid := peer.ID(peerId)
info := b.h.Peerstore().PeerInfo(pid)
b.h.ConnManager().Unprotect(info.ID, "bootstrap")
time.Sleep(10 * time.Second)
if err := b.h.Connect(b.ctx, info); err != nil {
return errors.Wrap(err, "reconnect")
}
b.h.ConnManager().Protect(info.ID, "bootstrap")
return nil
}
func (b *BlossomSub) Bootstrap(ctx context.Context) error {
return b.bootstrap.Connect(ctx)
}
func (b *BlossomSub) DiscoverPeers(ctx context.Context) error {
return b.discovery.Connect(ctx)
}
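// GetPeerScore returns the tracked application-level score for peerId, or
// zero if no score is recorded.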
func (b *BlossomSub) GetPeerScore(peerId []byte) int64 {
b.peerScoreMx.Lock()
peerScore, ok := b.peerScore[string(peerId)]
if !ok {
b.peerScoreMx.Unlock()
return 0
}
score := peerScore.score
b.peerScoreMx.Unlock()
return int64(score)
}
func (b *BlossomSub) SetPeerScore(peerId []byte, score int64) {
b.peerScoreMx.Lock()
b.peerScore[string(peerId)] = &appScore{
score: float64(score),
expire: time.Now().Add(1 * time.Hour),
}
b.peerScoreMx.Unlock()
}
func (b *BlossomSub) AddPeerScore(peerId []byte, scoreDelta int64) {
b.peerScoreMx.Lock()
if _, ok := b.peerScore[string(peerId)]; !ok {
b.peerScore[string(peerId)] = &appScore{
score: float64(scoreDelta),
expire: time.Now().Add(1 * time.Hour),
}
} else {
b.peerScore[string(peerId)] = &appScore{
score: b.peerScore[string(peerId)].score + float64(scoreDelta),
expire: time.Now().Add(1 * time.Hour),
}
}
b.peerScoreMx.Unlock()
}
func (b *BlossomSub) GetPeerstoreCount() int {
return len(b.h.Peerstore().Peers())
}
func (b *BlossomSub) GetNetworkInfo() *protobufs.NetworkInfoResponse {
resp := &protobufs.NetworkInfoResponse{}
for _, p := range b.h.Network().Peers() {
addrs := b.h.Peerstore().Addrs(p)
multiaddrs := []string{}
for _, a := range addrs {
multiaddrs = append(multiaddrs, a.String())
}
resp.NetworkInfo = append(resp.NetworkInfo, &protobufs.NetworkInfo{
PeerId: []byte(p),
Multiaddrs: multiaddrs,
PeerScore: b.ps.PeerScore(p),
})
}
return resp
}
func (b *BlossomSub) GetNetworkPeersCount() int {
return len(b.h.Network().Peers())
}
func (b *BlossomSub) GetMultiaddrOfPeerStream(
ctx context.Context,
peerId []byte,
) <-chan ma.Multiaddr {
return b.h.Peerstore().AddrStream(ctx, peer.ID(peerId))
}
func (b *BlossomSub) GetMultiaddrOfPeer(peerId []byte) string {
addrs := b.h.Peerstore().Addrs(peer.ID(peerId))
if len(addrs) == 0 {
return ""
}
return addrs[0].String()
}
func (b *BlossomSub) GetNetwork() uint {
return uint(b.p2pConfig.Network)
}
// GetOwnMultiaddrs returns our own multiaddresses as seen by the network
func (b *BlossomSub) GetOwnMultiaddrs() []ma.Multiaddr {
allAddrs := make([]ma.Multiaddr, 0)
// 1. Get listen addresses
listenAddrs := b.h.Network().ListenAddresses()
// 2. Get addresses from our own peerstore (observed addresses)
selfAddrs := b.h.Peerstore().Addrs(b.h.ID())
// 3. Get all addresses including identified ones
identifiedAddrs := b.h.Addrs()
// Combine and deduplicate
addrMap := make(map[string]ma.Multiaddr)
for _, addr := range listenAddrs {
addrMap[addr.String()] = addr
}
for _, addr := range selfAddrs {
addrMap[addr.String()] = addr
}
for _, addr := range identifiedAddrs {
addrMap[addr.String()] = addr
}
// Convert back to slice
for _, addr := range addrMap {
allAddrs = append(allAddrs, addr)
}
return b.filterAndPrioritizeAddrs(allAddrs)
}
// filterAndPrioritizeAddrs filters and prioritizes addresses for external
// visibility
func (b *BlossomSub) filterAndPrioritizeAddrs(
addrs []ma.Multiaddr,
) []ma.Multiaddr {
var public, private, relay []ma.Multiaddr
for _, addr := range addrs {
// Skip localhost and unspecified addresses
if b.isLocalOnlyAddr(addr) {
continue
}
// Check if it's a relay address
if b.isRelayAddr(addr) {
relay = append(relay, addr)
} else if b.isPublicAddr(addr) {
public = append(public, addr)
} else if b.isPrivateButRoutable(addr) {
private = append(private, addr)
}
}
// Return in priority order: public, private, relay
result := make([]ma.Multiaddr, 0, len(public)+len(private)+len(relay))
result = append(result, public...)
result = append(result, private...)
result = append(result, relay...)
return result
}
// isLocalOnlyAddr checks if the address is localhost or unspecified
func (b *BlossomSub) isLocalOnlyAddr(addr ma.Multiaddr) bool {
// Get the IP component
ipComponent, err := addr.ValueForProtocol(ma.P_IP4)
if err != nil {
ipComponent, err = addr.ValueForProtocol(ma.P_IP6)
if err != nil {
return false
}
}
ip := net.ParseIP(ipComponent)
if ip == nil {
return false
}
return ip.IsLoopback() || ip.IsUnspecified()
}
// isRelayAddr checks if this is a relay address
func (b *BlossomSub) isRelayAddr(addr ma.Multiaddr) bool {
protocols := addr.Protocols()
for _, p := range protocols {
if p.Code == ma.P_CIRCUIT {
return true
}
}
return false
}
// isPublicAddr checks if this is a public/external address
func (b *BlossomSub) isPublicAddr(addr ma.Multiaddr) bool {
ipComponent, err := addr.ValueForProtocol(ma.P_IP4)
if err != nil {
ipComponent, err = addr.ValueForProtocol(ma.P_IP6)
if err != nil {
return false
}
}
ip := net.ParseIP(ipComponent)
if ip == nil {
return false
}
// Check if it's a globally routable IP
return !ip.IsPrivate() && !ip.IsLoopback() && !ip.IsUnspecified()
}
// isPrivateButRoutable checks if this is a private but potentially routable
// address
func (b *BlossomSub) isPrivateButRoutable(addr ma.Multiaddr) bool {
ipComponent, err := addr.ValueForProtocol(ma.P_IP4)
if err != nil {
ipComponent, err = addr.ValueForProtocol(ma.P_IP6)
if err != nil {
return false
}
}
ip := net.ParseIP(ipComponent)
if ip == nil {
return false
}
// Private but not localhost
return ip.IsPrivate() && !ip.IsLoopback() && !ip.IsUnspecified()
}
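// StartDirectChannelListener serves the provided gRPC server over a libp2p
// gostream listener registered on the direct-channel protocol derived from
// key and purpose.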
func (b *BlossomSub) StartDirectChannelListener(
key []byte,
purpose string,
server *grpc.Server,
) error {
bind, err := gostream.Listen(
b.h,
protocol.ID(
"/p2p/direct-channel/"+base58.Encode(key)+purpose,
),
)
if err != nil {
return errors.Wrap(err, "start direct channel listener")
}
return errors.Wrap(server.Serve(bind), "start direct channel listener")
}
type extraCloseConn struct {
net.Conn
extraClose func()
}
func (c *extraCloseConn) Close() error {
err := c.Conn.Close()
c.extraClose()
return err
}
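// GetDirectChannel resolves the peer's addresses via the DHT and returns a
// gRPC client connection over the first address that can be converted to a
// net.Addr, secured with peer-authenticated TLS credentials.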
func (b *BlossomSub) GetDirectChannel(
ctx context.Context,
peerID []byte,
purpose string,
) (
cc *grpc.ClientConn, err error,
) {
pi, err := b.dht.FindPeer(ctx, peer.ID(peerID))
if err != nil {
return nil, errors.Wrap(err, "get direct channel")
}
creds, err := NewPeerAuthenticator(
b.logger,
&b.p2pConfig,
nil,
nil,
nil,
nil,
[][]byte{peerID},
map[string]channel.AllowedPeerPolicyType{
"quilibrium.node.proxy.pb.PubSubProxy": channel.OnlySelfPeer,
},
map[string]channel.AllowedPeerPolicyType{},
).CreateClientTLSCredentials(peerID)
if err != nil {
return nil, errors.Wrap(err, "get direct channel")
}
var lastError error
for _, addr := range pi.Addrs {
var mga net.Addr
b.logger.Debug(
"attempting to get direct channel with peer",
zap.String("peer", peer.ID(peerID).String()),
zap.String("addr", addr.String()),
)
mga, lastError = mn.ToNetAddr(addr)
if lastError != nil {
b.logger.Debug(
"skipping address",
zap.String("addr", addr.String()),
zap.Error(lastError),
)
continue
}
var cc *grpc.ClientConn
cc, lastError = grpc.NewClient(
mga.String(),
grpc.WithTransportCredentials(creds),
)
if lastError != nil {
b.logger.Debug(
"could not connect",
zap.String("addr", addr.String()),
zap.Error(lastError),
)
continue
}
return cc, nil
}
return nil, errors.Wrap(lastError, "get direct channel")
}
func (b *BlossomSub) GetPublicKey() []byte {
pub, _ := b.signKey.GetPublic().Raw()
return pub
}
func (b *BlossomSub) SignMessage(msg []byte) ([]byte, error) {
sig, err := b.signKey.Sign(msg)
return sig, errors.Wrap(err, "sign message")
}
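// toBlossomSubParams maps the node's P2P configuration onto blossomsub router
// parameters.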
func toBlossomSubParams(
p2pConfig *config.P2PConfig,
) blossomsub.BlossomSubParams {
return blossomsub.BlossomSubParams{
D: p2pConfig.D,
Dlo: p2pConfig.DLo,
Dhi: p2pConfig.DHi,
Dscore: p2pConfig.DScore,
Dout: p2pConfig.DOut,
HistoryLength: p2pConfig.HistoryLength,
HistoryGossip: p2pConfig.HistoryGossip,
Dlazy: p2pConfig.DLazy,
GossipFactor: p2pConfig.GossipFactor,
GossipRetransmission: p2pConfig.GossipRetransmission,
HeartbeatInitialDelay: p2pConfig.HeartbeatInitialDelay,
HeartbeatInterval: p2pConfig.HeartbeatInterval,
FanoutTTL: p2pConfig.FanoutTTL,
PrunePeers: p2pConfig.PrunePeers,
PruneBackoff: p2pConfig.PruneBackoff,
UnsubscribeBackoff: p2pConfig.UnsubscribeBackoff,
Connectors: p2pConfig.Connectors,
MaxPendingConnections: p2pConfig.MaxPendingConnections,
ConnectionTimeout: p2pConfig.ConnectionTimeout,
DirectConnectTicks: p2pConfig.DirectConnectTicks,
DirectConnectInitialDelay: p2pConfig.DirectConnectInitialDelay,
OpportunisticGraftTicks: p2pConfig.OpportunisticGraftTicks,
OpportunisticGraftPeers: p2pConfig.OpportunisticGraftPeers,
GraftFloodThreshold: p2pConfig.GraftFloodThreshold,
MaxIHaveLength: p2pConfig.MaxIHaveLength,
MaxIHaveMessages: p2pConfig.MaxIHaveMessages,
MaxIDontWantMessages: p2pConfig.MaxIDontWantMessages,
IWantFollowupTime: p2pConfig.IWantFollowupTime,
IDontWantMessageThreshold: p2pConfig.IDontWantMessageThreshold,
IDontWantMessageTTL: p2pConfig.IDontWantMessageTTL,
SlowHeartbeatWarning: 0.1,
}
}
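// getNetworkNamespace returns the DHT advertisement namespace for the given
// network, prefixed with ANNOUNCE_PREFIX.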
func getNetworkNamespace(network uint8) string {
var networkName string
switch network {
case 0:
networkName = "mainnet"
case 1:
networkName = "testnet-primary"
default:
networkName = fmt.Sprintf("network-%d", network)
}
return ANNOUNCE_PREFIX + networkName
}
// Close implements p2p.PubSub.
func (b *BlossomSub) Close() error {
// Cancel context to signal all subscription goroutines to exit
if b.cancel != nil {
b.cancel()
}
// Cancel all subscriptions to unblock any pending Next() calls
b.subscriptionMutex.Lock()
for _, sub := range b.subscriptions {
sub.Cancel()
}
b.subscriptions = nil
b.subscriptionMutex.Unlock()
return nil
}
// SetShutdownContext implements p2p.PubSub. When the provided context is
// cancelled, the internal BlossomSub context will also be cancelled, allowing
// subscription loops to exit gracefully.
func (b *BlossomSub) SetShutdownContext(ctx context.Context) {
go func() {
select {
case <-ctx.Done():
b.logger.Debug("shutdown context cancelled, closing pubsub")
b.Close()
case <-b.ctx.Done():
// Already closed
}
}()
}
// MultiaddrToIPNet converts a multiaddr containing /ip4 or /ip6
// into a *net.IPNet with a host mask (/32 or /128).
func MultiaddrToIPNet(m ma.Multiaddr) (*net.IPNet, error) {
var (
ip net.IP
ipBits int
)
// Walk components and grab the first IP we see.
ma.ForEach(m, func(c ma.Component, err error) bool {
if err != nil {
return false
}
switch c.Protocol().Code {
case ma.P_IP4:
if ip == nil {
ip = net.IP(c.RawValue()).To4()
ipBits = 32
}
return false
case ma.P_IP6:
if ip == nil {
ip = net.IP(c.RawValue()).To16()
ipBits = 128
}
return false
}
return true
})
if ip == nil {
return nil, fmt.Errorf("multiaddr has no ip4/ip6 component: %s", m)
}
mask := net.CIDRMask(ipBits, ipBits)
return &net.IPNet{
IP: ip.Mask(mask),
Mask: mask,
}, nil
}
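// getBitmaskSets expands a bitmask into per-bit slices used for scoring: for
// each set bit it emits a slice with only that bit set, plus copies of that
// slice prefixed with one, two, and three zero bytes. An all-zero bitmask is
// returned as-is.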
func getBitmaskSets(bitmask []byte) [][]byte {
sliced := [][]byte{}
if bytes.Equal(bitmask, make([]byte, len(bitmask))) {
sliced = append(sliced, bitmask)
} else {
for i, b := range bitmask {
if b == 0 {
continue
}
// fast path: exactly one bit set in this byte
if b&(b-1) == 0 {
slice := make([]byte, len(bitmask))
slice[i] = b
sliced = append(sliced, slice)
sliced = append(sliced, slices.Concat([]byte{0}, slice))
sliced = append(sliced, slices.Concat([]byte{0, 0}, slice))
sliced = append(sliced, slices.Concat([]byte{0, 0, 0}, slice))
continue
}
for j := 7; j >= 0; j-- {
if (b>>j)&1 == 1 {
slice := make([]byte, len(bitmask))
slice[i] = 1 << j
sliced = append(sliced, slice)
sliced = append(sliced, slices.Concat([]byte{0}, slice))
sliced = append(sliced, slices.Concat([]byte{0, 0}, slice))
sliced = append(sliced, slices.Concat([]byte{0, 0, 0}, slice))
}
}
}
}
return sliced
}