package p2p

import (
	"bytes"
	"context"
	"crypto/rand"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"log"
	"math"
	"math/big"
	"math/bits"
	"net"
	"os"
	"path/filepath"
	"runtime/debug"
	"slices"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	libp2pconfig "github.com/libp2p/go-libp2p/config"
	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/peerstore"
	"github.com/libp2p/go-libp2p/core/protocol"
	"github.com/libp2p/go-libp2p/p2p/discovery/routing"
	"github.com/libp2p/go-libp2p/p2p/discovery/util"
	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
	routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
	"github.com/libp2p/go-libp2p/p2p/net/connmgr"
	"github.com/libp2p/go-libp2p/p2p/net/gostream"
	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
	"github.com/mr-tron/base58"
	ma "github.com/multiformats/go-multiaddr"
	madns "github.com/multiformats/go-multiaddr-dns"
	mn "github.com/multiformats/go-multiaddr/net"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	grpcpeer "google.golang.org/grpc/peer"
	"google.golang.org/protobuf/types/known/wrapperspb"

	"source.quilibrium.com/quilibrium/monorepo/config"
	blossomsub "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub"
	"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
	"source.quilibrium.com/quilibrium/monorepo/node/internal/observability"
	"source.quilibrium.com/quilibrium/monorepo/node/p2p/internal"
	"source.quilibrium.com/quilibrium/monorepo/protobufs"
	"source.quilibrium.com/quilibrium/monorepo/types/channel"
	"source.quilibrium.com/quilibrium/monorepo/types/p2p"
	up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p"
)

const (
	DecayInterval = 10 * time.Minute
	AppDecay      = .9
)

// ConfigDir is a distinct type for the configuration directory path.
// Used by Wire for dependency injection.
type ConfigDir string

type appScore struct {
	expire time.Time
	score  float64
}

type BlossomSub struct {
	ps            *blossomsub.PubSub
	ctx           context.Context
	cancel        context.CancelFunc
	logger        *zap.Logger
	peerID        peer.ID
	derivedPeerID peer.ID
	bitmaskMap    map[string]*blossomsub.Bitmask
	// Track which bit slices belong to which original bitmasks, used to
	// reference-count bitmasks for closed subscriptions.
	subscriptionTracker map[string][][]byte
	subscriptions       []*blossomsub.Subscription
	subscriptionMutex   sync.RWMutex
	h                   host.Host
	signKey             crypto.PrivKey
	peerScore           map[string]*appScore
	peerScoreMx         sync.Mutex
	bootstrap           internal.PeerConnector
	discovery           internal.PeerConnector
	manualReachability  atomic.Pointer[bool]
	p2pConfig           config.P2PConfig
	dht                 *dht.IpfsDHT
	coreId              uint
	configDir           ConfigDir
}

var _ p2p.PubSub = (*BlossomSub)(nil)
var ErrNoPeersAvailable = errors.New("no peers available")

var ANNOUNCE_PREFIX = "quilibrium-2.0.2-dusk-"
var connectivityServiceProtocolID = protocol.ID("/quilibrium/connectivity/1.0.0")

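// getPeerID decodes the hex-encoded Ed448 private key from the config and
// derives the libp2p peer ID from its public key, panicking on any failure.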
func getPeerID(p2pConfig *config.P2PConfig) peer.ID {
	peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey)
	if err != nil {
		log.Panic("error unmarshaling peerkey", zap.Error(err))
	}

	privKey, err := crypto.UnmarshalEd448PrivateKey(peerPrivKey)
	if err != nil {
		log.Panic("error unmarshaling peerkey", zap.Error(err))
	}

	pub := privKey.GetPublic()
	id, err := peer.IDFromPublicKey(pub)
	if err != nil {
		log.Panic("error getting peer id", zap.Error(err))
	}

	return id
}

// NewBlossomSubWithHost creates a new blossomsub instance with a pre-defined
// host. This method is intended for integration tests that need something
// more realistic for pubsub purposes, with a supplied simulator (see
// node/tests/simnet.go for utility methods to construct a flaky host).
func NewBlossomSubWithHost(
	p2pConfig *config.P2PConfig,
	engineConfig *config.EngineConfig,
	logger *zap.Logger,
	coreId uint,
	isBootstrapPeer bool,
	host host.Host,
	privKey crypto.PrivKey,
	bootstrapHosts []host.Host,
) *BlossomSub {
	ctx, cancel := context.WithCancel(context.Background())
	if coreId == 0 {
		logger = logger.With(zap.String("process", "master"))
	} else {
		logger = logger.With(zap.String(
			"process",
			fmt.Sprintf("worker %d", coreId),
		))
	}

	bs := &BlossomSub{
		ctx:                 ctx,
		cancel:              cancel,
		logger:              logger,
		bitmaskMap:          make(map[string]*blossomsub.Bitmask),
		subscriptionTracker: make(map[string][][]byte),
		signKey:             privKey,
		peerScore:           make(map[string]*appScore),
		p2pConfig:           *p2pConfig,
		coreId:              coreId,
	}

	idService := internal.IDServiceFromHost(host)

	logger.Info("established peer id", zap.String("peer_id", host.ID().String()))

	bootstrappers := []peer.AddrInfo{}
	for _, bh := range bootstrapHosts {
		// Manually construct the p2p string; kind of kludgy, but this is
		// intended for use with tests.
		ai, err := peer.AddrInfoFromString(
			bh.Addrs()[0].String() + "/p2p/" + bh.ID().String(),
		)
		if err != nil {
			panic(fmt.Sprintf("error for addr %v: %+v", bh.Addrs()[0], err))
		}
		bootstrappers = append(bootstrappers, *ai)
	}
	kademliaDHT := initDHT(
		ctx,
		logger,
		host,
		isBootstrapPeer,
		bootstrappers,
		p2pConfig.Network,
	)
	host = routedhost.Wrap(host, kademliaDHT)

	routingDiscovery := routing.NewRoutingDiscovery(kademliaDHT)
	util.Advertise(ctx, routingDiscovery, getNetworkNamespace(p2pConfig.Network))

	minBootstrapPeers := min(len(bootstrappers), p2pConfig.MinBootstrapPeers)
	bootstrap := internal.NewPeerConnector(
		ctx,
		zap.NewNop(),
		host,
		idService,
		minBootstrapPeers,
		p2pConfig.BootstrapParallelism,
		internal.NewStaticPeerSource(bootstrappers, true),
	)
	if err := bootstrap.Connect(ctx); err != nil {
		logger.Panic("error connecting to bootstrap peers", zap.Error(err))
	}
	bs.bootstrap = bootstrap

	discovery := internal.NewPeerConnector(
		ctx,
		zap.NewNop(),
		host,
		idService,
		p2pConfig.D,
		p2pConfig.DiscoveryParallelism,
		internal.NewRoutingDiscoveryPeerSource(
			routingDiscovery,
			getNetworkNamespace(p2pConfig.Network),
			p2pConfig.DiscoveryPeerLookupLimit,
		),
	)
	if err := discovery.Connect(ctx); err != nil {
		logger.Panic("error connecting to discovery peers", zap.Error(err))
	}
	discovery = internal.NewChainedPeerConnector(ctx, bootstrap, discovery)
	bs.discovery = discovery

	internal.MonitorPeers(
		ctx,
		logger.Named("peer-monitor"),
		host,
		p2pConfig.PingTimeout,
		p2pConfig.PingPeriod,
		p2pConfig.PingAttempts,
		nil,
	)

	var tracer *blossomsub.JSONTracer
	var err error
	if p2pConfig.TraceLogStdout {
		tracer, err = blossomsub.NewStdoutJSONTracer()
		if err != nil {
			panic(errors.Wrap(err, "error building stdout tracer"))
		}
	} else if p2pConfig.TraceLogFile != "" {
		tracer, err = blossomsub.NewJSONTracer(p2pConfig.TraceLogFile)
		if err != nil {
			logger.Panic("error building file tracer", zap.Error(err))
		}
	}

	blossomOpts := []blossomsub.Option{
		blossomsub.WithStrictSignatureVerification(true),
		blossomsub.WithValidateQueueSize(blossomsub.DefaultValidateQueueSize),
		blossomsub.WithValidateWorkers(1),
		blossomsub.WithPeerOutboundQueueSize(
			blossomsub.DefaultPeerOutboundQueueSize,
		),
	}

	if tracer != nil {
		blossomOpts = append(blossomOpts, blossomsub.WithEventTracer(tracer))
	}
	blossomOpts = append(blossomOpts, blossomsub.WithPeerScore(
		&blossomsub.PeerScoreParams{
			SkipAtomicValidation:        false,
			BitmaskScoreCap:             0,
			IPColocationFactorWeight:    0,
			IPColocationFactorThreshold: 6,
			BehaviourPenaltyWeight:      -10,
			BehaviourPenaltyThreshold:   100,
			BehaviourPenaltyDecay:       .5,
			DecayInterval:               DecayInterval,
			DecayToZero:                 .1,
			RetainScore:                 60 * time.Minute,
			AppSpecificScore: func(p peer.ID) float64 {
				return float64(bs.GetPeerScore([]byte(p)))
			},
			AppSpecificWeight: 10.0,
		},
		&blossomsub.PeerScoreThresholds{
			SkipAtomicValidation:        false,
			GossipThreshold:             -500,
			PublishThreshold:            -1000,
			GraylistThreshold:           -2500,
			AcceptPXThreshold:           1000,
			OpportunisticGraftThreshold: 3.5,
		},
	))
	blossomOpts = append(blossomOpts, observability.WithPrometheusRawTracer())
	if p2pConfig.Network == 0 {
		logger.Info("enabling blacklist for bootstrappers for blossomsub")
		blossomOpts = append(blossomOpts, blossomsub.WithPeerFilter(
			internal.NewStaticPeerFilter(
				[]peer.ID{},
				internal.PeerAddrInfosToPeerIDSlice(bootstrappers),
				true,
			),
		))
	}
	blossomOpts = append(blossomOpts, blossomsub.WithDiscovery(
		internal.NewPeerConnectorDiscovery(discovery),
	))
	blossomOpts = append(blossomOpts, blossomsub.WithMessageIdFn(
		func(pmsg *pb.Message) []byte {
			id := sha256.Sum256(pmsg.Data)
			return id[:]
		}),
	)

	params := toBlossomSubParams(p2pConfig)
	rt := blossomsub.NewBlossomSubRouter(host, params, bs.p2pConfig.Network)
	blossomOpts = append(blossomOpts, rt.WithDefaultTagTracer())
	pubsub, err := blossomsub.NewBlossomSubWithRouter(ctx, host, rt, blossomOpts...)
	if err != nil {
		logger.Panic("error creating pubsub", zap.Error(err))
	}

	peerID := host.ID()
	bs.dht = kademliaDHT
	bs.ps = pubsub
	bs.peerID = peerID
	bs.h = host
	bs.signKey = privKey
	bs.derivedPeerID = peerID

	go bs.background(ctx)

	bs.initConnectivityServices(isBootstrapPeer, bootstrappers)

	return bs
}

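// NewBlossomSub constructs the production pubsub instance: it builds a
// libp2p host from the supplied configuration, wires up the Kademlia DHT,
// bootstrap and discovery connectors, peer scoring, and the reachability
// check, and starts the background score-decay loop. A minimal usage sketch
// (cfg is assumed to be a loaded configuration; the field names shown are
// illustrative, not guaranteed):
//
//	bs := NewBlossomSub(&cfg.P2P, &cfg.Engine, logger, 0, ConfigDir(".config"))
//	defer bs.Close()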
func NewBlossomSub(
	p2pConfig *config.P2PConfig,
	engineConfig *config.EngineConfig,
	logger *zap.Logger,
	coreId uint,
	configDir ConfigDir,
) *BlossomSub {
	ctx := context.Background()

	// Determine the appropriate listen address based on coreId.
	var listenAddr string
	if coreId == 0 {
		logger = logger.With(zap.String("process", "master"))
		// For the main node (coreId == 0), use the standard
		// p2pConfig.ListenMultiaddr.
		listenAddr = p2pConfig.ListenMultiaddr
	} else {
		logger = logger.With(zap.String(
			"process",
			fmt.Sprintf("worker %d", coreId),
		))

		// For data workers (coreId > 0), check if DataWorkerP2PMultiaddrs is
		// provided.
		if engineConfig != nil && len(engineConfig.DataWorkerP2PMultiaddrs) > 0 &&
			int(coreId-1) < len(engineConfig.DataWorkerP2PMultiaddrs) {
			listenAddr = engineConfig.DataWorkerP2PMultiaddrs[coreId-1]
			logger.Info(
				"using configured data worker p2p multiaddr",
				zap.String("multiaddr", listenAddr),
				zap.Uint("core_id", coreId),
			)
		} else if engineConfig != nil && engineConfig.DataWorkerBaseP2PPort > 0 {
			port := engineConfig.DataWorkerBaseP2PPort + uint16(coreId-1)

			listenAddr = fmt.Sprintf(engineConfig.DataWorkerBaseListenMultiaddr, port)
			logger.Info(
				"worker p2p listen address calculated",
				zap.String("multiaddr", listenAddr),
				zap.Uint("core_id", coreId),
				zap.Uint16("port", port),
			)
		} else {
			logger.Error(
				"no data worker configuration found",
				zap.Uint("core_id", coreId),
			)
			time.Sleep(120 * time.Second)
			panic("no data worker configuration found")
		}
	}

	opts := []libp2pconfig.Option{
		libp2p.ListenAddrStrings(listenAddr),
		libp2p.EnableNATService(),
		libp2p.NATPortMap(),
	}

	isBootstrapPeer := false

	if coreId == 0 {
		peerId := getPeerID(p2pConfig)

		if p2pConfig.Network == 0 {
			for _, peerAddr := range config.BootstrapPeers {
				peerinfo, err := peer.AddrInfoFromString(peerAddr)
				if err != nil {
					logger.Panic("error getting peer info", zap.Error(err))
				}

				if bytes.Equal([]byte(peerinfo.ID), []byte(peerId)) {
					isBootstrapPeer = true
					break
				}
			}
		} else {
			for _, peerAddr := range p2pConfig.BootstrapPeers {
				peerinfo, err := peer.AddrInfoFromString(peerAddr)
				if err != nil {
					logger.Panic("error getting peer info", zap.Error(err))
				}

				if bytes.Equal([]byte(peerinfo.ID), []byte(peerId)) {
					isBootstrapPeer = true
					break
				}
			}
		}
	}

	defaultBootstrapPeers := append([]string{}, p2pConfig.BootstrapPeers...)

	if p2pConfig.Network == 0 {
		defaultBootstrapPeers = config.BootstrapPeers
	}

	bootstrappers := []peer.AddrInfo{}

	for _, peerAddr := range defaultBootstrapPeers {
		peerinfo, err := peer.AddrInfoFromString(peerAddr)
		if err != nil {
			logger.Panic("error getting peer info", zap.Error(err))
		}

		bootstrappers = append(bootstrappers, *peerinfo)
	}

	var privKey crypto.PrivKey
	var derivedPeerId peer.ID
	if p2pConfig.PeerPrivKey != "" {
		peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey)
		if err != nil {
			logger.Panic("error unmarshaling peerkey", zap.Error(err))
		}

		privKey, err = crypto.UnmarshalEd448PrivateKey(peerPrivKey)
		if err != nil {
			logger.Panic("error unmarshaling peerkey", zap.Error(err))
		}

		derivedPeerId, err = peer.IDFromPrivateKey(privKey)
		if err != nil {
			logger.Panic("error deriving peer id", zap.Error(err))
		}

		if coreId == 0 {
			opts = append(opts, libp2p.Identity(privKey))
		} else {
			workerKey, _, err := crypto.GenerateEd448Key(rand.Reader)
			if err != nil {
				logger.Panic("error generating worker peerkey", zap.Error(err))
			}

			opts = append(opts, libp2p.Identity(workerKey))
		}
	}

	allowedPeers := []peer.AddrInfo{}
	allowedPeers = append(allowedPeers, bootstrappers...)

	directPeers := []peer.AddrInfo{}
	if len(p2pConfig.DirectPeers) > 0 && coreId == 0 {
		logger.Info("found direct peers in config")
		for _, peerAddr := range p2pConfig.DirectPeers {
			peerinfo, err := peer.AddrInfoFromString(peerAddr)
			if err != nil {
				logger.Panic("error getting peer info", zap.Error(err))
			}
			logger.Info(
				"adding direct peer",
				zap.String("peer", peerinfo.ID.String()),
			)
			directPeers = append(directPeers, *peerinfo)
		}
	}
	allowedPeers = append(allowedPeers, directPeers...)

	opts = append(
		opts,
		libp2p.SwarmOpts(),
	)

	if p2pConfig.LowWatermarkConnections != -1 &&
		p2pConfig.HighWatermarkConnections != -1 {
		cm, err := connmgr.NewConnManager(
			p2pConfig.LowWatermarkConnections,
			p2pConfig.HighWatermarkConnections,
		)
		if err != nil {
			logger.Panic("error creating connection manager", zap.Error(err))
		}

		rm, err := resourceManager(
			p2pConfig.HighWatermarkConnections,
			allowedPeers,
		)
		if err != nil {
			logger.Panic("error creating resource manager", zap.Error(err))
		}

		opts = append(opts, libp2p.ConnectionManager(cm))
		opts = append(opts, libp2p.ResourceManager(rm))
	}

	ctx, cancel := context.WithCancel(ctx)
	bs := &BlossomSub{
		ctx:                 ctx,
		cancel:              cancel,
		logger:              logger,
		bitmaskMap:          make(map[string]*blossomsub.Bitmask),
		subscriptionTracker: make(map[string][][]byte),
		signKey:             privKey,
		peerScore:           make(map[string]*appScore),
		p2pConfig:           *p2pConfig,
		derivedPeerID:       derivedPeerId,
		coreId:              coreId,
		configDir:           configDir,
	}

	h, err := libp2p.New(opts...)
	if err != nil {
		logger.Panic("error constructing p2p", zap.Error(err))
	}
	idService := internal.IDServiceFromHost(h)

	logger.Info("established peer id", zap.String("peer_id", h.ID().String()))

	kademliaDHT := initDHT(
		ctx,
		logger,
		h,
		isBootstrapPeer,
		bootstrappers,
		p2pConfig.Network,
	)
	h = routedhost.Wrap(h, kademliaDHT)

	routingDiscovery := routing.NewRoutingDiscovery(kademliaDHT)
	util.Advertise(ctx, routingDiscovery, getNetworkNamespace(p2pConfig.Network))

	minBootstrapPeers := min(len(bootstrappers), p2pConfig.MinBootstrapPeers)
	bootstrap := internal.NewPeerConnector(
		ctx,
		logger.Named("bootstrap"),
		h,
		idService,
		minBootstrapPeers,
		p2pConfig.BootstrapParallelism,
		internal.NewStaticPeerSource(bootstrappers, true),
	)
	if err := bootstrap.Connect(ctx); err != nil {
		logger.Panic("error connecting to bootstrap peers", zap.Error(err))
	}
	bootstrap = internal.NewConditionalPeerConnector(
		ctx,
		internal.NewNotEnoughPeersCondition(
			h,
			minBootstrapPeers,
			internal.PeerAddrInfosToPeerIDMap(bootstrappers),
		),
		bootstrap,
	)
	bs.bootstrap = bootstrap

	discovery := internal.NewPeerConnector(
		ctx,
		logger.Named("discovery"),
		h,
		idService,
		p2pConfig.D,
		p2pConfig.DiscoveryParallelism,
		internal.NewRoutingDiscoveryPeerSource(
			routingDiscovery,
			getNetworkNamespace(p2pConfig.Network),
			p2pConfig.DiscoveryPeerLookupLimit,
		),
	)
	if err := discovery.Connect(ctx); err != nil {
		logger.Panic("error connecting to discovery peers", zap.Error(err))
	}
	discovery = internal.NewChainedPeerConnector(ctx, bootstrap, discovery)
	bs.discovery = discovery

	internal.MonitorPeers(
		ctx,
		logger.Named("peer-monitor"),
		h,
		p2pConfig.PingTimeout,
		p2pConfig.PingPeriod,
		p2pConfig.PingAttempts,
		directPeers,
	)

	var tracer *blossomsub.JSONTracer
	if p2pConfig.TraceLogStdout {
		tracer, err = blossomsub.NewStdoutJSONTracer()
		if err != nil {
			panic(errors.Wrap(err, "error building stdout tracer"))
		}
	} else if p2pConfig.TraceLogFile != "" {
		tracer, err = blossomsub.NewJSONTracer(p2pConfig.TraceLogFile)
		if err != nil {
			logger.Panic("error building file tracer", zap.Error(err))
		}
	}

	blossomOpts := []blossomsub.Option{
		blossomsub.WithStrictSignatureVerification(true),
	}

	if len(directPeers) > 0 {
		blossomOpts = append(blossomOpts, blossomsub.WithDirectPeers(directPeers))
	}

	if tracer != nil {
		blossomOpts = append(blossomOpts, blossomsub.WithEventTracer(tracer))
	}

	GLOBAL_CONSENSUS_BITMASK := []byte{0x00}
	GLOBAL_FRAME_BITMASK := []byte{0x00, 0x00}
	GLOBAL_PROVER_BITMASK := []byte{0x00, 0x00, 0x00}
	GLOBAL_PEER_INFO_BITMASK := []byte{0x00, 0x00, 0x00, 0x00}
	GLOBAL_ALERT_BITMASK := bytes.Repeat([]byte{0x00}, 16)
	sets := getBitmaskSets(bytes.Repeat([]byte{0xff}, 32))
	sets = slices.Concat([][]byte{
		GLOBAL_CONSENSUS_BITMASK,
		GLOBAL_FRAME_BITMASK,
		GLOBAL_PROVER_BITMASK,
		GLOBAL_PEER_INFO_BITMASK,
		GLOBAL_ALERT_BITMASK,
	}, sets)
	bitmasksScoring := map[string]*blossomsub.BitmaskScoreParams{}
	for _, set := range sets {
		bitmasksScoring[string(set)] = &blossomsub.BitmaskScoreParams{
			SkipAtomicValidation:         false,
			BitmaskWeight:                0.1,
			TimeInMeshWeight:             0.00027,
			TimeInMeshQuantum:            time.Second,
			TimeInMeshCap:                1,
			FirstMessageDeliveriesWeight: 5,
			FirstMessageDeliveriesDecay: blossomsub.ScoreParameterDecay(
				10 * time.Minute,
			),
			FirstMessageDeliveriesCap:      10000,
			InvalidMessageDeliveriesWeight: -1000,
			InvalidMessageDeliveriesDecay:  blossomsub.ScoreParameterDecay(time.Hour),
		}
	}

	if p2pConfig.Network != 0 {
		blossomOpts = append(blossomOpts, blossomsub.WithPeerScore(
			&blossomsub.PeerScoreParams{
				SkipAtomicValidation:        false,
				Bitmasks:                    bitmasksScoring,
				BitmaskScoreCap:             0,
				IPColocationFactorWeight:    0,
				IPColocationFactorThreshold: 6,
				BehaviourPenaltyWeight:      -10,
				BehaviourPenaltyThreshold:   6,
				BehaviourPenaltyDecay:       .5,
				DecayInterval:               DecayInterval,
				DecayToZero:                 .1,
				RetainScore:                 60 * time.Minute,
				AppSpecificScore: func(p peer.ID) float64 {
					return float64(bs.GetPeerScore([]byte(p)))
				},
				AppSpecificWeight: 10.0,
			},
			&blossomsub.PeerScoreThresholds{
				SkipAtomicValidation:        false,
				GossipThreshold:             -500,
				PublishThreshold:            -1000,
				GraylistThreshold:           -2500,
				AcceptPXThreshold:           1000,
				OpportunisticGraftThreshold: 3.5,
			},
		))
	} else {
		whitelist := []*net.IPNet{}
		for _, p := range directPeers {
			for _, i := range p.Addrs {
				ipnet, err := MultiaddrToIPNet(i)
				if err != nil {
					logger.Error(
						"could not convert direct peer for ip colocation whitelist",
						zap.String("peer_addr", i.String()),
						zap.Error(err),
					)
					// Skip the address rather than whitelisting a nil IPNet.
					continue
				}
				whitelist = append(whitelist, ipnet)
			}
		}
		blossomOpts = append(blossomOpts, blossomsub.WithPeerScore(
			&blossomsub.PeerScoreParams{
				SkipAtomicValidation:        false,
				Bitmasks:                    bitmasksScoring,
				BitmaskScoreCap:             0,
				IPColocationFactorWeight:    -100,
				IPColocationFactorThreshold: 6,
				IPColocationFactorWhitelist: whitelist,
				BehaviourPenaltyWeight:      -10,
				BehaviourPenaltyThreshold:   6,
				BehaviourPenaltyDecay:       .5,
				DecayInterval:               DecayInterval,
				DecayToZero:                 .1,
				RetainScore:                 60 * time.Minute,
				AppSpecificScore: func(p peer.ID) float64 {
					return float64(bs.GetPeerScore([]byte(p)))
				},
				AppSpecificWeight: 10.0,
			},
			&blossomsub.PeerScoreThresholds{
				SkipAtomicValidation:        false,
				GossipThreshold:             -500,
				PublishThreshold:            -1000,
				GraylistThreshold:           -2500,
				AcceptPXThreshold:           1000,
				OpportunisticGraftThreshold: 3.5,
			},
		))
	}
	blossomOpts = append(blossomOpts,
		blossomsub.WithValidateQueueSize(p2pConfig.ValidateQueueSize),
		blossomsub.WithValidateWorkers(p2pConfig.ValidateWorkers),
		blossomsub.WithPeerOutboundQueueSize(p2pConfig.PeerOutboundQueueSize),
	)
	blossomOpts = append(blossomOpts, observability.WithPrometheusRawTracer())
	if p2pConfig.Network == 0 {
		logger.Info("enabling blacklist for bootstrappers for blossomsub")
		blossomOpts = append(blossomOpts, blossomsub.WithPeerFilter(
			internal.NewStaticPeerFilter(
				[]peer.ID{},
				internal.PeerAddrInfosToPeerIDSlice(bootstrappers),
				true,
			),
		))
	}
	blossomOpts = append(blossomOpts, blossomsub.WithDiscovery(
		internal.NewPeerConnectorDiscovery(discovery),
	))
	blossomOpts = append(blossomOpts, blossomsub.WithMessageIdFn(
		func(pmsg *pb.Message) []byte {
			id := sha256.Sum256(pmsg.Data)
			return id[:]
		}),
	)

	params := toBlossomSubParams(p2pConfig)
	rt := blossomsub.NewBlossomSubRouter(h, params, bs.p2pConfig.Network)
	blossomOpts = append(blossomOpts, rt.WithDefaultTagTracer())
	pubsub, err := blossomsub.NewBlossomSubWithRouter(ctx, h, rt, blossomOpts...)
	if err != nil {
		logger.Panic("error creating pubsub", zap.Error(err))
	}

	peerID := h.ID()
	bs.dht = kademliaDHT
	bs.ps = pubsub
	bs.peerID = peerID
	bs.h = h
	bs.signKey = privKey
	bs.initConnectivityServices(isBootstrapPeer, bootstrappers)

	go bs.background(ctx)

	return bs
}

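// resourceManager builds a fixed-limit libp2p resource manager scaled to the
// connection high watermark: each derived limit is rounded up to the next
// power of two via 1 << bits.Len(n), so e.g. highWatermark = 160 gives
// 3*160 = 480 and a System.ConnsInbound limit of 512. Resolved addresses of
// allowed peers (bootstrappers and direct peers) are exempted via the
// allowlist.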
// Adjusted from Lotus' reference implementation, addressing
// https://github.com/libp2p/go-libp2p/issues/1640
func resourceManager(highWatermark int, allowed []peer.AddrInfo) (
	network.ResourceManager,
	error,
) {
	defaultLimits := rcmgr.DefaultLimits

	libp2p.SetDefaultServiceLimits(&defaultLimits)

	defaultLimits.SystemBaseLimit.Memory = 1 << 28
	defaultLimits.SystemLimitIncrease.Memory = 1 << 28
	defaultLimitConfig := defaultLimits.AutoScale()

	changes := rcmgr.PartialLimitConfig{}

	if defaultLimitConfig.ToPartialLimitConfig().System.Memory > 2<<30 {
		changes.System.Memory = 2 << 30
	}

	maxconns := uint(highWatermark)
	if rcmgr.LimitVal(3*maxconns) > defaultLimitConfig.
		ToPartialLimitConfig().System.ConnsInbound {
		changes.System.ConnsInbound = rcmgr.LimitVal(1 << bits.Len(3*maxconns))
		changes.System.ConnsOutbound = rcmgr.LimitVal(1 << bits.Len(3*maxconns))
		changes.System.Conns = rcmgr.LimitVal(1 << bits.Len(6*maxconns))
		changes.System.StreamsInbound = rcmgr.LimitVal(1 << bits.Len(36*maxconns))
		changes.System.StreamsOutbound = rcmgr.LimitVal(1 << bits.Len(216*maxconns))
		changes.System.Streams = rcmgr.LimitVal(1 << bits.Len(216*maxconns))

		if rcmgr.LimitVal(3*maxconns) > defaultLimitConfig.
			ToPartialLimitConfig().System.FD {
			changes.System.FD = rcmgr.LimitVal(1 << bits.Len(3*maxconns))
		}

		changes.ServiceDefault.StreamsInbound = rcmgr.LimitVal(
			1 << bits.Len(12*maxconns),
		)
		changes.ServiceDefault.StreamsOutbound = rcmgr.LimitVal(
			1 << bits.Len(48*maxconns),
		)
		changes.ServiceDefault.Streams = rcmgr.LimitVal(1 << bits.Len(48*maxconns))
		changes.ProtocolDefault.StreamsInbound = rcmgr.LimitVal(
			1 << bits.Len(12*maxconns),
		)
		changes.ProtocolDefault.StreamsOutbound = rcmgr.LimitVal(
			1 << bits.Len(48*maxconns),
		)
		changes.ProtocolDefault.Streams = rcmgr.LimitVal(1 << bits.Len(48*maxconns))
	}

	changedLimitConfig := changes.Build(defaultLimitConfig)

	limiter := rcmgr.NewFixedLimiter(changedLimitConfig)

	str, err := rcmgr.NewStatsTraceReporter()
	if err != nil {
		return nil, errors.Wrap(err, "resource manager")
	}

	rcmgr.MustRegisterWith(prometheus.DefaultRegisterer)

	// Metrics
	opts := append(
		[]rcmgr.Option{},
		rcmgr.WithTraceReporter(str),
	)

	resolver := madns.DefaultResolver
	var allowedMaddrs []ma.Multiaddr
	for _, pi := range allowed {
		for _, addr := range pi.Addrs {
			resolved, err := resolver.Resolve(context.Background(), addr)
			if err != nil {
				continue
			}
			allowedMaddrs = append(allowedMaddrs, resolved...)
		}
	}

	opts = append(opts, rcmgr.WithAllowlistedMultiaddrs(allowedMaddrs))

	mgr, err := rcmgr.NewResourceManager(limiter, opts...)
	if err != nil {
		return nil, errors.Wrap(err, "resource manager")
	}

	return mgr, nil
}

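// background runs the periodic application-score decay loop until the
// context is cancelled. With AppDecay = 0.9 applied every DecayInterval
// (10 minutes), a score of 100 falls below the 0.1 zeroing threshold after
// about 66 ticks, i.e. roughly 11 hours.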
func (b *BlossomSub) background(ctx context.Context) {
	refreshScores := time.NewTicker(DecayInterval)
	defer refreshScores.Stop()

	for {
		select {
		case <-refreshScores.C:
			b.refreshScores()
		case <-ctx.Done():
			return
		}
	}
}

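// refreshScores drops expired app-score entries and exponentially decays the
// rest, snapping scores with magnitude below 0.1 to zero.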
func (b *BlossomSub) refreshScores() {
	b.peerScoreMx.Lock()

	now := time.Now()
	for p, pstats := range b.peerScore {
		if now.After(pstats.expire) {
			delete(b.peerScore, p)
			continue
		}

		pstats.score *= AppDecay
		if math.Abs(pstats.score) < .1 {
			pstats.score = 0
		}
	}

	b.peerScoreMx.Unlock()
}

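// PublishToBitmask signs data with the node's key and publishes it to the
// given bitmask. On devnet (network 99), publishes to closed bitmasks are
// silently ignored.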
func (b *BlossomSub) PublishToBitmask(bitmask []byte, data []byte) error {
	err := b.ps.Publish(
		b.ctx,
		bitmask,
		data,
		blossomsub.WithSecretKeyAndPeerId(b.signKey, b.derivedPeerID),
	)
	if err != nil && errors.Is(err, blossomsub.ErrBitmaskClosed) &&
		b.p2pConfig.Network == 99 {
		// Ignore bitmask closed errors for devnet.
		return nil
	}

	return errors.Wrap(
		errors.Wrapf(err, "bitmask: %x", bitmask),
		"publish to bitmask",
	)
}

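// Publish maps the address onto a 256-bit bloom filter bitmask with 3 bits
// set and publishes the payload to that bitmask.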
func (b *BlossomSub) Publish(address []byte, data []byte) error {
	bitmask := up2p.GetBloomFilter(address, 256, 3)
	return b.PublishToBitmask(bitmask, data)
}

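// Subscribe joins the given bitmask, which blossomsub decomposes into
// per-bit slices, and starts one consumer goroutine per slice. When the
// bitmask decomposes into more than three slices the subscription is treated
// as broad and every message is handed to the handler regardless of its
// exact bitmask; otherwise only exact matches are delivered.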
func (b *BlossomSub) Subscribe(
	bitmask []byte,
	handler func(message *pb.Message) error,
) error {
	b.logger.Info("joining broadcast")
	bm, err := b.ps.Join(bitmask)
	if err != nil {
		b.logger.Error("join failed", zap.Error(err))
		return errors.Wrap(err, "subscribe")
	}

	b.logger.Info(
		"subscribe to bitmask",
		zap.String("bitmask", hex.EncodeToString(bitmask)),
	)

	// Track the bit slices for this subscription.
	b.subscriptionMutex.Lock()
	bitSlices := make([][]byte, 0, len(bm))
	for _, bit := range bm {
		sliceCopy := make([]byte, len(bit.Bitmask()))
		copy(sliceCopy, bit.Bitmask())
		bitSlices = append(bitSlices, sliceCopy)
	}
	b.subscriptionTracker[string(bitmask)] = bitSlices
	b.subscriptionMutex.Unlock()

	// If the bitmask decomposes into more than three bit slices, this is a
	// broad subscribe and the caller is expected to handle disambiguation of
	// addresses.
	exact := len(bm) <= 3

	subs := []*blossomsub.Subscription{}
	for _, bit := range bm {
		sub, err := bit.Subscribe(
			blossomsub.WithBufferSize(b.p2pConfig.SubscriptionQueueSize),
		)
		if err != nil {
			b.logger.Error("subscription failed", zap.Error(err))
			// Clean up on failure.
			b.subscriptionMutex.Lock()
			delete(b.subscriptionTracker, string(bitmask))
			b.subscriptionMutex.Unlock()
			return errors.Wrap(err, "subscribe")
		}
		b.subscriptionMutex.Lock()
		if _, ok := b.bitmaskMap[string(bit.Bitmask())]; !ok {
			b.bitmaskMap[string(bit.Bitmask())] = bit
		}
		b.subscriptionMutex.Unlock()
		subs = append(subs, sub)
	}

	b.logger.Info(
		"begin streaming from bitmask",
		zap.String("bitmask", hex.EncodeToString(bitmask)),
	)

	// Track subscriptions for cleanup on Close.
	b.subscriptionMutex.Lock()
	b.subscriptions = append(b.subscriptions, subs...)
	b.subscriptionMutex.Unlock()

	for _, sub := range subs {
		copiedBitmask := make([]byte, len(bitmask))
		copy(copiedBitmask, bitmask)
		sub := sub

		go func() {
			for {
				if !b.subscribeHandler(sub, copiedBitmask, exact, handler) {
					return
				}
			}
		}()
	}

	b.logger.Info(
		"successfully subscribed to bitmask",
		zap.String("bitmask", hex.EncodeToString(bitmask)),
	)

	return nil
}

// subscribeHandler processes a single message from the subscription.
// Returns true if the loop should continue, false if it should exit.
func (b *BlossomSub) subscribeHandler(
	sub *blossomsub.Subscription,
	copiedBitmask []byte,
	exact bool,
	handler func(message *pb.Message) error,
) bool {
	defer func() {
		if r := recover(); r != nil {
			b.logger.Error(
				"message handler panicked, recovering",
				zap.Any("panic", r),
				zap.String("stack", string(debug.Stack())),
			)
		}
	}()

	m, err := sub.Next(b.ctx)
	if err != nil {
		// Context cancelled or subscription closed - exit the loop.
		b.logger.Debug(
			"subscription exiting",
			zap.Error(err),
		)
		return false
	}
	if m == nil {
		// Subscription closed.
		return false
	}
	if bytes.Equal(m.Bitmask, copiedBitmask) || !exact {
		if err = handler(m.Message); err != nil {
			b.logger.Debug("message handler returned error", zap.Error(err))
		}
	}
	return true
}

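// Unsubscribe removes the subscription for the given bitmask and closes any
// underlying bit slices that no remaining subscription still references. The
// raw parameter is currently unused.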
func (b *BlossomSub) Unsubscribe(bitmask []byte, raw bool) {
	b.subscriptionMutex.Lock()
	defer b.subscriptionMutex.Unlock()

	bitmaskKey := string(bitmask)
	bitSlices, exists := b.subscriptionTracker[bitmaskKey]
	if !exists {
		b.logger.Warn(
			"attempted to unsubscribe from unknown bitmask",
			zap.String("bitmask", hex.EncodeToString(bitmask)),
		)
		return
	}

	b.logger.Info(
		"unsubscribing from bitmask",
		zap.String("bitmask", hex.EncodeToString(bitmask)),
	)

	// Check each bit slice to see if it's still needed by other subscriptions.
	for _, bitSlice := range bitSlices {
		bitSliceKey := string(bitSlice)

		// Check if any other subscription is using this bit slice.
		stillNeeded := false
		for otherKey, otherSlices := range b.subscriptionTracker {
			if otherKey == bitmaskKey {
				continue // Skip the subscription we're removing.
			}

			for _, otherSlice := range otherSlices {
				if bytes.Equal(otherSlice, bitSlice) {
					stillNeeded = true
					break
				}
			}

			if stillNeeded {
				break
			}
		}

		// Only close the bitmask if no other subscription needs it.
		if !stillNeeded {
			if bm, ok := b.bitmaskMap[bitSliceKey]; ok {
				b.logger.Debug(
					"closing bit slice",
					zap.String("bit_slice", hex.EncodeToString(bitSlice)),
				)
				bm.Close()
				delete(b.bitmaskMap, bitSliceKey)
			}
		} else {
			b.logger.Debug(
				"bit slice still needed by other subscription",
				zap.String("bit_slice", hex.EncodeToString(bitSlice)),
			)
		}
	}

	// Remove the subscription from the tracker.
	delete(b.subscriptionTracker, bitmaskKey)
}

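// RegisterValidator adapts the p2p-level validator to a blossomsub
// ValidatorEx and registers it for the bitmask, running it inline when sync
// is true.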
func (b *BlossomSub) RegisterValidator(
	bitmask []byte,
	validator func(peerID peer.ID, message *pb.Message) p2p.ValidationResult,
	sync bool,
) error {
	validatorEx := func(
		ctx context.Context, peerID peer.ID, message *blossomsub.Message,
	) blossomsub.ValidationResult {
		switch v := validator(peerID, message.Message); v {
		case p2p.ValidationResultAccept:
			return blossomsub.ValidationAccept
		case p2p.ValidationResultReject:
			return blossomsub.ValidationReject
		case p2p.ValidationResultIgnore:
			return blossomsub.ValidationIgnore
		default:
			panic("unreachable")
		}
	}
	var _ blossomsub.ValidatorEx = validatorEx
	return b.ps.RegisterBitmaskValidator(
		bitmask,
		validatorEx,
		blossomsub.WithValidatorInline(sync),
	)
}

func (b *BlossomSub) UnregisterValidator(bitmask []byte) error {
	return b.ps.UnregisterBitmaskValidator(bitmask)
}

func (b *BlossomSub) GetPeerID() []byte {
	return []byte(b.derivedPeerID)
}

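// GetRandomPeer returns a uniformly random peer currently subscribed to the
// bitmask, using crypto/rand for the selection, or ErrNoPeersAvailable if
// none are known.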
func (b *BlossomSub) GetRandomPeer(bitmask []byte) ([]byte, error) {
	peers := b.ps.ListPeers(bitmask)
	if len(peers) == 0 {
		return nil, errors.Wrap(
			ErrNoPeersAvailable,
			"get random peer",
		)
	}
	b.logger.Debug("selecting from peers", zap.Any("peer_ids", peers))
	sel, err := rand.Int(rand.Reader, big.NewInt(int64(len(peers))))
	if err != nil {
		return nil, errors.Wrap(err, "get random peer")
	}

	return []byte(peers[sel.Int64()]), nil
}

func (b *BlossomSub) IsPeerConnected(peerId []byte) bool {
	peerID := peer.ID(peerId)
	connectedness := b.h.Network().Connectedness(peerID)
	return connectedness == network.Connected || connectedness == network.Limited
}

func (b *BlossomSub) Reachability() *wrapperspb.BoolValue {
	manual := b.manualReachability.Load()
	if manual == nil {
		return nil
	}
	return wrapperspb.Bool(*manual)
}

func (b *BlossomSub) initConnectivityServices(
	isBootstrapPeer bool,
	bootstrappers []peer.AddrInfo,
) {
	if b.p2pConfig.Network != 0 {
		return
	}

	if b.h == nil {
		return
	}
	if isBootstrapPeer {
		b.startConnectivityService()
		return
	}
	clone := make([]peer.AddrInfo, len(bootstrappers))
	copy(clone, bootstrappers)
	b.blockUntilConnectivityTest(clone)
}

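// startConnectivityService exposes the gRPC ConnectivityService on a plain
// TCP listener at port 8340 so that other nodes can request a dial-back
// reachability check; it shuts down gracefully when the pubsub context ends.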
func (b *BlossomSub) startConnectivityService() {
	// Use a raw TCP listener on port 8340.
	listenAddr := "0.0.0.0:8340"

	listener, err := net.Listen("tcp", listenAddr)
	if err != nil {
		b.logger.Error("failed to start connectivity service", zap.Error(err))
		return
	}

	b.logger.Info("started connectivity service", zap.String("addr", listenAddr))

	server := grpc.NewServer()
	protobufs.RegisterConnectivityServiceServer(
		server,
		newConnectivityService(b.logger.Named("connectivity-service"), b.h),
	)

	go func() {
		if err := server.Serve(listener); err != nil &&
			!errors.Is(err, net.ErrClosed) {
			b.logger.Error("connectivity service exited", zap.Error(err))
		}
	}()

	go func() {
		<-b.ctx.Done()
		server.GracefulStop()
		_ = listener.Close()
	}()
}

func (b *BlossomSub) blockUntilConnectivityTest(bootstrappers []peer.AddrInfo) {
	if len(bootstrappers) == 0 {
		b.logger.Warn("connectivity test skipped, no bootstrap peers available")
		return
	}

	// Check if we have a recent successful connectivity check cached.
	if b.isConnectivityCacheValid() {
		b.logger.Info("skipping connectivity test, recent successful check cached",
			zap.Uint("core_id", b.coreId))
		b.recordManualReachability(true)
		return
	}

	delay := time.NewTimer(10 * time.Second)
	defer delay.Stop()
	select {
	case <-delay.C:
	case <-b.ctx.Done():
		b.logger.Info("connectivity test cancelled before start, context done")
		return
	}

	backoff := 10 * time.Second
	for {
		if err := b.runConnectivityTest(b.ctx, bootstrappers); err == nil {
			// Write the cache on successful connectivity test.
			b.writeConnectivityCache()
			return
		} else {
			b.logger.Warn("connectivity test failed, retrying", zap.Error(err))
		}

		wait := time.NewTimer(backoff)
		select {
		case <-wait.C:
			wait.Stop()
		case <-b.ctx.Done():
			wait.Stop()
			b.logger.Info("connectivity test cancelled, context done")
			return
		}
	}
}

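// runConnectivityTest picks a random dnsaddr bootstrap peer (excluding
// ourselves) and asks its connectivity service to dial us back.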
func (b *BlossomSub) runConnectivityTest(
	ctx context.Context,
	bootstrappers []peer.AddrInfo,
) error {
	candidates := make([]peer.AddrInfo, 0, len(bootstrappers))
	for _, info := range bootstrappers {
		if info.ID == b.h.ID() {
			continue
		}
		// Guard against peers with no advertised addresses.
		if len(info.Addrs) == 0 {
			continue
		}
		if strings.Contains(info.Addrs[0].String(), "dnsaddr") {
			candidates = append(candidates, info)
		}
	}
	if len(candidates) == 0 {
		return errors.New("connectivity test: no bootstrap peers available")
	}
	selection, err := rand.Int(rand.Reader, big.NewInt(int64(len(candidates))))
	if err != nil {
		return errors.Wrap(err, "connectivity test peer selection")
	}
	target := candidates[selection.Int64()]
	return b.invokeConnectivityTest(ctx, target)
}

func (b *BlossomSub) invokeConnectivityTest(
	ctx context.Context,
	target peer.AddrInfo,
) error {
	dialCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	var targetAddr string
	for _, addr := range target.Addrs {
		host, err := addr.ValueForProtocol(ma.P_IP4)
		if err != nil {
			host, err = addr.ValueForProtocol(ma.P_IP6)
			if err != nil {
				host, err = addr.ValueForProtocol(ma.P_DNSADDR)
				if err != nil {
					continue
				}
			}
		}

		targetAddr = fmt.Sprintf("%s:8340", host)
		break
	}

	if targetAddr == "" {
		b.recordManualReachability(false)
		return errors.New(
			"connectivity test: no valid address found for bootstrap peer",
		)
	}

	b.logger.Debug(
		"connecting to bootstrap connectivity service",
		zap.String("target", targetAddr),
	)

	conn, err := grpc.NewClient(
		targetAddr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		b.recordManualReachability(false)
		return errors.Wrap(err, "connectivity test dial")
	}
	defer conn.Close()

	client := protobufs.NewConnectivityServiceClient(conn)
	req := &protobufs.ConnectivityTestRequest{
		PeerId:     []byte(b.h.ID()),
		Multiaddrs: b.collectConnectivityMultiaddrs(),
	}

	resp, err := client.TestConnectivity(dialCtx, req)
	if err != nil {
		b.recordManualReachability(false)
		return errors.Wrap(err, "connectivity test rpc")
	}

	b.recordManualReachability(resp.GetSuccess())
	if resp.GetSuccess() {
		b.logger.Info(
			"your node is reachable",
			zap.String("bootstrap_peer", target.ID.String()),
		)
		return nil
	}

	b.logger.Warn(
		"YOUR NODE IS NOT REACHABLE. CHECK YOUR FIREWALL AND PORT FORWARDING CONFIGURATION",
		zap.String("bootstrap_peer", target.ID.String()),
		zap.String("error", resp.GetErrorMessage()),
	)
	if resp.GetErrorMessage() != "" {
		return errors.New(resp.GetErrorMessage())
	}
	return errors.New("connectivity test failed")
}

func (b *BlossomSub) collectConnectivityMultiaddrs() []string {
	addrs := b.GetOwnMultiaddrs()
	out := make([]string, 0, len(addrs))
	for _, addr := range addrs {
		out = append(out, addr.String())
	}
	return out
}

func (b *BlossomSub) recordManualReachability(success bool) {
	state := new(bool)
	*state = success
	b.manualReachability.Store(state)
}

const connectivityCacheValidity = 7 * 24 * time.Hour // 1 week

// connectivityCachePath returns the path to the connectivity check cache file
// for this core. The file is stored in <configDir>/connectivity-check-<coreId>
func (b *BlossomSub) connectivityCachePath() string {
	return filepath.Join(
		string(b.configDir),
		fmt.Sprintf("connectivity-check-%d", b.coreId),
	)
}

// isConnectivityCacheValid checks if there's a valid (< 1 week old)
// connectivity cache file indicating a previous successful check.
func (b *BlossomSub) isConnectivityCacheValid() bool {
	cachePath := b.connectivityCachePath()
	info, err := os.Stat(cachePath)
	if err != nil {
		// File doesn't exist or error accessing it.
		return false
	}

	// Check if the file is less than 1 week old.
	age := time.Since(info.ModTime())
	if age < connectivityCacheValidity {
		b.logger.Debug("connectivity cache is valid",
			zap.String("path", cachePath),
			zap.Duration("age", age))
		return true
	}

	b.logger.Debug("connectivity cache is stale",
		zap.String("path", cachePath),
		zap.Duration("age", age))
	return false
}

// writeConnectivityCache writes the connectivity cache file to indicate
// a successful connectivity check.
func (b *BlossomSub) writeConnectivityCache() {
	cachePath := b.connectivityCachePath()

	// Ensure the directory exists.
	if err := os.MkdirAll(filepath.Dir(cachePath), 0755); err != nil {
		b.logger.Warn("failed to create connectivity cache directory",
			zap.Error(err))
		return
	}

	// Write the cache file with the current timestamp.
	timestamp := time.Now().Format(time.RFC3339)
	if err := os.WriteFile(cachePath, []byte(timestamp), 0644); err != nil {
		b.logger.Warn("failed to write connectivity cache",
			zap.String("path", cachePath),
			zap.Error(err))
		return
	}

	b.logger.Debug("wrote connectivity cache",
		zap.String("path", cachePath))
}

type connectivityService struct {
	protobufs.UnimplementedConnectivityServiceServer
	logger *zap.Logger
	host   host.Host
	ping   *ping.PingService
}

func newConnectivityService(
	logger *zap.Logger,
	h host.Host,
) *connectivityService {
	return &connectivityService{
		logger: logger,
		host:   h,
		ping:   ping.NewPingService(h),
	}
}

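// TestConnectivity implements the dial-back check: it rewrites the
// requester's advertised multiaddrs to use the IP observed on the gRPC
// connection, attempts a libp2p connection to those addresses, and confirms
// reachability with a ping before closing the connection.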
func (s *connectivityService) TestConnectivity(
	ctx context.Context,
	req *protobufs.ConnectivityTestRequest,
) (*protobufs.ConnectivityTestResponse, error) {
	resp := &protobufs.ConnectivityTestResponse{}
	peerID := peer.ID(req.GetPeerId())
	if peerID == "" {
		resp.ErrorMessage = "peer id required"
		return resp, nil
	}

	// Get the actual IP address from the gRPC peer context.
	pr, ok := grpcpeer.FromContext(ctx)
	if !ok || pr.Addr == nil {
		resp.ErrorMessage = "unable to determine peer address from context"
		return resp, nil
	}

	// Extract the IP from the remote address.
	remoteAddr := pr.Addr.String()
	host, _, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		resp.ErrorMessage = fmt.Sprintf("invalid remote address: %v", err)
		return resp, nil
	}

	s.logger.Debug(
		"connectivity test from peer",
		zap.String("peer_id", peerID.String()),
		zap.String("remote_ip", host),
	)

	// Use the matching multiaddr protocol for the observed IP so IPv6
	// callers are not silently skipped.
	ipProto := "ip4"
	if parsed := net.ParseIP(host); parsed != nil && parsed.To4() == nil {
		ipProto = "ip6"
	}

	addrs := make([]ma.Multiaddr, 0, len(req.GetMultiaddrs()))
	for _, addrStr := range req.GetMultiaddrs() {
		maddr, err := ma.NewMultiaddr(addrStr)
		if err != nil {
			s.logger.Debug(
				"invalid multiaddr in connectivity request",
				zap.String("peer_id", peerID.String()),
				zap.String("multiaddr", addrStr),
				zap.Error(err),
			)
			continue
		}

		// Extract the port from the multiaddr but use the actual IP from the
		// connection.
		port, err := maddr.ValueForProtocol(ma.P_TCP)
		if err != nil {
			// If it's not TCP, try UDP.
			port, err = maddr.ValueForProtocol(ma.P_UDP)
			if err != nil {
				continue
			}
			// Build a UDP multiaddr with the actual IP.
			newAddr, err := ma.NewMultiaddr(
				fmt.Sprintf("/%s/%s/udp/%s/quic-v1", ipProto, host, port),
			)
			if err != nil {
				continue
			}
			addrs = append(addrs, newAddr)
			continue
		}

		// Build a TCP multiaddr with the actual IP.
		newAddr, err := ma.NewMultiaddr(
			fmt.Sprintf("/%s/%s/tcp/%s", ipProto, host, port),
		)
		if err != nil {
			continue
		}
		addrs = append(addrs, newAddr)
	}

	if len(addrs) == 0 {
		resp.ErrorMessage = "no valid multiaddrs to test"
		return resp, nil
	}

	s.logger.Debug(
		"attempting to connect to peer",
		zap.String("peer_id", peerID.String()),
		zap.Any("addrs", addrs),
	)

	s.host.Peerstore().AddAddrs(peerID, addrs, peerstore.TempAddrTTL)

	connectCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	err = s.host.Connect(connectCtx, peer.AddrInfo{
		ID:    peerID,
		Addrs: addrs,
	})
	if err != nil {
		resp.ErrorMessage = err.Error()
		return resp, nil
	}

	defer s.host.Network().ClosePeer(peerID)

	pingCtx, cancelPing := context.WithTimeout(ctx, 10*time.Second)
	defer cancelPing()

	select {
	case <-pingCtx.Done():
		resp.ErrorMessage = pingCtx.Err().Error()
		return resp, nil
	case result := <-s.ping.Ping(pingCtx, peerID):
		if result.Error != nil {
			resp.ErrorMessage = result.Error.Error()
			return resp, nil
		}
	}

	resp.Success = true
	return resp, nil
}

func initDHT(
	ctx context.Context,
	logger *zap.Logger,
	h host.Host,
	isBootstrapPeer bool,
	bootstrappers []peer.AddrInfo,
	network uint8,
) *dht.IpfsDHT {
	logger.Info("establishing dht")
	var mode dht.ModeOpt
	if isBootstrapPeer || network != 0 {
		logger.Warn("BOOTSTRAP PEER")
		mode = dht.ModeServer
	} else {
		mode = dht.ModeClient
	}
	opts := []dht.Option{
		dht.Mode(mode),
		dht.BootstrapPeers(bootstrappers...),
	}
	if network != 0 {
		opts = append(opts, dht.ProtocolPrefix(protocol.ID("/testnet")))
	}
	kademliaDHT, err := dht.New(
		ctx,
		h,
		opts...,
	)
	if err != nil {
		logger.Panic("error creating dht", zap.Error(err))
	}
	if err := kademliaDHT.Bootstrap(ctx); err != nil {
		logger.Panic("error bootstrapping dht", zap.Error(err))
	}
	return kademliaDHT
}

func (b *BlossomSub) Reconnect(peerId []byte) error {
	pid := peer.ID(peerId)
	info := b.h.Peerstore().PeerInfo(pid)
	b.h.ConnManager().Unprotect(info.ID, "bootstrap")
	time.Sleep(10 * time.Second)
	if err := b.h.Connect(b.ctx, info); err != nil {
		return errors.Wrap(err, "reconnect")
	}

	b.h.ConnManager().Protect(info.ID, "bootstrap")
	return nil
}

func (b *BlossomSub) Bootstrap(ctx context.Context) error {
	return b.bootstrap.Connect(ctx)
}

func (b *BlossomSub) DiscoverPeers(ctx context.Context) error {
	return b.discovery.Connect(ctx)
}

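// GetPeerScore returns the current application-specific score for a peer, or
// zero if none is tracked. Entries set via SetPeerScore and AddPeerScore
// expire after one hour and feed the blossomsub AppSpecificScore hook with a
// 10x weight.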
func (b *BlossomSub) GetPeerScore(peerId []byte) int64 {
	b.peerScoreMx.Lock()
	defer b.peerScoreMx.Unlock()
	peerScore, ok := b.peerScore[string(peerId)]
	if !ok {
		return 0
	}
	return int64(peerScore.score)
}

func (b *BlossomSub) SetPeerScore(peerId []byte, score int64) {
	b.peerScoreMx.Lock()
	defer b.peerScoreMx.Unlock()
	b.peerScore[string(peerId)] = &appScore{
		score:  float64(score),
		expire: time.Now().Add(1 * time.Hour),
	}
}

func (b *BlossomSub) AddPeerScore(peerId []byte, scoreDelta int64) {
	b.peerScoreMx.Lock()
	defer b.peerScoreMx.Unlock()
	score := float64(scoreDelta)
	if existing, ok := b.peerScore[string(peerId)]; ok {
		score += existing.score
	}
	b.peerScore[string(peerId)] = &appScore{
		score:  score,
		expire: time.Now().Add(1 * time.Hour),
	}
}

func (b *BlossomSub) GetPeerstoreCount() int {
	return len(b.h.Peerstore().Peers())
}

func (b *BlossomSub) GetNetworkInfo() *protobufs.NetworkInfoResponse {
	resp := &protobufs.NetworkInfoResponse{}
	for _, p := range b.h.Network().Peers() {
		addrs := b.h.Peerstore().Addrs(p)
		multiaddrs := []string{}
		for _, a := range addrs {
			multiaddrs = append(multiaddrs, a.String())
		}
		resp.NetworkInfo = append(resp.NetworkInfo, &protobufs.NetworkInfo{
			PeerId:     []byte(p),
			Multiaddrs: multiaddrs,
			PeerScore:  b.ps.PeerScore(p),
		})
	}
	return resp
}

func (b *BlossomSub) GetNetworkPeersCount() int {
	return len(b.h.Network().Peers())
}

func (b *BlossomSub) GetMultiaddrOfPeerStream(
	ctx context.Context,
	peerId []byte,
) <-chan ma.Multiaddr {
	return b.h.Peerstore().AddrStream(ctx, peer.ID(peerId))
}

func (b *BlossomSub) GetMultiaddrOfPeer(peerId []byte) string {
	addrs := b.h.Peerstore().Addrs(peer.ID(peerId))
	if len(addrs) == 0 {
		return ""
	}

	return addrs[0].String()
}

func (b *BlossomSub) GetNetwork() uint {
	return uint(b.p2pConfig.Network)
}

// GetOwnMultiaddrs returns our own multiaddresses as seen by the network.
func (b *BlossomSub) GetOwnMultiaddrs() []ma.Multiaddr {
	allAddrs := make([]ma.Multiaddr, 0)

	// 1. Get listen addresses.
	listenAddrs := b.h.Network().ListenAddresses()

	// 2. Get addresses from our own peerstore (observed addresses).
	selfAddrs := b.h.Peerstore().Addrs(b.h.ID())

	// 3. Get all addresses including identified ones.
	identifiedAddrs := b.h.Addrs()

	// Combine and deduplicate.
	addrMap := make(map[string]ma.Multiaddr)
	for _, addr := range listenAddrs {
		addrMap[addr.String()] = addr
	}
	for _, addr := range selfAddrs {
		addrMap[addr.String()] = addr
	}
	for _, addr := range identifiedAddrs {
		addrMap[addr.String()] = addr
	}

	// Convert back to a slice.
	for _, addr := range addrMap {
		allAddrs = append(allAddrs, addr)
	}

	return b.filterAndPrioritizeAddrs(allAddrs)
}

// filterAndPrioritizeAddrs filters and prioritizes addresses for external
// visibility.
func (b *BlossomSub) filterAndPrioritizeAddrs(
	addrs []ma.Multiaddr,
) []ma.Multiaddr {
	var public, private, relay []ma.Multiaddr

	for _, addr := range addrs {
		// Skip localhost and unspecified addresses.
		if b.isLocalOnlyAddr(addr) {
			continue
		}

		// Check if it's a relay address.
		if b.isRelayAddr(addr) {
			relay = append(relay, addr)
		} else if b.isPublicAddr(addr) {
			public = append(public, addr)
		} else if b.isPrivateButRoutable(addr) {
			private = append(private, addr)
		}
	}

	// Return in priority order: public, private, relay.
	result := make([]ma.Multiaddr, 0, len(public)+len(private)+len(relay))
	result = append(result, public...)
	result = append(result, private...)
	result = append(result, relay...)

	return result
}

// isLocalOnlyAddr checks if the address is localhost or unspecified.
func (b *BlossomSub) isLocalOnlyAddr(addr ma.Multiaddr) bool {
	// Get the IP component.
	ipComponent, err := addr.ValueForProtocol(ma.P_IP4)
	if err != nil {
		ipComponent, err = addr.ValueForProtocol(ma.P_IP6)
		if err != nil {
			return false
		}
	}

	ip := net.ParseIP(ipComponent)
	if ip == nil {
		return false
	}

	return ip.IsLoopback() || ip.IsUnspecified()
}

// isRelayAddr checks if this is a relay address.
func (b *BlossomSub) isRelayAddr(addr ma.Multiaddr) bool {
	protocols := addr.Protocols()
	for _, p := range protocols {
		if p.Code == ma.P_CIRCUIT {
			return true
		}
	}
	return false
}

// isPublicAddr checks if this is a public/external address.
func (b *BlossomSub) isPublicAddr(addr ma.Multiaddr) bool {
	ipComponent, err := addr.ValueForProtocol(ma.P_IP4)
	if err != nil {
		ipComponent, err = addr.ValueForProtocol(ma.P_IP6)
		if err != nil {
			return false
		}
	}

	ip := net.ParseIP(ipComponent)
	if ip == nil {
		return false
	}

	// Check if it's a globally routable IP.
	return !ip.IsPrivate() && !ip.IsLoopback() && !ip.IsUnspecified()
}

// isPrivateButRoutable checks if this is a private but potentially routable
// address.
func (b *BlossomSub) isPrivateButRoutable(addr ma.Multiaddr) bool {
	ipComponent, err := addr.ValueForProtocol(ma.P_IP4)
	if err != nil {
		ipComponent, err = addr.ValueForProtocol(ma.P_IP6)
		if err != nil {
			return false
		}
	}

	ip := net.ParseIP(ipComponent)
	if ip == nil {
		return false
	}

	// Private but not localhost.
	return ip.IsPrivate() && !ip.IsLoopback() && !ip.IsUnspecified()
}

func (b *BlossomSub) StartDirectChannelListener(
	key []byte,
	purpose string,
	server *grpc.Server,
) error {
	bind, err := gostream.Listen(
		b.h,
		protocol.ID(
			"/p2p/direct-channel/"+base58.Encode(key)+purpose,
		),
	)
	if err != nil {
		return errors.Wrap(err, "start direct channel listener")
	}

	return errors.Wrap(server.Serve(bind), "start direct channel listener")
}

type extraCloseConn struct {
	net.Conn
	extraClose func()
}

func (c *extraCloseConn) Close() error {
	err := c.Conn.Close()
	c.extraClose()
	return err
}

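// GetDirectChannel resolves the peer via the DHT and dials a mutually
// authenticated gRPC channel to it, trying each advertised address in turn
// and returning the first connection that succeeds.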
func (b *BlossomSub) GetDirectChannel(
	ctx context.Context,
	peerID []byte,
	purpose string,
) (
	cc *grpc.ClientConn, err error,
) {
	pi, err := b.dht.FindPeer(ctx, peer.ID(peerID))
	if err != nil {
		return nil, errors.Wrap(err, "get direct channel")
	}

	creds, err := NewPeerAuthenticator(
		b.logger,
		&b.p2pConfig,
		nil,
		nil,
		nil,
		nil,
		[][]byte{peerID},
		map[string]channel.AllowedPeerPolicyType{
			"quilibrium.node.proxy.pb.PubSubProxy": channel.OnlySelfPeer,
		},
		map[string]channel.AllowedPeerPolicyType{},
	).CreateClientTLSCredentials(peerID)
	if err != nil {
		return nil, errors.Wrap(err, "get direct channel")
	}

	var lastError error
	for _, addr := range pi.Addrs {
		var mga net.Addr
		b.logger.Debug(
			"attempting to get direct channel with peer",
			zap.String("peer", peer.ID(peerID).String()),
			zap.String("addr", addr.String()),
		)
		mga, lastError = mn.ToNetAddr(addr)
		if lastError != nil {
			b.logger.Debug(
				"skipping address",
				zap.String("addr", addr.String()),
				zap.Error(lastError),
			)
			continue
		}

		var cc *grpc.ClientConn
		cc, lastError = grpc.NewClient(
			mga.String(),
			grpc.WithTransportCredentials(creds),
		)
		if lastError != nil {
			b.logger.Debug(
				"could not connect",
				zap.String("addr", addr.String()),
				zap.Error(lastError),
			)
			continue
		}

		return cc, nil
	}

	return nil, errors.Wrap(lastError, "get direct channel")
}

func (b *BlossomSub) GetPublicKey() []byte {
	pub, _ := b.signKey.GetPublic().Raw()
	return pub
}

func (b *BlossomSub) SignMessage(msg []byte) ([]byte, error) {
	sig, err := b.signKey.Sign(msg)
	return sig, errors.Wrap(err, "sign message")
}

func toBlossomSubParams(
	p2pConfig *config.P2PConfig,
) blossomsub.BlossomSubParams {
	return blossomsub.BlossomSubParams{
		D:                         p2pConfig.D,
		Dlo:                       p2pConfig.DLo,
		Dhi:                       p2pConfig.DHi,
		Dscore:                    p2pConfig.DScore,
		Dout:                      p2pConfig.DOut,
		HistoryLength:             p2pConfig.HistoryLength,
		HistoryGossip:             p2pConfig.HistoryGossip,
		Dlazy:                     p2pConfig.DLazy,
		GossipFactor:              p2pConfig.GossipFactor,
		GossipRetransmission:      p2pConfig.GossipRetransmission,
		HeartbeatInitialDelay:     p2pConfig.HeartbeatInitialDelay,
		HeartbeatInterval:         p2pConfig.HeartbeatInterval,
		FanoutTTL:                 p2pConfig.FanoutTTL,
		PrunePeers:                p2pConfig.PrunePeers,
		PruneBackoff:              p2pConfig.PruneBackoff,
		UnsubscribeBackoff:        p2pConfig.UnsubscribeBackoff,
		Connectors:                p2pConfig.Connectors,
		MaxPendingConnections:     p2pConfig.MaxPendingConnections,
		ConnectionTimeout:         p2pConfig.ConnectionTimeout,
		DirectConnectTicks:        p2pConfig.DirectConnectTicks,
		DirectConnectInitialDelay: p2pConfig.DirectConnectInitialDelay,
		OpportunisticGraftTicks:   p2pConfig.OpportunisticGraftTicks,
		OpportunisticGraftPeers:   p2pConfig.OpportunisticGraftPeers,
		GraftFloodThreshold:       p2pConfig.GraftFloodThreshold,
		MaxIHaveLength:            p2pConfig.MaxIHaveLength,
		MaxIHaveMessages:          p2pConfig.MaxIHaveMessages,
		MaxIDontWantMessages:      p2pConfig.MaxIDontWantMessages,
		IWantFollowupTime:         p2pConfig.IWantFollowupTime,
		IDontWantMessageThreshold: p2pConfig.IDontWantMessageThreshold,
		IDontWantMessageTTL:       p2pConfig.IDontWantMessageTTL,
		SlowHeartbeatWarning:      0.1,
	}
}

func getNetworkNamespace(network uint8) string {
	var networkName string
	switch network {
	case 0:
		networkName = "mainnet"
	case 1:
		networkName = "testnet-primary"
	default:
		networkName = fmt.Sprintf("network-%d", network)
	}

	return ANNOUNCE_PREFIX + networkName
}

// Close implements p2p.PubSub.
func (b *BlossomSub) Close() error {
	// Cancel the context to signal all subscription goroutines to exit.
	if b.cancel != nil {
		b.cancel()
	}

	// Cancel all subscriptions to unblock any pending Next() calls.
	b.subscriptionMutex.Lock()
	for _, sub := range b.subscriptions {
		sub.Cancel()
	}
	b.subscriptions = nil
	b.subscriptionMutex.Unlock()

	return nil
}

// SetShutdownContext implements p2p.PubSub. When the provided context is
// cancelled, the internal BlossomSub context will also be cancelled, allowing
// subscription loops to exit gracefully.
func (b *BlossomSub) SetShutdownContext(ctx context.Context) {
	go func() {
		select {
		case <-ctx.Done():
			b.logger.Debug("shutdown context cancelled, closing pubsub")
			b.Close()
		case <-b.ctx.Done():
			// Already closed.
		}
	}()
}

// MultiaddrToIPNet converts a multiaddr containing /ip4 or /ip6
// into a *net.IPNet with a host mask (/32 or /128).
func MultiaddrToIPNet(m ma.Multiaddr) (*net.IPNet, error) {
	var (
		ip     net.IP
		ipBits int
	)

	// Walk components and grab the first IP we see.
	ma.ForEach(m, func(c ma.Component, err error) bool {
		if err != nil {
			return false
		}
		switch c.Protocol().Code {
		case ma.P_IP4:
			if ip == nil {
				ip = net.IP(c.RawValue()).To4()
				ipBits = 32
			}
			return false

		case ma.P_IP6:
			if ip == nil {
				ip = net.IP(c.RawValue()).To16()
				ipBits = 128
			}
			return false
		}
		return true
	})

	if ip == nil {
		return nil, fmt.Errorf("multiaddr has no ip4/ip6 component: %s", m)
	}

	mask := net.CIDRMask(ipBits, ipBits)

	return &net.IPNet{
		IP:   ip.Mask(mask),
		Mask: mask,
	}, nil
}

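// getBitmaskSets expands a bitmask into its constituent single-bit slices,
// each additionally emitted with one, two, and three zero bytes prepended;
// an all-zero bitmask is returned as-is. For example, a one-byte mask 0x03
// yields 0x02, 0x01, and their 0x00-prefixed variants:
//
//	getBitmaskSets([]byte{0x03})
//	// → [02] [00 02] [00 00 02] [00 00 00 02]
//	//   [01] [00 01] [00 00 01] [00 00 00 01]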
func getBitmaskSets(bitmask []byte) [][]byte {
	sliced := [][]byte{}
	if bytes.Equal(bitmask, make([]byte, len(bitmask))) {
		sliced = append(sliced, bitmask)
	} else {
		for i, b := range bitmask {
			if b == 0 {
				continue
			}

			// Fast path: exactly one bit set in this byte.
			if b&(b-1) == 0 {
				slice := make([]byte, len(bitmask))
				slice[i] = b
				sliced = append(sliced, slice)
				sliced = append(sliced, slices.Concat([]byte{0}, slice))
				sliced = append(sliced, slices.Concat([]byte{0, 0}, slice))
				sliced = append(sliced, slices.Concat([]byte{0, 0, 0}, slice))
				continue
			}

			for j := 7; j >= 0; j-- {
				if (b>>j)&1 == 1 {
					slice := make([]byte, len(bitmask))
					slice[i] = 1 << j
					sliced = append(sliced, slice)
					sliced = append(sliced, slices.Concat([]byte{0}, slice))
					sliced = append(sliced, slices.Concat([]byte{0, 0}, slice))
					sliced = append(sliced, slices.Concat([]byte{0, 0, 0}, slice))
				}
			}
		}
	}

	return sliced
}