From 3dd9a0c5f3293d10ec9ac25de491cc7b661442e3 Mon Sep 17 00:00:00 2001
From: Cassandra Heart <7929478+CassOnMars@users.noreply.github.com>
Date: Thu, 31 Oct 2024 16:46:58 -0500
Subject: [PATCH] get develop caught up (#322)

* Update qcommander.sh bootstrap (#304)
* v2.0.1 (#308)
* roll up v2.0.1-b2 to develop
* b2-fixed
* adjust return data of fast sync so it doesn't return the earliest frame
* -b3
* fix: announce peer based on leading frame, not initial frame; fix: looping bug
* fix: last batch fails due to underflow; qol: make logging chattier
* -b4
* resolve frame cache issue
* fix: mint loop + re-migrate
* fix: register execution panic
* fix: mint loop, other side
* fix: handle unexpected return of nil status
* final -b4
* handle subtle change to migration
* qol: add heuristic to handle corruption scenario
* bump genesis
* qol: use separate channel for worker
* final parameterization, parallelize streams
* deprecate signers 10, 11, 14, 17
* adjust signatory check size to match rotated out signers
* V2.0.2.3 (#321)
* roll up v2.0.1-b2 to develop
* b2-fixed
* adjust return data of fast sync so it doesn't return the earliest frame
* -b3
* fix: announce peer based on leading frame, not initial frame; fix: looping bug
* fix: last batch fails due to underflow; qol: make logging chattier
* -b4
* resolve frame cache issue
* fix: mint loop + re-migrate
* fix: register execution panic
* fix: mint loop, other side
* fix: handle unexpected return of nil status
* final -b4
* handle subtle change to migration
* qol: add heuristic to handle corruption scenario
* bump genesis
* qol: use separate channel for worker
* final parameterization, parallelize streams
* Add direct peers to blossomsub (#309)

Co-authored-by: Tyler Sturos

* chore(docker): add ca-certificates to fix x509 error. (#307)
* Update qcommander.sh bootstrap (#304)
* chore(docker): add ca-certificates to fix x509 error.
---------

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>

* deprecate signers 10, 11, 14, 17
* adjust signatory check size to match rotated out signers
* qol: sync by rebroadcast
* upgrade version
* more small adjustments
* wait a little longer
* fix: don't use iterator for frame directly until iterator is fixed
* change iterator, genesis for testnet
* adjust to previous sync handling
* adjust: don't grab the very latest while it's already being broadcasted
* ok, ready for testnet
* handle rebroadcast quirks
* more adjustments from testing
* faster
* temporarily bulk process on frame candidates
* resolve separate frames
* don't loop
* make worker reset resume to check where it should continue
* move window
* reduce signature count now that supermajority signed last
* resolve bottlenecks
* remove GOMAXPROCS limit for now
* revisions for v2.0.2.1
* bump version
* bulk import
* reintroduce sync
* small adjustments to make life better
* check bitmask for peers and keep alive
* adjust reconnect
* ensure peer doesn't fall off address list
* adjust blossomsub to background discovery
* bump version
* remove dev check
* remove debug log line
* further adjustments
* a little more logic around connection management
* v2.0.2.3
* Fix peer discovery (#319)
* Fix peer discovery
* Make peer discovery connections parallel
* Monitor peers via pings (#317)
* Support QUILIBRIUM_SIGNATURE_CHECK in client (#314)
* Ensure direct peers are not pruned by resource limits (#315)
* Support pprof profiling via HTTP (#313)
* Fix CPU profiling
* Add pprof server support
* Additional peering connection improvements (#320)
* Lookup peers if not enough external peers are available
* Make bootstrap peer discovery sensitive to a lack of bootstrappers

---------

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>
Co-authored-by: Tyler Sturos
Co-authored-by: linquanisaac <33619994+linquanisaac@users.noreply.github.com>
Co-authored-by: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>

---------

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>
Co-authored-by: Tyler Sturos
Co-authored-by: linquanisaac <33619994+linquanisaac@users.noreply.github.com>
Co-authored-by: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>
---
 client/cmd/all.go | 306 +++++++++-------
 client/cmd/balance.go | 42 ++-
 client/cmd/coins.go | 39 +++
 client/cmd/root.go | 19 +-
 go-libp2p/p2p/protocol/identify/obsaddr.go | 2 +-
 node/config/version.go | 2 +-
 .../data/data_clock_consensus_engine.go | 2 +-
 node/consensus/data/frame_importer.go | 9 -
 .../data/pre_midnight_proof_worker.go | 27 +-
 node/consensus/data/token_handle_mint_test.go | 1 +
 node/main.go | 22 +-
 node/p2p/blossomsub.go | 328 ++++++++++++++----
 node/p2p/pubsub.go | 1 +
 13 files changed, 596 insertions(+), 204 deletions(-)

diff --git a/client/cmd/all.go b/client/cmd/all.go
index 2aaf069..1a67b57 100644
--- a/client/cmd/all.go
+++ b/client/cmd/all.go
@@ -6,13 +6,19 @@ import ( "encoding/binary" "fmt" "os" - - gotime "time" + "strings" + "time" "github.com/iden3/go-iden3-crypto/poseidon" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" "github.com/spf13/cobra" "go.uber.org/zap" - "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token" + "google.golang.org/grpc" + "source.quilibrium.com/quilibrium/monorepo/node/config" + "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application" + 
"source.quilibrium.com/quilibrium/monorepo/node/p2p" "source.quilibrium.com/quilibrium/monorepo/node/protobufs" "source.quilibrium.com/quilibrium/monorepo/node/store" ) @@ -27,12 +33,6 @@ var allCmd = &cobra.Command{ os.Exit(1) } - conn, err := GetGRPCClient() - if err != nil { - panic(err) - } - defer conn.Close() - if !LightNode { fmt.Println( "mint all cannot be run unless node is not running. ensure your node " + @@ -41,8 +41,6 @@ var allCmd = &cobra.Command{ os.Exit(1) } - client := protobufs.NewNodeServiceClient(conn) - db := store.NewPebbleDB(NodeConfig.DB) logger, _ := zap.NewProduction() dataProofStore := store.NewPebbleDataProofStore(db, logger) @@ -57,137 +55,213 @@ var allCmd = &cobra.Command{ panic(err) } + pubSub := p2p.NewBlossomSub(NodeConfig.P2P, logger) + logger.Info("connecting to network") + time.Sleep(5 * time.Second) + increment, _, _, err := dataProofStore.GetLatestDataTimeProof( []byte(peerId), ) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + logger.Info("could not find pre-2.0 proofs") + return + } - addr, err := poseidon.HashBytes([]byte(peerId)) + panic(err) + } + + addrBI, err := poseidon.HashBytes([]byte(peerId)) if err != nil { panic(err) } + addr := addrBI.FillBytes(make([]byte, 32)) + + genesis := config.GetGenesis() + bpub, err := crypto.UnmarshalEd448PublicKey(genesis.Beacon) if err != nil { panic(err) } - resp, err := client.GetPreCoinProofsByAccount( - context.Background(), - &protobufs.GetPreCoinProofsByAccountRequest{ - Address: addr.FillBytes(make([]byte, 32)), - }, - ) + bpeerId, err := peer.IDFromPublicKey(bpub) if err != nil { - panic(err) + panic(errors.Wrap(err, "error getting peer id")) } resume := make([]byte, 32) - for _, pr := range resp.Proofs { - if pr.IndexProof != nil { - resume, err = token.GetAddressOfPreCoinProof(pr) - if err != nil { - panic(err) - } - increment = pr.Difficulty - 1 - } - } - - if increment == 0 && !bytes.Equal(resume, make([]byte, 32)) { - fmt.Println("already completed pre-midnight mint") - return - } - - proofs := [][]byte{ - []byte("pre-dusk"), - resume, - } - - batchCount := 0 - for i := int(increment); i >= 0; i-- { - _, parallelism, input, output, err := dataProofStore.GetDataTimeProof( - []byte(peerId), - uint32(i), + cc, err := pubSub.GetDirectChannel([]byte(bpeerId), "worker") + if err != nil { + logger.Info( + "could not establish direct channel, waiting...", + zap.Error(err), ) - if err == nil { - p := []byte{} - p = binary.BigEndian.AppendUint32(p, uint32(i)) - p = binary.BigEndian.AppendUint32(p, parallelism) - p = binary.BigEndian.AppendUint64(p, uint64(len(input))) - p = append(p, input...) - p = binary.BigEndian.AppendUint64(p, uint64(len(output))) - p = append(p, output...) - - proofs = append(proofs, p) - } else { - fmt.Println("could not find data time proof for peer and increment, stopping at increment", i) - panic(err) + time.Sleep(10 * time.Second) + } + for { + if cc == nil { + cc, err = pubSub.GetDirectChannel([]byte(bpeerId), "worker") + if err != nil { + logger.Info( + "could not establish direct channel, waiting...", + zap.Error(err), + ) + cc = nil + time.Sleep(10 * time.Second) + continue + } } - batchCount++ - if batchCount == 200 || i == 0 { - fmt.Println("publishing proof batch, increment", i) - payload := []byte("mint") - for _, i := range proofs { - payload = append(payload, i...) 
- } - sig, err := privKey.Sign(payload) - if err != nil { - panic(err) + client := protobufs.NewDataServiceClient(cc) + + if bytes.Equal(resume, make([]byte, 32)) { + status, err := client.GetPreMidnightMintStatus( + context.Background(), + &protobufs.PreMidnightMintStatusRequest{ + Owner: addr, + }, + grpc.MaxCallSendMsgSize(1*1024*1024), + grpc.MaxCallRecvMsgSize(1*1024*1024), + ) + if err != nil || status == nil { + logger.Error( + "got error response, waiting...", + zap.Error(err), + ) + time.Sleep(10 * time.Second) + cc.Close() + cc = nil + err = pubSub.Reconnect([]byte(peerId)) + if err != nil { + logger.Error( + "got error response, waiting...", + zap.Error(err), + ) + time.Sleep(10 * time.Second) + } + continue } - _, err = client.SendMessage( - context.Background(), - &protobufs.TokenRequest{ - Request: &protobufs.TokenRequest_Mint{ - Mint: &protobufs.MintCoinRequest{ - Proofs: proofs, - Signature: &protobufs.Ed448Signature{ - PublicKey: &protobufs.Ed448PublicKey{ - KeyValue: pub, - }, - Signature: sig, + resume = status.Address + + if status.Increment != 0 { + increment = status.Increment - 1 + } else if !bytes.Equal(status.Address, make([]byte, 32)) { + increment = 0 + } + } + + proofs := [][]byte{ + []byte("pre-dusk"), + resume, + } + + batchCount := 0 + // the cast is important, it underflows without: + for i := int(increment); i >= 0; i-- { + _, parallelism, input, output, err := dataProofStore.GetDataTimeProof( + []byte(peerId), + uint32(i), + ) + if err == nil { + p := []byte{} + p = binary.BigEndian.AppendUint32(p, uint32(i)) + p = binary.BigEndian.AppendUint32(p, parallelism) + p = binary.BigEndian.AppendUint64(p, uint64(len(input))) + p = append(p, input...) + p = binary.BigEndian.AppendUint64(p, uint64(len(output))) + p = append(p, output...) + + proofs = append(proofs, p) + } else { + logger.Error( + "could not find data time proof for peer and increment, stopping worker", + zap.String("peer_id", peerId.String()), + zap.Int("increment", i), + ) + cc.Close() + cc = nil + return + } + + batchCount++ + if batchCount == 200 || i == 0 { + logger.Info("publishing proof batch", zap.Int("increment", i)) + + payload := []byte("mint") + for _, i := range proofs { + payload = append(payload, i...) 
+ } + sig, err := pubSub.SignMessage(payload) + if err != nil { + cc.Close() + panic(err) + } + + resp, err := client.HandlePreMidnightMint( + context.Background(), + &protobufs.MintCoinRequest{ + Proofs: proofs, + Signature: &protobufs.Ed448Signature{ + PublicKey: &protobufs.Ed448PublicKey{ + KeyValue: pub, }, + Signature: sig, }, }, - }, - ) - if err != nil { - panic(err) - } - - waitForConf: - for { - gotime.Sleep(20 * gotime.Second) - resp, err := client.GetPreCoinProofsByAccount( - context.Background(), - &protobufs.GetPreCoinProofsByAccountRequest{ - Address: addr.FillBytes(make([]byte, 32)), - }, + grpc.MaxCallSendMsgSize(1*1024*1024), + grpc.MaxCallRecvMsgSize(1*1024*1024), ) - if err != nil { - for _, pr := range resp.Proofs { - if pr.IndexProof != nil { - newResume, err := token.GetAddressOfPreCoinProof(pr) - if err != nil { - panic(err) - } - if bytes.Equal(newResume, resume) { - fmt.Println("waiting for confirmation...") - continue waitForConf - } - } - } - } - break - } - batchCount = 0 - proofs = [][]byte{ - []byte("pre-dusk"), - resume, - } - if i == 0 { - fmt.Println("all proofs submitted, returning") - return + if err != nil { + if strings.Contains( + err.Error(), + application.ErrInvalidStateTransition.Error(), + ) && i == 0 { + resume = make([]byte, 32) + logger.Info("pre-midnight proofs submitted, returning") + cc.Close() + cc = nil + return + } + + logger.Error( + "got error response, waiting...", + zap.Error(err), + ) + + resume = make([]byte, 32) + cc.Close() + cc = nil + time.Sleep(10 * time.Second) + err = pubSub.Reconnect([]byte(peerId)) + if err != nil { + logger.Error( + "got error response, waiting...", + zap.Error(err), + ) + time.Sleep(10 * time.Second) + } + break + } + + resume = resp.Address + batchCount = 0 + proofs = [][]byte{ + []byte("pre-dusk"), + resume, + } + + if i == 0 { + logger.Info("pre-midnight proofs submitted, returning") + cc.Close() + cc = nil + return + } else { + increment = uint32(i) - 1 + } + + break } } } diff --git a/client/cmd/balance.go b/client/cmd/balance.go index 2eb0975..ab1fc57 100644 --- a/client/cmd/balance.go +++ b/client/cmd/balance.go @@ -22,6 +22,16 @@ var balanceCmd = &cobra.Command{ client := protobufs.NewNodeServiceClient(conn) peerId := GetPeerIDFromConfig(NodeConfig) + privKey, err := GetPrivKeyFromConfig(NodeConfig) + if err != nil { + panic(err) + } + + pub, err := privKey.GetPublic().Raw() + if err != nil { + panic(err) + } + addr, err := poseidon.HashBytes([]byte(peerId)) if err != nil { panic(err) @@ -38,16 +48,42 @@ var balanceCmd = &cobra.Command{ panic(err) } - if info.OwnedTokens == nil { - panic("invalid response from RPC") - } tokens := new(big.Int).SetBytes(info.OwnedTokens) conversionFactor, _ := new(big.Int).SetString("1DCD65000", 16) r := new(big.Rat).SetFrac(tokens, conversionFactor) + + altAddr, err := poseidon.HashBytes([]byte(pub)) + if err != nil { + panic(err) + } + + altAddrBytes := altAddr.FillBytes(make([]byte, 32)) + info, err = client.GetTokenInfo( + context.Background(), + &protobufs.GetTokenInfoRequest{ + Address: altAddrBytes, + }, + ) + if err != nil { + panic(err) + } + + if info.OwnedTokens == nil { + panic("invalid response from RPC") + } + + tokens = new(big.Int).SetBytes(info.OwnedTokens) + r2 := new(big.Rat).SetFrac(tokens, conversionFactor) fmt.Println("Total balance:", r.FloatString(12), fmt.Sprintf( "QUIL (Account 0x%x)", addrBytes, )) + if r2.Cmp(big.NewRat(0, 1)) != 0 { + fmt.Println("Total balance:", r2.FloatString(12), fmt.Sprintf( + "QUIL (Account 0x%x)", + altAddrBytes, + 
)) + } }, } diff --git a/client/cmd/coins.go b/client/cmd/coins.go index 019a6b2..5e85a49 100644 --- a/client/cmd/coins.go +++ b/client/cmd/coins.go @@ -22,6 +22,16 @@ var coinsCmd = &cobra.Command{ client := protobufs.NewNodeServiceClient(conn) peerId := GetPeerIDFromConfig(NodeConfig) + privKey, err := GetPrivKeyFromConfig(NodeConfig) + if err != nil { + panic(err) + } + + pub, err := privKey.GetPublic().Raw() + if err != nil { + panic(err) + } + addr, err := poseidon.HashBytes([]byte(peerId)) if err != nil { panic(err) @@ -42,6 +52,26 @@ var coinsCmd = &cobra.Command{ panic("invalid response from RPC") } + altAddr, err := poseidon.HashBytes([]byte(pub)) + if err != nil { + panic(err) + } + + altAddrBytes := altAddr.FillBytes(make([]byte, 32)) + resp2, err := client.GetTokensByAccount( + context.Background(), + &protobufs.GetTokensByAccountRequest{ + Address: altAddrBytes, + }, + ) + if err != nil { + panic(err) + } + + if len(resp.Coins) != len(resp.FrameNumbers) { + panic("invalid response from RPC") + } + for i, coin := range resp.Coins { amount := new(big.Int).SetBytes(coin.Amount) conversionFactor, _ := new(big.Int).SetString("1DCD65000", 16) @@ -51,6 +81,15 @@ var coinsCmd = &cobra.Command{ fmt.Sprintf("QUIL (Coin 0x%x)", resp.Addresses[i]), ) } + for i, coin := range resp2.Coins { + amount := new(big.Int).SetBytes(coin.Amount) + conversionFactor, _ := new(big.Int).SetString("1DCD65000", 16) + r := new(big.Rat).SetFrac(amount, conversionFactor) + fmt.Println( + r.FloatString(12), + fmt.Sprintf("QUIL (Coin 0x%x)", resp.Addresses[i]), + ) + } }, } diff --git a/client/cmd/root.go b/client/cmd/root.go index 5b8b5e5..b8db6a2 100644 --- a/client/cmd/root.go +++ b/client/cmd/root.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "fmt" "os" + "strconv" "strings" "github.com/cloudflare/circl/sign/ed448" @@ -150,6 +151,20 @@ func GetGRPCClient() (*grpc.ClientConn, error) { ) } +func signatureCheckDefault() bool { + envVarValue, envVarExists := os.LookupEnv("QUILIBRIUM_SIGNATURE_CHECK") + if envVarExists { + def, err := strconv.ParseBool(envVarValue) + if err == nil { + return def + } else { + fmt.Println("Invalid environment variable QUILIBRIUM_SIGNATURE_CHECK, must be 'true' or 'false'. Got: " + envVarValue) + } + } + + return true +} + func init() { rootCmd.PersistentFlags().StringVar( &configDirectory, @@ -166,7 +181,7 @@ func init() { rootCmd.PersistentFlags().BoolVar( &signatureCheck, "signature-check", - true, - "bypass signature check (not recommended for binaries)", + signatureCheckDefault(), + "bypass signature check (not recommended for binaries) (default true or value of QUILIBRIUM_SIGNATURE_CHECK env var)", ) } diff --git a/go-libp2p/p2p/protocol/identify/obsaddr.go b/go-libp2p/p2p/protocol/identify/obsaddr.go index acc8b17..6bfbc15 100644 --- a/go-libp2p/p2p/protocol/identify/obsaddr.go +++ b/go-libp2p/p2p/protocol/identify/obsaddr.go @@ -19,7 +19,7 @@ import ( // can be contacted on. The "seen" events expire by default after 40 minutes // (OwnObservedAddressTTL * ActivationThreshold). The are cleaned up during // the GC rounds set by GCInterval. -var ActivationThresh = 4 +var ActivationThresh = 1 // observedAddrManagerWorkerChannelSize defines how many addresses can be enqueued // for adding to an ObservedAddrManager. 
diff --git a/node/config/version.go b/node/config/version.go index f7b78a0..7b93698 100644 --- a/node/config/version.go +++ b/node/config/version.go @@ -36,7 +36,7 @@ func FormatVersion(version []byte) string { } func GetPatchNumber() byte { - return 0x01 + return 0x03 } func GetRCNumber() byte { diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index 316b39b..4f69024 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -349,7 +349,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error { panic(err) } - if frame.FrameNumber >= nextFrame.FrameNumber || + if frame.FrameNumber-100 >= nextFrame.FrameNumber || nextFrame.FrameNumber == 0 { time.Sleep(60 * time.Second) continue diff --git a/node/consensus/data/frame_importer.go b/node/consensus/data/frame_importer.go index 757312b..7f7b1bb 100644 --- a/node/consensus/data/frame_importer.go +++ b/node/consensus/data/frame_importer.go @@ -231,15 +231,6 @@ func (e *DataClockConsensusEngine) applySnapshot( key = binary.BigEndian.AppendUint64(key, 0) key = append(key, e.filter...) - iter, err := temporaryStore.NewIter( - nil, - nil, - ) - - for iter.First(); iter.Valid(); iter.Next() { - fmt.Printf("%x\n", iter.Key()) - } - _, _, err = temporaryClockStore.GetDataClockFrame( e.filter, max.FrameNumber+1, diff --git a/node/consensus/data/pre_midnight_proof_worker.go b/node/consensus/data/pre_midnight_proof_worker.go index 960707e..026b943 100644 --- a/node/consensus/data/pre_midnight_proof_worker.go +++ b/node/consensus/data/pre_midnight_proof_worker.go @@ -141,6 +141,14 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() { time.Sleep(10 * time.Second) cc.Close() cc = nil + err = e.pubSub.Reconnect([]byte(peerId)) + if err != nil { + e.logger.Error( + "got error response, waiting...", + zap.Error(err), + ) + time.Sleep(10 * time.Second) + } continue } @@ -211,13 +219,11 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() { Signature: sig, }, }, + grpc.MaxCallSendMsgSize(1*1024*1024), + grpc.MaxCallRecvMsgSize(1*1024*1024), ) if err != nil { - e.logger.Error( - "got error response, waiting...", - zap.Error(err), - ) if strings.Contains( err.Error(), application.ErrInvalidStateTransition.Error(), @@ -229,10 +235,23 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() { return } + e.logger.Error( + "got error response, waiting...", + zap.Error(err), + ) + resume = make([]byte, 32) cc.Close() cc = nil time.Sleep(10 * time.Second) + err = e.pubSub.Reconnect([]byte(peerId)) + if err != nil { + e.logger.Error( + "got error response, waiting...", + zap.Error(err), + ) + time.Sleep(10 * time.Second) + } break } diff --git a/node/consensus/data/token_handle_mint_test.go b/node/consensus/data/token_handle_mint_test.go index fb17e8c..3302ad4 100644 --- a/node/consensus/data/token_handle_mint_test.go +++ b/node/consensus/data/token_handle_mint_test.go @@ -72,6 +72,7 @@ func (p pubsub) GetPublicKey() []byte { return p.pubkey } func (pubsub) GetPeerScore(peerId []byte) int64 { return 0 } func (pubsub) SetPeerScore(peerId []byte, score int64) {} func (pubsub) AddPeerScore(peerId []byte, scoreDelta int64) {} +func (pubsub) Reconnect(peerId []byte) error { return nil } type outputs struct { difficulty uint32 diff --git a/node/main.go b/node/main.go index b28f2b2..fa83415 100644 --- a/node/main.go +++ b/node/main.go @@ -11,6 +11,8 @@ import ( "io/fs" "log" "math/big" + "net/http" + npprof 
"net/http/pprof" "os" "os/exec" "os/signal" @@ -77,6 +79,11 @@ var ( "", "write memory profile after 20m to this file", ) + pprofServer = flag.String( + "pprof-server", + "", + "enable pprof server on specified address (e.g. localhost:6060)", + ) nodeInfo = flag.Bool( "node-info", false, @@ -225,10 +232,23 @@ func main() { if err != nil { log.Fatal(err) } + defer f.Close() pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() } + if *pprofServer != "" && *core == 0 { + go func() { + mux := http.NewServeMux() + mux.HandleFunc("/debug/pprof/", npprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", npprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", npprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", npprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", npprof.Trace) + log.Fatal(http.ListenAndServe(*pprofServer, mux)) + }() + } + if *balance { config, err := config.LoadConfig(*configDirectory, "", false) if err != nil { @@ -429,7 +449,7 @@ func main() { return } - runtime.GOMAXPROCS(1) + // runtime.GOMAXPROCS(1) if nodeConfig.ListenGRPCMultiaddr != "" { srv, err := rpc.NewRPCServer( diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index 2bdcb1f..71d520b 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -31,6 +31,7 @@ import ( rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/libp2p/go-libp2p/p2p/net/swarm" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/mr-tron/base58" ma "github.com/multiformats/go-multiaddr" madns "github.com/multiformats/go-multiaddr-dns" @@ -92,6 +93,80 @@ func getPeerID(p2pConfig *config.P2PConfig) peer.ID { return id } +func NewBlossomSubStreamer( + p2pConfig *config.P2PConfig, + logger *zap.Logger, +) *BlossomSub { + ctx := context.Background() + + opts := []libp2pconfig.Option{ + libp2p.ListenAddrStrings(p2pConfig.ListenMultiaddr), + } + + bootstrappers := []peer.AddrInfo{} + + peerinfo, err := peer.AddrInfoFromString("/ip4/185.209.178.191/udp/8336/quic-v1/p2p/QmcKQjpQmLpbDsiif2MuakhHFyxWvqYauPsJDaXnLav7PJ") + if err != nil { + panic(err) + } + + bootstrappers = append(bootstrappers, *peerinfo) + + var privKey crypto.PrivKey + if p2pConfig.PeerPrivKey != "" { + peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey) + if err != nil { + panic(errors.Wrap(err, "error unmarshaling peerkey")) + } + + privKey, err = crypto.UnmarshalEd448PrivateKey(peerPrivKey) + if err != nil { + panic(errors.Wrap(err, "error unmarshaling peerkey")) + } + + opts = append(opts, libp2p.Identity(privKey)) + } + + bs := &BlossomSub{ + ctx: ctx, + logger: logger, + bitmaskMap: make(map[string]*blossomsub.Bitmask), + signKey: privKey, + peerScore: make(map[string]int64), + isBootstrapPeer: false, + network: p2pConfig.Network, + } + + h, err := libp2p.New(opts...) 
+ if err != nil { + panic(errors.Wrap(err, "error constructing p2p")) + } + + logger.Info("established peer id", zap.String("peer_id", h.ID().String())) + + kademliaDHT := initDHT( + ctx, + p2pConfig, + logger, + h, + false, + bootstrappers, + ) + routingDiscovery := routing.NewRoutingDiscovery(kademliaDHT) + util.Advertise(ctx, routingDiscovery, getNetworkNamespace(p2pConfig.Network)) + + if err != nil { + panic(err) + } + + peerID := h.ID() + bs.peerID = peerID + bs.h = h + bs.signKey = privKey + + return bs +} + func NewBlossomSub( p2pConfig *config.P2PConfig, logger *zap.Logger, @@ -163,6 +238,23 @@ func NewBlossomSub( opts = append(opts, libp2p.Identity(privKey)) } + allowedPeers := []peer.AddrInfo{} + allowedPeers = append(allowedPeers, bootstrappers...) + + directPeers := []peer.AddrInfo{} + if len(p2pConfig.DirectPeers) > 0 { + logger.Info("found direct peers in config") + for _, peerAddr := range p2pConfig.DirectPeers { + peerinfo, err := peer.AddrInfoFromString(peerAddr) + if err != nil { + panic(err) + } + logger.Info("adding direct peer", zap.String("peer", peerinfo.ID.String())) + directPeers = append(directPeers, *peerinfo) + } + } + allowedPeers = append(allowedPeers, directPeers...) + if p2pConfig.LowWatermarkConnections != 0 && p2pConfig.HighWatermarkConnections != 0 { cm, err := connmgr.NewConnManager( @@ -176,7 +268,7 @@ func NewBlossomSub( rm, err := resourceManager( p2pConfig.HighWatermarkConnections, - bootstrappers, + allowedPeers, ) if err != nil { panic(err) @@ -223,7 +315,9 @@ func NewBlossomSub( verifyReachability(p2pConfig) - discoverPeers(p2pConfig, ctx, logger, h, routingDiscovery) + discoverPeers(p2pConfig, ctx, logger, h, routingDiscovery, true) + + go monitorPeers(ctx, logger, h) // TODO: turn into an option flag for console logging, this is too noisy for // default logging behavior @@ -244,21 +338,8 @@ func NewBlossomSub( blossomsub.WithStrictSignatureVerification(true), } - if len(p2pConfig.DirectPeers) > 0 { - logger.Info("Found direct peers in config") - directPeers := []peer.AddrInfo{} - for _, peerAddr := range p2pConfig.DirectPeers { - peerinfo, err := peer.AddrInfoFromString(peerAddr) - if err != nil { - panic(err) - } - logger.Info("Adding direct peer", zap.String("peer", peerinfo.ID.String())) - directPeers = append(directPeers, *peerinfo) - } - - if len(directPeers) > 0 { - blossomOpts = append(blossomOpts, blossomsub.WithDirectPeers(directPeers)) - } + if len(directPeers) > 0 { + blossomOpts = append(blossomOpts, blossomsub.WithDirectPeers(directPeers)) } if tracer != nil { @@ -303,12 +384,24 @@ func NewBlossomSub( bs.h = h bs.signKey = privKey + allowedPeerIDs := make(map[peer.ID]struct{}, len(allowedPeers)) + for _, peerInfo := range allowedPeers { + allowedPeerIDs[peerInfo.ID] = struct{}{} + } go func() { for { time.Sleep(30 * time.Second) for _, b := range bs.bitmaskMap { - if len(b.ListPeers()) < 4 { - discoverPeers(p2pConfig, bs.ctx, logger, bs.h, routingDiscovery) + bitmaskPeers := b.ListPeers() + peerCount := len(bitmaskPeers) + for _, p := range bitmaskPeers { + if _, ok := allowedPeerIDs[p]; ok { + peerCount-- + } + } + if peerCount < 4 { + discoverPeers(p2pConfig, bs.ctx, logger, bs.h, routingDiscovery, false) + break } } } @@ -319,7 +412,7 @@ func NewBlossomSub( // adjusted from Lotus' reference implementation, addressing // https://github.com/libp2p/go-libp2p/issues/1640 -func resourceManager(highWatermark uint, bootstrappers []peer.AddrInfo) ( +func resourceManager(highWatermark uint, allowed []peer.AddrInfo) ( 
network.ResourceManager, error, ) { @@ -386,18 +479,18 @@ func resourceManager(highWatermark uint, bootstrappers []peer.AddrInfo) ( ) resolver := madns.DefaultResolver - var bootstrapperMaddrs []ma.Multiaddr - for _, pi := range bootstrappers { + var allowedMaddrs []ma.Multiaddr + for _, pi := range allowed { for _, addr := range pi.Addrs { resolved, err := resolver.Resolve(context.Background(), addr) if err != nil { continue } - bootstrapperMaddrs = append(bootstrapperMaddrs, resolved...) + allowedMaddrs = append(allowedMaddrs, resolved...) } } - opts = append(opts, rcmgr.WithAllowlistedMultiaddrs(bootstrapperMaddrs)) + opts = append(opts, rcmgr.WithAllowlistedMultiaddrs(allowedMaddrs)) mgr, err := rcmgr.NewResourceManager(limiter, opts...) if err != nil { @@ -505,6 +598,79 @@ func (b *BlossomSub) GetRandomPeer(bitmask []byte) ([]byte, error) { return []byte(peers[sel.Int64()]), nil } +// monitorPeers periodically looks up the peers connected to the host and pings them +// up to 3 times to ensure they are still reachable. If the peer is not reachable after +// 3 attempts, the connections to the peer are closed. +func monitorPeers(ctx context.Context, logger *zap.Logger, h host.Host) { + const timeout, period, attempts = time.Minute, time.Minute, 3 + // Do not allow the pings to dial new connections. Adding new peers is a separate + // process and should not be done during the ping process. + ctx = network.WithNoDial(ctx, "monitor peers") + pingOnce := func(ctx context.Context, logger *zap.Logger, id peer.ID) bool { + pingCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + select { + case <-ctx.Done(): + case <-pingCtx.Done(): + logger.Debug("ping timeout") + return false + case res := <-ping.Ping(pingCtx, h, id): + if res.Error != nil { + logger.Debug("ping error", zap.Error(res.Error)) + return false + } + logger.Debug("ping success", zap.Duration("rtt", res.RTT)) + } + return true + } + ping := func(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, id peer.ID) { + defer wg.Done() + var conns []network.Conn + for i := 0; i < attempts; i++ { + // There are no fine grained semantics in libp2p that would allow us to 'ping via + // a specific connection'. We can only ping a peer, which will attempt to open a stream via a connection. + // As such, we save a snapshot of the connections that were potentially in use before + // the ping, and close them if the ping fails. If new connections occur between the snapshot + // and the ping, they will not be closed, and will be pinged in the next iteration. + conns = h.Network().ConnsToPeer(id) + if pingOnce(ctx, logger, id) { + return + } + } + for _, conn := range conns { + _ = conn.Close() + } + } + for { + select { + case <-ctx.Done(): + return + case <-time.After(period): + // This is once again a snapshot of the peers at the time of the ping. If new peers + // are added between the snapshot and the ping, they will be pinged in the next iteration. + peers := h.Network().Peers() + connected := make([]peer.ID, 0, len(peers)) + for _, p := range peers { + // The connection status may change both before and after the check. Still, it is better + // to focus on pinging only connections which are potentially connected at the moment of the check. 
+ switch h.Network().Connectedness(p) { + case network.Connected, network.Limited: + connected = append(connected, p) + } + } + logger.Debug("pinging connected peers", zap.Int("peer_count", len(connected))) + wg := &sync.WaitGroup{} + for _, id := range connected { + logger := logger.With(zap.String("peer_id", id.String())) + wg.Add(1) + go ping(ctx, logger, wg, id) + } + wg.Wait() + logger.Debug("pinged connected peers") + } + } +} + func initDHT( ctx context.Context, p2pConfig *config.P2PConfig, @@ -517,19 +683,12 @@ func initDHT( var kademliaDHT *dht.IpfsDHT var err error if isBootstrapPeer { - if p2pConfig.Network == 0 { - panic( - "this release is for normal peers only, if you are running a " + - "bootstrap node, please use v2.0-bootstrap", - ) - } else { - kademliaDHT, err = dht.New( - ctx, - h, - dht.Mode(dht.ModeServer), - dht.BootstrapPeers(bootstrappers...), - ) - } + kademliaDHT, err = dht.New( + ctx, + h, + dht.Mode(dht.ModeServer), + dht.BootstrapPeers(bootstrappers...), + ) } else { kademliaDHT, err = dht.New( ctx, @@ -546,9 +705,13 @@ func initDHT( } reconnect := func() { + wg := &sync.WaitGroup{} + defer wg.Wait() for _, peerinfo := range bootstrappers { peerinfo := peerinfo + wg.Add(1) go func() { + defer wg.Done() if peerinfo.ID == h.ID() || h.Network().Connectedness(peerinfo.ID) == network.Connected || h.Network().Connectedness(peerinfo.ID) == network.Limited { @@ -570,10 +733,21 @@ func initDHT( reconnect() + bootstrapPeerIDs := make(map[peer.ID]struct{}, len(bootstrappers)) + for _, peerinfo := range bootstrappers { + bootstrapPeerIDs[peerinfo.ID] = struct{}{} + } go func() { for { time.Sleep(30 * time.Second) - if len(h.Network().Peers()) == 0 { + found := false + for _, p := range h.Network().Peers() { + if _, ok := bootstrapPeerIDs[p]; ok { + found = true + break + } + } + if !found { reconnect() } } @@ -582,6 +756,19 @@ func initDHT( return kademliaDHT } +func (b *BlossomSub) Reconnect(peerId []byte) error { + peer := peer.ID(peerId) + info := b.h.Peerstore().PeerInfo(peer) + b.h.ConnManager().Unprotect(info.ID, "bootstrap") + time.Sleep(10 * time.Second) + if err := b.h.Connect(b.ctx, info); err != nil { + return errors.Wrap(err, "reconnect") + } + + b.h.ConnManager().Protect(info.ID, "bootstrap") + return nil +} + func (b *BlossomSub) GetPeerScore(peerId []byte) int64 { b.peerScoreMx.Lock() score := b.peerScore[string(peerId)] @@ -841,10 +1028,12 @@ func discoverPeers( logger *zap.Logger, h host.Host, routingDiscovery *routing.RoutingDiscovery, + init bool, ) { - logger.Info("initiating peer discovery") - discover := func() { + logger.Info("initiating peer discovery") + defer logger.Info("completed peer discovery") + peerChan, err := routingDiscovery.FindPeers( ctx, getNetworkNamespace(p2pConfig.Network), @@ -854,39 +1043,46 @@ func discoverPeers( return } - count := 12 + wg := &sync.WaitGroup{} + defer wg.Wait() for peer := range peerChan { - if count == 0 { + if len(h.Network().Peers()) >= 6 { break } - peer := peer - if peer.ID == h.ID() || - h.Network().Connectedness(peer.ID) == network.Connected || - h.Network().Connectedness(peer.ID) == network.Limited { - continue - } - logger.Debug("found peer", zap.String("peer_id", peer.ID.String())) - err := h.Connect(ctx, peer) - if err != nil { - logger.Debug( - "error while connecting to blossomsub peer", - zap.String("peer_id", peer.ID.String()), - zap.Error(err), - ) - } else { - count-- - logger.Debug( - "connected to peer", - zap.String("peer_id", peer.ID.String()), - ) - } + peer := peer + wg.Add(1) + go func() 
{ + defer wg.Done() + if peer.ID == h.ID() || + h.Network().Connectedness(peer.ID) == network.Connected || + h.Network().Connectedness(peer.ID) == network.Limited { + return + } + + logger.Debug("found peer", zap.String("peer_id", peer.ID.String())) + err := h.Connect(ctx, peer) + if err != nil { + logger.Debug( + "error while connecting to blossomsub peer", + zap.String("peer_id", peer.ID.String()), + zap.Error(err), + ) + } else { + logger.Debug( + "connected to peer", + zap.String("peer_id", peer.ID.String()), + ) + } + }() } } - discover() - - logger.Info("completed peer discovery") + if init { + go discover() + } else { + discover() + } } func mergeDefaults(p2pConfig *config.P2PConfig) blossomsub.BlossomSubParams { diff --git a/node/p2p/pubsub.go b/node/p2p/pubsub.go index 620f6fd..4f5104c 100644 --- a/node/p2p/pubsub.go +++ b/node/p2p/pubsub.go @@ -36,4 +36,5 @@ type PubSub interface { GetPeerScore(peerId []byte) int64 SetPeerScore(peerId []byte, score int64) AddPeerScore(peerId []byte, scoreDelta int64) + Reconnect(peerId []byte) error }