get develop caught up (#322)

* Update qcommander.sh bootstrap (#304)

* v2.0.1 (#308)

* roll up v2.0.1-b2 to develop

* b2-fixed

* adjust return data of fast sync so it doesn't return the earliest frame

* -b3

* fix: announce peer based on leading frame, not initial frame; fix: looping bug

* fix: last batch fails due to underflow; qol: make logging chattier

* -b4

* resolve frame cache issue

* fix: mint loop + re-migrate

* fix: register execution panic

* fix: mint loop, other side

* fix: handle unexpected return of nil status

* final -b4

* handle subtle change to migration

* qol: add heuristic to handle corruption scenario

* bump genesis

* qol: use separate channel for worker

* final parameterization, parallelize streams

* deprecate signers 10, 11, 14, 17

* adjust signatory check size to match rotated out signers

* V2.0.2.3 (#321)

* roll up v2.0.1-b2 to develop

* b2-fixed

* adjust return data of fast sync so it doesn't return the earliest frame

* -b3

* fix: announce peer based on leading frame, not initial frame; fix: looping bug

* fix: last batch fails due to underflow; qol: make logging chattier

* -b4

* resolve frame cache issue

* fix: mint loop + re-migrate

* fix: register execution panic

* fix: mint loop, other side

* fix: handle unexpected return of nil status

* final -b4

* handle subtle change to migration

* qol: add heuristic to handle corruption scenario

* bump genesis

* qol: use separate channel for worker

* final parameterization, parallelize streams

* Add direct peers to blossomsub (#309)

Co-authored-by: Tyler Sturos <tyler.john@qcommander.sh>

* chore(docker): add ca-certificates to fix x509 error. (#307)

* Update qcommander.sh bootstrap (#304)

* chore(docker): add ca-certificates to fix x509 error.

---------

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>

* deprecate signers 10, 11, 14, 17

* adjust signatory check size to match rotated out signers

* qol: sync by rebroadcast

* upgrade version

* more small adjustments

* wait a little longer

* fix: don't use iterator for frame directly until iterator is fixed

* change iterator, genesis for testnet

* adjust to previous sync handling

* adjust: don't grab the very latest while it's already being broadcasted

* ok, ready for testnet

* handle rebroadcast quirks

* more adjustments from testing

* faster

* temporarily bulk process on frame candidates

* resolve separate frames

* don't loop

* make worker reset resume to check where it should continue

* move window

* reduce signature count now that supermajority signed last

* resolve bottlenecks

* remove GOMAXPROCS limit for now

* revisions for v2.0.2.1

* bump version

* bulk import

* reintroduce sync

* small adjustments to make life better

* check bitmask for peers and keep alive

* adjust reconnect

* ensure peer doesn't fall off address list

* adjust blossomsub to background discovery

* bump version

* remove dev check

* remove debug log line

* further adjustments

* a little more logic around connection management

* v2.0.2.3

* Fix peer discovery (#319)

* Fix peer discovery

* Make peer discovery connections parallel

* Monitor peers via pings (#317)

* Support QUILIBRIUM_SIGNATURE_CHECK in client (#314)

* Ensure direct peers are not pruned by resource limits (#315)

* Support pprof profiling via HTTP (#313)

* Fix CPU profiling

* Add pprof server support

* Additional peering connection improvements (#320)

* Lookup peers if not enough external peers are available

* Make bootstrap peer discovery sensitive to a lack of bootstrappers

---------

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>
Co-authored-by: Tyler Sturos <tyler.john@qcommander.sh>
Co-authored-by: linquanisaac <33619994+linquanisaac@users.noreply.github.com>
Co-authored-by: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>

---------

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>
Co-authored-by: Tyler Sturos <tyler.john@qcommander.sh>
Co-authored-by: linquanisaac <33619994+linquanisaac@users.noreply.github.com>
Co-authored-by: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>
Cassandra Heart 2024-10-31 16:46:58 -05:00 committed by GitHub
parent 448ee56b46
commit 3dd9a0c5f3
GPG Key ID: B5690EEEBB952194
13 changed files with 596 additions and 204 deletions

View File

@ -6,13 +6,19 @@ import (
"encoding/binary"
"fmt"
"os"
gotime "time"
"strings"
"time"
"github.com/iden3/go-iden3-crypto/poseidon"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token"
"google.golang.org/grpc"
"source.quilibrium.com/quilibrium/monorepo/node/config"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
"source.quilibrium.com/quilibrium/monorepo/node/store"
)
@ -27,12 +33,6 @@ var allCmd = &cobra.Command{
os.Exit(1)
}
conn, err := GetGRPCClient()
if err != nil {
panic(err)
}
defer conn.Close()
if !LightNode {
fmt.Println(
"mint all cannot be run unless node is not running. ensure your node " +
@ -41,8 +41,6 @@ var allCmd = &cobra.Command{
os.Exit(1)
}
client := protobufs.NewNodeServiceClient(conn)
db := store.NewPebbleDB(NodeConfig.DB)
logger, _ := zap.NewProduction()
dataProofStore := store.NewPebbleDataProofStore(db, logger)
@ -57,43 +55,100 @@ var allCmd = &cobra.Command{
panic(err)
}
pubSub := p2p.NewBlossomSub(NodeConfig.P2P, logger)
logger.Info("connecting to network")
time.Sleep(5 * time.Second)
increment, _, _, err := dataProofStore.GetLatestDataTimeProof(
[]byte(peerId),
)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
logger.Info("could not find pre-2.0 proofs")
return
}
addr, err := poseidon.HashBytes([]byte(peerId))
panic(err)
}
addrBI, err := poseidon.HashBytes([]byte(peerId))
if err != nil {
panic(err)
}
addr := addrBI.FillBytes(make([]byte, 32))
genesis := config.GetGenesis()
bpub, err := crypto.UnmarshalEd448PublicKey(genesis.Beacon)
if err != nil {
panic(err)
}
resp, err := client.GetPreCoinProofsByAccount(
context.Background(),
&protobufs.GetPreCoinProofsByAccountRequest{
Address: addr.FillBytes(make([]byte, 32)),
},
)
bpeerId, err := peer.IDFromPublicKey(bpub)
if err != nil {
panic(err)
panic(errors.Wrap(err, "error getting peer id"))
}
resume := make([]byte, 32)
for _, pr := range resp.Proofs {
if pr.IndexProof != nil {
resume, err = token.GetAddressOfPreCoinProof(pr)
cc, err := pubSub.GetDirectChannel([]byte(bpeerId), "worker")
if err != nil {
panic(err)
logger.Info(
"could not establish direct channel, waiting...",
zap.Error(err),
)
time.Sleep(10 * time.Second)
}
increment = pr.Difficulty - 1
for {
if cc == nil {
cc, err = pubSub.GetDirectChannel([]byte(bpeerId), "worker")
if err != nil {
logger.Info(
"could not establish direct channel, waiting...",
zap.Error(err),
)
cc = nil
time.Sleep(10 * time.Second)
continue
}
}
if increment == 0 && !bytes.Equal(resume, make([]byte, 32)) {
fmt.Println("already completed pre-midnight mint")
return
client := protobufs.NewDataServiceClient(cc)
if bytes.Equal(resume, make([]byte, 32)) {
status, err := client.GetPreMidnightMintStatus(
context.Background(),
&protobufs.PreMidnightMintStatusRequest{
Owner: addr,
},
grpc.MaxCallSendMsgSize(1*1024*1024),
grpc.MaxCallRecvMsgSize(1*1024*1024),
)
if err != nil || status == nil {
logger.Error(
"got error response, waiting...",
zap.Error(err),
)
time.Sleep(10 * time.Second)
cc.Close()
cc = nil
err = pubSub.Reconnect([]byte(peerId))
if err != nil {
logger.Error(
"got error response, waiting...",
zap.Error(err),
)
time.Sleep(10 * time.Second)
}
continue
}
resume = status.Address
if status.Increment != 0 {
increment = status.Increment - 1
} else if !bytes.Equal(status.Address, make([]byte, 32)) {
increment = 0
}
}
proofs := [][]byte{
@ -102,6 +157,7 @@ var allCmd = &cobra.Command{
}
batchCount := 0
// the cast is important, it underflows without:
for i := int(increment); i >= 0; i-- {
_, parallelism, input, output, err := dataProofStore.GetDataTimeProof(
[]byte(peerId),
@ -118,27 +174,33 @@ var allCmd = &cobra.Command{
proofs = append(proofs, p)
} else {
fmt.Println("could not find data time proof for peer and increment, stopping at increment", i)
panic(err)
logger.Error(
"could not find data time proof for peer and increment, stopping worker",
zap.String("peer_id", peerId.String()),
zap.Int("increment", i),
)
cc.Close()
cc = nil
return
}
batchCount++
if batchCount == 200 || i == 0 {
fmt.Println("publishing proof batch, increment", i)
logger.Info("publishing proof batch", zap.Int("increment", i))
payload := []byte("mint")
for _, i := range proofs {
payload = append(payload, i...)
}
sig, err := privKey.Sign(payload)
sig, err := pubSub.SignMessage(payload)
if err != nil {
cc.Close()
panic(err)
}
_, err = client.SendMessage(
resp, err := client.HandlePreMidnightMint(
context.Background(),
&protobufs.TokenRequest{
Request: &protobufs.TokenRequest_Mint{
Mint: &protobufs.MintCoinRequest{
&protobufs.MintCoinRequest{
Proofs: proofs,
Signature: &protobufs.Ed448Signature{
PublicKey: &protobufs.Ed448PublicKey{
@ -147,38 +209,43 @@ var allCmd = &cobra.Command{
Signature: sig,
},
},
},
},
grpc.MaxCallSendMsgSize(1*1024*1024),
grpc.MaxCallRecvMsgSize(1*1024*1024),
)
if err != nil {
panic(err)
if strings.Contains(
err.Error(),
application.ErrInvalidStateTransition.Error(),
) && i == 0 {
resume = make([]byte, 32)
logger.Info("pre-midnight proofs submitted, returning")
cc.Close()
cc = nil
return
}
waitForConf:
for {
gotime.Sleep(20 * gotime.Second)
resp, err := client.GetPreCoinProofsByAccount(
context.Background(),
&protobufs.GetPreCoinProofsByAccountRequest{
Address: addr.FillBytes(make([]byte, 32)),
},
logger.Error(
"got error response, waiting...",
zap.Error(err),
)
resume = make([]byte, 32)
cc.Close()
cc = nil
time.Sleep(10 * time.Second)
err = pubSub.Reconnect([]byte(peerId))
if err != nil {
for _, pr := range resp.Proofs {
if pr.IndexProof != nil {
newResume, err := token.GetAddressOfPreCoinProof(pr)
if err != nil {
panic(err)
}
if bytes.Equal(newResume, resume) {
fmt.Println("waiting for confirmation...")
continue waitForConf
}
}
}
logger.Error(
"got error response, waiting...",
zap.Error(err),
)
time.Sleep(10 * time.Second)
}
break
}
resume = resp.Address
batchCount = 0
proofs = [][]byte{
[]byte("pre-dusk"),
@ -186,8 +253,15 @@ var allCmd = &cobra.Command{
}
if i == 0 {
fmt.Println("all proofs submitted, returning")
logger.Info("pre-midnight proofs submitted, returning")
cc.Close()
cc = nil
return
} else {
increment = uint32(i) - 1
}
break
}
}
}
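
The loop comment above ("the cast is important, it underflows without") is the heart of the "last batch fails due to underflow" fix: increment is a uint32, and a countdown on an unsigned counter can never observe a negative value, so decrementing past zero wraps around and the loop never exits. A minimal standalone sketch of the difference, not part of the change itself:

package main

import "fmt"

func main() {
	var increment uint32 = 2

	// Unsafe: with an unsigned loop variable, i >= 0 is always true and
	// 0 - 1 wraps to 4294967295, so the countdown never terminates.
	// for i := increment; i >= 0; i-- { ... }

	// Safe: cast to a signed int first, as the command does, so the loop
	// can fall through after handling increment 0.
	for i := int(increment); i >= 0; i-- {
		fmt.Println("processing increment", i)
	}
}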

View File

@ -22,6 +22,16 @@ var balanceCmd = &cobra.Command{
client := protobufs.NewNodeServiceClient(conn)
peerId := GetPeerIDFromConfig(NodeConfig)
privKey, err := GetPrivKeyFromConfig(NodeConfig)
if err != nil {
panic(err)
}
pub, err := privKey.GetPublic().Raw()
if err != nil {
panic(err)
}
addr, err := poseidon.HashBytes([]byte(peerId))
if err != nil {
panic(err)
@ -38,16 +48,42 @@ var balanceCmd = &cobra.Command{
panic(err)
}
if info.OwnedTokens == nil {
panic("invalid response from RPC")
}
tokens := new(big.Int).SetBytes(info.OwnedTokens)
conversionFactor, _ := new(big.Int).SetString("1DCD65000", 16)
r := new(big.Rat).SetFrac(tokens, conversionFactor)
altAddr, err := poseidon.HashBytes([]byte(pub))
if err != nil {
panic(err)
}
altAddrBytes := altAddr.FillBytes(make([]byte, 32))
info, err = client.GetTokenInfo(
context.Background(),
&protobufs.GetTokenInfoRequest{
Address: altAddrBytes,
},
)
if err != nil {
panic(err)
}
if info.OwnedTokens == nil {
panic("invalid response from RPC")
}
tokens = new(big.Int).SetBytes(info.OwnedTokens)
r2 := new(big.Rat).SetFrac(tokens, conversionFactor)
fmt.Println("Total balance:", r.FloatString(12), fmt.Sprintf(
"QUIL (Account 0x%x)",
addrBytes,
))
if r2.Cmp(big.NewRat(0, 1)) != 0 {
fmt.Println("Total balance:", r2.FloatString(12), fmt.Sprintf(
"QUIL (Account 0x%x)",
altAddrBytes,
))
}
},
}
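
The balance command (and the coins command below) converts raw units to QUIL by dividing by the hex constant 1DCD65000, which is 8,000,000,000 in decimal, i.e. 8e9 base units per QUIL. A small standalone sketch of the conversion; the raw balance below is a made-up example:

package main

import (
	"fmt"
	"math/big"
)

func main() {
	// 0x1DCD65000 == 8,000,000,000 base units per QUIL.
	conversionFactor, _ := new(big.Int).SetString("1DCD65000", 16)
	fmt.Println(conversionFactor) // 8000000000

	raw := big.NewInt(4_000_000_000) // hypothetical raw OwnedTokens value
	r := new(big.Rat).SetFrac(raw, conversionFactor)
	fmt.Println(r.FloatString(12), "QUIL") // 0.500000000000 QUIL
}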

View File

@ -22,6 +22,16 @@ var coinsCmd = &cobra.Command{
client := protobufs.NewNodeServiceClient(conn)
peerId := GetPeerIDFromConfig(NodeConfig)
privKey, err := GetPrivKeyFromConfig(NodeConfig)
if err != nil {
panic(err)
}
pub, err := privKey.GetPublic().Raw()
if err != nil {
panic(err)
}
addr, err := poseidon.HashBytes([]byte(peerId))
if err != nil {
panic(err)
@ -42,6 +52,26 @@ var coinsCmd = &cobra.Command{
panic("invalid response from RPC")
}
altAddr, err := poseidon.HashBytes([]byte(pub))
if err != nil {
panic(err)
}
altAddrBytes := altAddr.FillBytes(make([]byte, 32))
resp2, err := client.GetTokensByAccount(
context.Background(),
&protobufs.GetTokensByAccountRequest{
Address: altAddrBytes,
},
)
if err != nil {
panic(err)
}
if len(resp.Coins) != len(resp.FrameNumbers) {
panic("invalid response from RPC")
}
for i, coin := range resp.Coins {
amount := new(big.Int).SetBytes(coin.Amount)
conversionFactor, _ := new(big.Int).SetString("1DCD65000", 16)
@ -51,6 +81,15 @@ var coinsCmd = &cobra.Command{
fmt.Sprintf("QUIL (Coin 0x%x)", resp.Addresses[i]),
)
}
for i, coin := range resp2.Coins {
amount := new(big.Int).SetBytes(coin.Amount)
conversionFactor, _ := new(big.Int).SetString("1DCD65000", 16)
r := new(big.Rat).SetFrac(amount, conversionFactor)
fmt.Println(
r.FloatString(12),
fmt.Sprintf("QUIL (Coin 0x%x)", resp.Addresses[i]),
)
}
},
}

View File

@ -6,6 +6,7 @@ import (
"encoding/hex"
"fmt"
"os"
"strconv"
"strings"
"github.com/cloudflare/circl/sign/ed448"
@ -150,6 +151,20 @@ func GetGRPCClient() (*grpc.ClientConn, error) {
)
}
func signatureCheckDefault() bool {
envVarValue, envVarExists := os.LookupEnv("QUILIBRIUM_SIGNATURE_CHECK")
if envVarExists {
def, err := strconv.ParseBool(envVarValue)
if err == nil {
return def
} else {
fmt.Println("Invalid environment variable QUILIBRIUM_SIGNATURE_CHECK, must be 'true' or 'false'. Got: " + envVarValue)
}
}
return true
}
func init() {
rootCmd.PersistentFlags().StringVar(
&configDirectory,
@ -166,7 +181,7 @@ func init() {
rootCmd.PersistentFlags().BoolVar(
&signatureCheck,
"signature-check",
true,
"bypass signature check (not recommended for binaries)",
signatureCheckDefault(),
"bypass signature check (not recommended for binaries) (default true or value of QUILIBRIUM_SIGNATURE_CHECK env var)",
)
}
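
strconv.ParseBool is more permissive than the error message implies: it accepts 1, t, T, TRUE, true, True, 0, f, F, FALSE, false and False, and any other value leaves the flag at its built-in default of true. A small standalone illustration of how QUILIBRIUM_SIGNATURE_CHECK values would be interpreted:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	for _, v := range []string{"true", "0", "F", "yes"} {
		// "yes" fails to parse, so signatureCheckDefault would fall back
		// to true and print the warning about an invalid value.
		b, err := strconv.ParseBool(v)
		fmt.Printf("QUILIBRIUM_SIGNATURE_CHECK=%q -> %v (err: %v)\n", v, b, err)
	}
}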

View File

@ -19,7 +19,7 @@ import (
// can be contacted on. The "seen" events expire by default after 40 minutes
// (OwnObservedAddressTTL * ActivationThreshold). They are cleaned up during
// the GC rounds set by GCInterval.
var ActivationThresh = 4
var ActivationThresh = 1
// observedAddrManagerWorkerChannelSize defines how many addresses can be enqueued
// for adding to an ObservedAddrManager.

View File

@ -36,7 +36,7 @@ func FormatVersion(version []byte) string {
}
func GetPatchNumber() byte {
return 0x01
return 0x03
}
func GetRCNumber() byte {

View File

@ -349,7 +349,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
panic(err)
}
if frame.FrameNumber >= nextFrame.FrameNumber ||
if frame.FrameNumber-100 >= nextFrame.FrameNumber ||
nextFrame.FrameNumber == 0 {
time.Sleep(60 * time.Second)
continue

View File

@ -231,15 +231,6 @@ func (e *DataClockConsensusEngine) applySnapshot(
key = binary.BigEndian.AppendUint64(key, 0)
key = append(key, e.filter...)
iter, err := temporaryStore.NewIter(
nil,
nil,
)
for iter.First(); iter.Valid(); iter.Next() {
fmt.Printf("%x\n", iter.Key())
}
_, _, err = temporaryClockStore.GetDataClockFrame(
e.filter,
max.FrameNumber+1,

View File

@ -141,6 +141,14 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() {
time.Sleep(10 * time.Second)
cc.Close()
cc = nil
err = e.pubSub.Reconnect([]byte(peerId))
if err != nil {
e.logger.Error(
"got error response, waiting...",
zap.Error(err),
)
time.Sleep(10 * time.Second)
}
continue
}
@ -211,13 +219,11 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() {
Signature: sig,
},
},
grpc.MaxCallSendMsgSize(1*1024*1024),
grpc.MaxCallRecvMsgSize(1*1024*1024),
)
if err != nil {
e.logger.Error(
"got error response, waiting...",
zap.Error(err),
)
if strings.Contains(
err.Error(),
application.ErrInvalidStateTransition.Error(),
@ -229,10 +235,23 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() {
return
}
e.logger.Error(
"got error response, waiting...",
zap.Error(err),
)
resume = make([]byte, 32)
cc.Close()
cc = nil
time.Sleep(10 * time.Second)
err = e.pubSub.Reconnect([]byte(peerId))
if err != nil {
e.logger.Error(
"got error response, waiting...",
zap.Error(err),
)
time.Sleep(10 * time.Second)
}
break
}

View File

@ -72,6 +72,7 @@ func (p pubsub) GetPublicKey() []byte { return p.pubkey }
func (pubsub) GetPeerScore(peerId []byte) int64 { return 0 }
func (pubsub) SetPeerScore(peerId []byte, score int64) {}
func (pubsub) AddPeerScore(peerId []byte, scoreDelta int64) {}
func (pubsub) Reconnect(peerId []byte) error { return nil }
type outputs struct {
difficulty uint32

View File

@ -11,6 +11,8 @@ import (
"io/fs"
"log"
"math/big"
"net/http"
npprof "net/http/pprof"
"os"
"os/exec"
"os/signal"
@ -77,6 +79,11 @@ var (
"",
"write memory profile after 20m to this file",
)
pprofServer = flag.String(
"pprof-server",
"",
"enable pprof server on specified address (e.g. localhost:6060)",
)
nodeInfo = flag.Bool(
"node-info",
false,
@ -225,10 +232,23 @@ func main() {
if err != nil {
log.Fatal(err)
}
defer f.Close()
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
if *pprofServer != "" && *core == 0 {
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/debug/pprof/", npprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", npprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", npprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", npprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", npprof.Trace)
log.Fatal(http.ListenAndServe(*pprofServer, mux))
}()
}
if *balance {
config, err := config.LoadConfig(*configDirectory, "", false)
if err != nil {
@ -429,7 +449,7 @@ func main() {
return
}
runtime.GOMAXPROCS(1)
// runtime.GOMAXPROCS(1)
if nodeConfig.ListenGRPCMultiaddr != "" {
srv, err := rpc.NewRPCServer(

View File

@ -31,6 +31,7 @@ import (
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/mr-tron/base58"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
@ -92,6 +93,80 @@ func getPeerID(p2pConfig *config.P2PConfig) peer.ID {
return id
}
func NewBlossomSubStreamer(
p2pConfig *config.P2PConfig,
logger *zap.Logger,
) *BlossomSub {
ctx := context.Background()
opts := []libp2pconfig.Option{
libp2p.ListenAddrStrings(p2pConfig.ListenMultiaddr),
}
bootstrappers := []peer.AddrInfo{}
peerinfo, err := peer.AddrInfoFromString("/ip4/185.209.178.191/udp/8336/quic-v1/p2p/QmcKQjpQmLpbDsiif2MuakhHFyxWvqYauPsJDaXnLav7PJ")
if err != nil {
panic(err)
}
bootstrappers = append(bootstrappers, *peerinfo)
var privKey crypto.PrivKey
if p2pConfig.PeerPrivKey != "" {
peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey)
if err != nil {
panic(errors.Wrap(err, "error unmarshaling peerkey"))
}
privKey, err = crypto.UnmarshalEd448PrivateKey(peerPrivKey)
if err != nil {
panic(errors.Wrap(err, "error unmarshaling peerkey"))
}
opts = append(opts, libp2p.Identity(privKey))
}
bs := &BlossomSub{
ctx: ctx,
logger: logger,
bitmaskMap: make(map[string]*blossomsub.Bitmask),
signKey: privKey,
peerScore: make(map[string]int64),
isBootstrapPeer: false,
network: p2pConfig.Network,
}
h, err := libp2p.New(opts...)
if err != nil {
panic(errors.Wrap(err, "error constructing p2p"))
}
logger.Info("established peer id", zap.String("peer_id", h.ID().String()))
kademliaDHT := initDHT(
ctx,
p2pConfig,
logger,
h,
false,
bootstrappers,
)
routingDiscovery := routing.NewRoutingDiscovery(kademliaDHT)
util.Advertise(ctx, routingDiscovery, getNetworkNamespace(p2pConfig.Network))
if err != nil {
panic(err)
}
peerID := h.ID()
bs.peerID = peerID
bs.h = h
bs.signKey = privKey
return bs
}
func NewBlossomSub(
p2pConfig *config.P2PConfig,
logger *zap.Logger,
@ -163,6 +238,23 @@ func NewBlossomSub(
opts = append(opts, libp2p.Identity(privKey))
}
allowedPeers := []peer.AddrInfo{}
allowedPeers = append(allowedPeers, bootstrappers...)
directPeers := []peer.AddrInfo{}
if len(p2pConfig.DirectPeers) > 0 {
logger.Info("found direct peers in config")
for _, peerAddr := range p2pConfig.DirectPeers {
peerinfo, err := peer.AddrInfoFromString(peerAddr)
if err != nil {
panic(err)
}
logger.Info("adding direct peer", zap.String("peer", peerinfo.ID.String()))
directPeers = append(directPeers, *peerinfo)
}
}
allowedPeers = append(allowedPeers, directPeers...)
if p2pConfig.LowWatermarkConnections != 0 &&
p2pConfig.HighWatermarkConnections != 0 {
cm, err := connmgr.NewConnManager(
@ -176,7 +268,7 @@ func NewBlossomSub(
rm, err := resourceManager(
p2pConfig.HighWatermarkConnections,
bootstrappers,
allowedPeers,
)
if err != nil {
panic(err)
@ -223,7 +315,9 @@ func NewBlossomSub(
verifyReachability(p2pConfig)
discoverPeers(p2pConfig, ctx, logger, h, routingDiscovery)
discoverPeers(p2pConfig, ctx, logger, h, routingDiscovery, true)
go monitorPeers(ctx, logger, h)
// TODO: turn into an option flag for console logging, this is too noisy for
// default logging behavior
@ -244,22 +338,9 @@ func NewBlossomSub(
blossomsub.WithStrictSignatureVerification(true),
}
if len(p2pConfig.DirectPeers) > 0 {
logger.Info("Found direct peers in config")
directPeers := []peer.AddrInfo{}
for _, peerAddr := range p2pConfig.DirectPeers {
peerinfo, err := peer.AddrInfoFromString(peerAddr)
if err != nil {
panic(err)
}
logger.Info("Adding direct peer", zap.String("peer", peerinfo.ID.String()))
directPeers = append(directPeers, *peerinfo)
}
if len(directPeers) > 0 {
blossomOpts = append(blossomOpts, blossomsub.WithDirectPeers(directPeers))
}
}
if tracer != nil {
blossomOpts = append(blossomOpts, blossomsub.WithEventTracer(tracer))
@ -303,12 +384,24 @@ func NewBlossomSub(
bs.h = h
bs.signKey = privKey
allowedPeerIDs := make(map[peer.ID]struct{}, len(allowedPeers))
for _, peerInfo := range allowedPeers {
allowedPeerIDs[peerInfo.ID] = struct{}{}
}
go func() {
for {
time.Sleep(30 * time.Second)
for _, b := range bs.bitmaskMap {
if len(b.ListPeers()) < 4 {
discoverPeers(p2pConfig, bs.ctx, logger, bs.h, routingDiscovery)
bitmaskPeers := b.ListPeers()
peerCount := len(bitmaskPeers)
for _, p := range bitmaskPeers {
if _, ok := allowedPeerIDs[p]; ok {
peerCount--
}
}
if peerCount < 4 {
discoverPeers(p2pConfig, bs.ctx, logger, bs.h, routingDiscovery, false)
break
}
}
}
@ -319,7 +412,7 @@ func NewBlossomSub(
// adjusted from Lotus' reference implementation, addressing
// https://github.com/libp2p/go-libp2p/issues/1640
func resourceManager(highWatermark uint, bootstrappers []peer.AddrInfo) (
func resourceManager(highWatermark uint, allowed []peer.AddrInfo) (
network.ResourceManager,
error,
) {
@ -386,18 +479,18 @@ func resourceManager(highWatermark uint, bootstrappers []peer.AddrInfo) (
)
resolver := madns.DefaultResolver
var bootstrapperMaddrs []ma.Multiaddr
for _, pi := range bootstrappers {
var allowedMaddrs []ma.Multiaddr
for _, pi := range allowed {
for _, addr := range pi.Addrs {
resolved, err := resolver.Resolve(context.Background(), addr)
if err != nil {
continue
}
bootstrapperMaddrs = append(bootstrapperMaddrs, resolved...)
allowedMaddrs = append(allowedMaddrs, resolved...)
}
}
opts = append(opts, rcmgr.WithAllowlistedMultiaddrs(bootstrapperMaddrs))
opts = append(opts, rcmgr.WithAllowlistedMultiaddrs(allowedMaddrs))
mgr, err := rcmgr.NewResourceManager(limiter, opts...)
if err != nil {
@ -505,6 +598,79 @@ func (b *BlossomSub) GetRandomPeer(bitmask []byte) ([]byte, error) {
return []byte(peers[sel.Int64()]), nil
}
// monitorPeers periodically looks up the peers connected to the host and pings them
// up to 3 times to ensure they are still reachable. If the peer is not reachable after
// 3 attempts, the connections to the peer are closed.
func monitorPeers(ctx context.Context, logger *zap.Logger, h host.Host) {
const timeout, period, attempts = time.Minute, time.Minute, 3
// Do not allow the pings to dial new connections. Adding new peers is a separate
// process and should not be done during the ping process.
ctx = network.WithNoDial(ctx, "monitor peers")
pingOnce := func(ctx context.Context, logger *zap.Logger, id peer.ID) bool {
pingCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
select {
case <-ctx.Done():
case <-pingCtx.Done():
logger.Debug("ping timeout")
return false
case res := <-ping.Ping(pingCtx, h, id):
if res.Error != nil {
logger.Debug("ping error", zap.Error(res.Error))
return false
}
logger.Debug("ping success", zap.Duration("rtt", res.RTT))
}
return true
}
ping := func(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, id peer.ID) {
defer wg.Done()
var conns []network.Conn
for i := 0; i < attempts; i++ {
// There are no fine grained semantics in libp2p that would allow us to 'ping via
// a specific connection'. We can only ping a peer, which will attempt to open a stream via a connection.
// As such, we save a snapshot of the connections that were potentially in use before
// the ping, and close them if the ping fails. If new connections occur between the snapshot
// and the ping, they will not be closed, and will be pinged in the next iteration.
conns = h.Network().ConnsToPeer(id)
if pingOnce(ctx, logger, id) {
return
}
}
for _, conn := range conns {
_ = conn.Close()
}
}
for {
select {
case <-ctx.Done():
return
case <-time.After(period):
// This is once again a snapshot of the peers at the time of the ping. If new peers
// are added between the snapshot and the ping, they will be pinged in the next iteration.
peers := h.Network().Peers()
connected := make([]peer.ID, 0, len(peers))
for _, p := range peers {
// The connection status may change both before and after the check. Still, it is better
// to focus on pinging only connections which are potentially connected at the moment of the check.
switch h.Network().Connectedness(p) {
case network.Connected, network.Limited:
connected = append(connected, p)
}
}
logger.Debug("pinging connected peers", zap.Int("peer_count", len(connected)))
wg := &sync.WaitGroup{}
for _, id := range connected {
logger := logger.With(zap.String("peer_id", id.String()))
wg.Add(1)
go ping(ctx, logger, wg, id)
}
wg.Wait()
logger.Debug("pinged connected peers")
}
}
}
func initDHT(
ctx context.Context,
p2pConfig *config.P2PConfig,
@ -517,19 +683,12 @@ func initDHT(
var kademliaDHT *dht.IpfsDHT
var err error
if isBootstrapPeer {
if p2pConfig.Network == 0 {
panic(
"this release is for normal peers only, if you are running a " +
"bootstrap node, please use v2.0-bootstrap",
)
} else {
kademliaDHT, err = dht.New(
ctx,
h,
dht.Mode(dht.ModeServer),
dht.BootstrapPeers(bootstrappers...),
)
}
} else {
kademliaDHT, err = dht.New(
ctx,
@ -546,9 +705,13 @@ func initDHT(
}
reconnect := func() {
wg := &sync.WaitGroup{}
defer wg.Wait()
for _, peerinfo := range bootstrappers {
peerinfo := peerinfo
wg.Add(1)
go func() {
defer wg.Done()
if peerinfo.ID == h.ID() ||
h.Network().Connectedness(peerinfo.ID) == network.Connected ||
h.Network().Connectedness(peerinfo.ID) == network.Limited {
@ -570,10 +733,21 @@ func initDHT(
reconnect()
bootstrapPeerIDs := make(map[peer.ID]struct{}, len(bootstrappers))
for _, peerinfo := range bootstrappers {
bootstrapPeerIDs[peerinfo.ID] = struct{}{}
}
go func() {
for {
time.Sleep(30 * time.Second)
if len(h.Network().Peers()) == 0 {
found := false
for _, p := range h.Network().Peers() {
if _, ok := bootstrapPeerIDs[p]; ok {
found = true
break
}
}
if !found {
reconnect()
}
}
@ -582,6 +756,19 @@ func initDHT(
return kademliaDHT
}
func (b *BlossomSub) Reconnect(peerId []byte) error {
peer := peer.ID(peerId)
info := b.h.Peerstore().PeerInfo(peer)
b.h.ConnManager().Unprotect(info.ID, "bootstrap")
time.Sleep(10 * time.Second)
if err := b.h.Connect(b.ctx, info); err != nil {
return errors.Wrap(err, "reconnect")
}
b.h.ConnManager().Protect(info.ID, "bootstrap")
return nil
}
func (b *BlossomSub) GetPeerScore(peerId []byte) int64 {
b.peerScoreMx.Lock()
score := b.peerScore[string(peerId)]
@ -841,10 +1028,12 @@ func discoverPeers(
logger *zap.Logger,
h host.Host,
routingDiscovery *routing.RoutingDiscovery,
init bool,
) {
logger.Info("initiating peer discovery")
discover := func() {
logger.Info("initiating peer discovery")
defer logger.Info("completed peer discovery")
peerChan, err := routingDiscovery.FindPeers(
ctx,
getNetworkNamespace(p2pConfig.Network),
@ -854,16 +1043,21 @@ func discoverPeers(
return
}
count := 12
wg := &sync.WaitGroup{}
defer wg.Wait()
for peer := range peerChan {
if count == 0 {
if len(h.Network().Peers()) >= 6 {
break
}
peer := peer
wg.Add(1)
go func() {
defer wg.Done()
if peer.ID == h.ID() ||
h.Network().Connectedness(peer.ID) == network.Connected ||
h.Network().Connectedness(peer.ID) == network.Limited {
continue
return
}
logger.Debug("found peer", zap.String("peer_id", peer.ID.String()))
@ -875,18 +1069,20 @@ func discoverPeers(
zap.Error(err),
)
} else {
count--
logger.Debug(
"connected to peer",
zap.String("peer_id", peer.ID.String()),
)
}
}()
}
}
if init {
go discover()
} else {
discover()
logger.Info("completed peer discovery")
}
}
func mergeDefaults(p2pConfig *config.P2PConfig) blossomsub.BlossomSubParams {
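
DirectPeers entries are parsed with peer.AddrInfoFromString, so each configured value must be a complete multiaddr ending in a /p2p/<peer id> component; anything that fails to parse causes NewBlossomSub to panic at startup. A small standalone sketch of that parsing step; the IP address below is a documentation placeholder and the peer id is the bootstrap id already shown above:

package main

import (
	"fmt"
	"log"

	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	// A direct peer entry must carry both a dialable address and the
	// /p2p/<peer id> suffix so the ID can be allowlisted and protected.
	info, err := peer.AddrInfoFromString(
		"/ip4/203.0.113.7/udp/8336/quic-v1/p2p/QmcKQjpQmLpbDsiif2MuakhHFyxWvqYauPsJDaXnLav7PJ",
	)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("direct peer:", info.ID, info.Addrs)
}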

View File

@ -36,4 +36,5 @@ type PubSub interface {
GetPeerScore(peerId []byte) int64
SetPeerScore(peerId []byte, score int64)
AddPeerScore(peerId []byte, scoreDelta int64)
Reconnect(peerId []byte) error
}