From 89c85422bee94653ff4cff1f505cebbd2a27f8cd Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Sun, 8 Dec 2024 04:12:26 +0100 Subject: [PATCH 1/6] Stop gRPC and HTTP servers on shutdown (#408) * Stop gRPC and HTTP servers on shutdown * Wait for executor to register --- node/app/node.go | 11 +- .../data/data_clock_consensus_engine.go | 53 +++++---- node/main.go | 19 ++-- node/rpc/node_rpc_server.go | 101 +++++++++++------- 4 files changed, 114 insertions(+), 70 deletions(-) diff --git a/node/app/node.go b/node/app/node.go index 6254cc8..e0ed4ae 100644 --- a/node/app/node.go +++ b/node/app/node.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "fmt" + "sync" "go.uber.org/zap" "golang.org/x/crypto/sha3" @@ -162,9 +163,17 @@ func (n *Node) Start() { } // TODO: add config mapping to engine name/frame registration + wg := sync.WaitGroup{} for _, e := range n.execEngines { - n.engine.RegisterExecutor(e, 0) + wg.Add(1) + go func(e execution.ExecutionEngine) { + defer wg.Done() + if err := <-n.engine.RegisterExecutor(e, 0); err != nil { + panic(err) + } + }(e) } + wg.Wait() } func (n *Node) Stop() { diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index 66a8804..801714d 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -72,6 +72,8 @@ type DataClockConsensusEngine struct { cancel context.CancelFunc wg sync.WaitGroup + grpcServers []*grpc.Server + lastProven uint64 difficulty uint32 config *config.Config @@ -349,38 +351,40 @@ func (e *DataClockConsensusEngine) Start() <-chan error { e.pubSub.Subscribe(e.frameFragmentFilter, e.handleFrameFragmentMessage) e.pubSub.Subscribe(e.txFilter, e.handleTxMessage) e.pubSub.Subscribe(e.infoFilter, e.handleInfoMessage) + + syncServer := qgrpc.NewServer( + grpc.MaxSendMsgSize(40*1024*1024), + grpc.MaxRecvMsgSize(40*1024*1024), + ) + e.grpcServers = append(e.grpcServers[:0:0], syncServer) + protobufs.RegisterDataServiceServer(syncServer, e) go func() { - server := qgrpc.NewServer( - grpc.MaxSendMsgSize(40*1024*1024), - grpc.MaxRecvMsgSize(40*1024*1024), - ) - protobufs.RegisterDataServiceServer(server, e) if err := e.pubSub.StartDirectChannelListener( e.pubSub.GetPeerID(), "sync", - server, + syncServer, ); err != nil { - panic(err) + e.logger.Error("error starting sync server", zap.Error(err)) } }() - go func() { - if e.dataTimeReel.GetFrameProverTries()[0].Contains(e.provingKeyAddress) { - server := qgrpc.NewServer( - grpc.MaxSendMsgSize(1*1024*1024), - grpc.MaxRecvMsgSize(1*1024*1024), - ) - protobufs.RegisterDataServiceServer(server, e) - + if e.FrameProverTrieContains(0, e.provingKeyAddress) { + workerServer := qgrpc.NewServer( + grpc.MaxSendMsgSize(1*1024*1024), + grpc.MaxRecvMsgSize(1*1024*1024), + ) + e.grpcServers = append(e.grpcServers, workerServer) + protobufs.RegisterDataServiceServer(workerServer, e) + go func() { if err := e.pubSub.StartDirectChannelListener( e.pubSub.GetPeerID(), "worker", - server, + workerServer, ); err != nil { - panic(err) + e.logger.Error("error starting worker server", zap.Error(err)) } - } - }() + }() + } e.stateMx.Lock() e.state = consensus.EngineStateCollecting @@ -661,6 +665,16 @@ func (e *DataClockConsensusEngine) PerformTimeProof( } func (e *DataClockConsensusEngine) Stop(force bool) <-chan error { + wg := sync.WaitGroup{} + wg.Add(len(e.grpcServers)) + for _, server := range e.grpcServers { + go func(server *grpc.Server) { + defer wg.Done() + server.GracefulStop() + }(server) + } + wg.Wait() + e.logger.Info("stopping ceremony consensus engine") e.cancel() e.wg.Wait() @@ -684,7 +698,6 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error { e.logger.Warn("error publishing prover pause", zap.Error(err)) } - wg := sync.WaitGroup{} wg.Add(len(e.executionEngines)) executionErrors := make(chan error, len(e.executionEngines)) for name := range e.executionEngines { diff --git a/node/main.go b/node/main.go index 35154ce..32ce019 100644 --- a/node/main.go +++ b/node/main.go @@ -467,6 +467,7 @@ func main() { if !*integrityCheck { go spawnDataWorkers(nodeConfig) + defer stopDataWorkers() } kzg.Init() @@ -510,6 +511,9 @@ func main() { // runtime.GOMAXPROCS(1) + node.Start() + defer node.Stop() + if nodeConfig.ListenGRPCMultiaddr != "" { srv, err := rpc.NewRPCServer( nodeConfig.ListenGRPCMultiaddr, @@ -526,20 +530,13 @@ func main() { if err != nil { panic(err) } - - go func() { - err := srv.Start() - if err != nil { - panic(err) - } - }() + if err := srv.Start(); err != nil { + panic(err) + } + defer srv.Stop() } - node.Start() - <-done - stopDataWorkers() - node.Stop() } var dataWorkers []*exec.Cmd diff --git a/node/rpc/node_rpc_server.go b/node/rpc/node_rpc_server.go index 568b836..be1c5ba 100644 --- a/node/rpc/node_rpc_server.go +++ b/node/rpc/node_rpc_server.go @@ -6,6 +6,7 @@ import ( "math/big" "net/http" "strings" + "sync" "time" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" @@ -43,6 +44,8 @@ type RPCServer struct { pubSub p2p.PubSub masterClock *master.MasterClockConsensusEngine executionEngines []execution.ExecutionEngine + grpcServer *grpc.Server + httpServer *http.Server } // GetFrameInfo implements protobufs.NodeServiceServer. @@ -384,7 +387,33 @@ func NewRPCServer( masterClock *master.MasterClockConsensusEngine, executionEngines []execution.ExecutionEngine, ) (*RPCServer, error) { - return &RPCServer{ + mg, err := multiaddr.NewMultiaddr(listenAddrGRPC) + if err != nil { + return nil, errors.Wrap(err, "new rpc server") + } + mga, err := mn.ToNetAddr(mg) + if err != nil { + return nil, errors.Wrap(err, "new rpc server") + } + + mux := runtime.NewServeMux() + opts := qgrpc.ClientOptions( + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(600*1024*1024), + grpc.MaxCallSendMsgSize(600*1024*1024), + ), + ) + if err := protobufs.RegisterNodeServiceHandlerFromEndpoint( + context.Background(), + mux, + mga.String(), + opts, + ); err != nil { + return nil, err + } + + rpcServer := &RPCServer{ listenAddrGRPC: listenAddrGRPC, listenAddrHTTP: listenAddrHTTP, logger: logger, @@ -395,17 +424,22 @@ func NewRPCServer( pubSub: pubSub, masterClock: masterClock, executionEngines: executionEngines, - }, nil + grpcServer: qgrpc.NewServer( + grpc.MaxRecvMsgSize(600*1024*1024), + grpc.MaxSendMsgSize(600*1024*1024), + ), + httpServer: &http.Server{ + Handler: mux, + }, + } + + protobufs.RegisterNodeServiceServer(rpcServer.grpcServer, rpcServer) + reflection.Register(rpcServer.grpcServer) + + return rpcServer, nil } func (r *RPCServer) Start() error { - s := qgrpc.NewServer( - grpc.MaxRecvMsgSize(600*1024*1024), - grpc.MaxSendMsgSize(600*1024*1024), - ) - protobufs.RegisterNodeServiceServer(s, r) - reflection.Register(s) - mg, err := multiaddr.NewMultiaddr(r.listenAddrGRPC) if err != nil { return errors.Wrap(err, "start") @@ -417,51 +451,42 @@ func (r *RPCServer) Start() error { } go func() { - if err := s.Serve(mn.NetListener(lis)); err != nil { - panic(err) + if err := r.grpcServer.Serve(mn.NetListener(lis)); err != nil { + r.logger.Error("serve error", zap.Error(err)) } }() if r.listenAddrHTTP != "" { - m, err := multiaddr.NewMultiaddr(r.listenAddrHTTP) + mh, err := multiaddr.NewMultiaddr(r.listenAddrHTTP) if err != nil { return errors.Wrap(err, "start") } - ma, err := mn.ToNetAddr(m) - if err != nil { - return errors.Wrap(err, "start") - } - - mga, err := mn.ToNetAddr(mg) + lis, err := mn.Listen(mh) if err != nil { return errors.Wrap(err, "start") } go func() { - mux := runtime.NewServeMux() - opts := qgrpc.ClientOptions( - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(600*1024*1024), - grpc.MaxCallSendMsgSize(600*1024*1024), - ), - ) - - if err := protobufs.RegisterNodeServiceHandlerFromEndpoint( - context.Background(), - mux, - mga.String(), - opts, - ); err != nil { - panic(err) - } - - if err := http.ListenAndServe(ma.String(), mux); err != nil { - panic(err) + if err := r.httpServer.Serve(mn.NetListener(lis)); err != nil && !errors.Is(err, http.ErrServerClosed) { + r.logger.Error("serve error", zap.Error(err)) } }() } return nil } + +func (r *RPCServer) Stop() { + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + r.grpcServer.GracefulStop() + }() + go func() { + defer wg.Done() + r.httpServer.Shutdown(context.Background()) + }() + wg.Wait() +} From b9f1f0a6af541795350881aee752d36e477d424b Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Sun, 8 Dec 2024 04:19:04 +0100 Subject: [PATCH 2/6] Frame pruning fixes (#405) * Consistently close iterators * Prune frames in batches * Add compact on start flag --- node/consensus/data/frame_pruner.go | 13 -- node/consensus/data/main_data_loop.go | 33 ++++- node/consensus/master/peer_messaging.go | 5 +- node/main.go | 16 +++ node/rpc/node_rpc_server.go | 12 +- node/store/clock.go | 163 ++++++++++++++++-------- node/store/coin.go | 11 +- node/store/data_proof.go | 44 ++----- node/store/inmem.go | 5 +- node/store/key.go | 22 +--- node/store/peerstore.go | 4 +- 11 files changed, 177 insertions(+), 151 deletions(-) delete mode 100644 node/consensus/data/frame_pruner.go diff --git a/node/consensus/data/frame_pruner.go b/node/consensus/data/frame_pruner.go deleted file mode 100644 index dc68f58..0000000 --- a/node/consensus/data/frame_pruner.go +++ /dev/null @@ -1,13 +0,0 @@ -package data - -import "go.uber.org/zap" - -func (e *DataClockConsensusEngine) pruneFrames(maxFrame uint64) error { - e.logger.Info("pruning frames", zap.Uint64("max_frame_to_prune", maxFrame)) - err := e.clockStore.DeleteDataClockFrameRange(e.filter, 1, maxFrame) - if err != nil { - e.logger.Error("failed to prune frames", zap.Error(err)) - return err - } - return nil -} diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go index 7167b2a..d82f2b8 100644 --- a/node/consensus/data/main_data_loop.go +++ b/node/consensus/data/main_data_loop.go @@ -103,6 +103,10 @@ func (e *DataClockConsensusEngine) runFramePruning() { e.logger.Info("frame pruning enabled, waiting for delay timeout expiry") + from := uint64(1) + maxFrames := uint64(e.config.Engine.MaxFrames) + batchSize := uint64(1000) +outer: for { select { case <-e.ctx.Done(): @@ -113,15 +117,34 @@ func (e *DataClockConsensusEngine) runFramePruning() { panic(err) } - if head.FrameNumber < uint64(e.config.Engine.MaxFrames)+1 || + if head.FrameNumber <= maxFrames || head.FrameNumber <= application.PROOF_FRAME_SENIORITY_REPAIR+1 { continue } - if err := e.pruneFrames( - head.FrameNumber - uint64(e.config.Engine.MaxFrames), - ); err != nil { - e.logger.Error("could not prune", zap.Error(err)) + to := head.FrameNumber - maxFrames + for i := from; i < to; i += batchSize { + start, stop := i, min(i+batchSize, to) + if err := e.clockStore.DeleteDataClockFrameRange(e.filter, start, stop); err != nil { + e.logger.Error( + "failed to prune frames", + zap.Error(err), + zap.Uint64("from", start), + zap.Uint64("to", stop), + ) + continue outer + } + e.logger.Info( + "pruned frames", + zap.Uint64("from", start), + zap.Uint64("to", stop), + ) + select { + case <-e.ctx.Done(): + return + default: + } + from = stop } } } diff --git a/node/consensus/master/peer_messaging.go b/node/consensus/master/peer_messaging.go index 4eac311..1c62bd8 100644 --- a/node/consensus/master/peer_messaging.go +++ b/node/consensus/master/peer_messaging.go @@ -69,6 +69,7 @@ func (e *MasterClockConsensusEngine) Sync( if err != nil { return errors.Wrap(err, "sync") } + defer iter.Close() response := []*protobufs.ClockFrame{} @@ -81,10 +82,6 @@ func (e *MasterClockConsensusEngine) Sync( response = append(response, frame) } - if err = iter.Close(); err != nil { - return errors.Wrap(err, "sync") - } - if len(response) == 0 { return nil } diff --git a/node/main.go b/node/main.go index 32ce019..ee60fa9 100644 --- a/node/main.go +++ b/node/main.go @@ -144,6 +144,11 @@ var ( true, "when enabled, frame execution validation is skipped", ) + compactDB = flag.Bool( + "compact-db", + false, + "compacts the database and exits", + ) ) func signatureCheckDefault() bool { @@ -330,6 +335,17 @@ func main() { panic(err) } + if *compactDB && *core == 0 { + db := store.NewPebbleDB(nodeConfig.DB) + if err := db.CompactAll(); err != nil { + panic(err) + } + if err := db.Close(); err != nil { + panic(err) + } + return + } + if *network != 0 { if nodeConfig.P2P.BootstrapPeers[0] == config.BootstrapPeers[0] { fmt.Println( diff --git a/node/rpc/node_rpc_server.go b/node/rpc/node_rpc_server.go index be1c5ba..741ad87 100644 --- a/node/rpc/node_rpc_server.go +++ b/node/rpc/node_rpc_server.go @@ -97,21 +97,17 @@ func (r *RPCServer) GetFrames( if err != nil { return nil, errors.Wrap(err, "get frames") } + defer iter.Close() frames := []*protobufs.ClockFrame{} for iter.First(); iter.Valid(); iter.Next() { frame, err := iter.Value() if err != nil { - iter.Close() return nil, errors.Wrap(err, "get frames") } frames = append(frames, frame) } - if err := iter.Close(); err != nil { - return nil, errors.Wrap(err, "get frames") - } - return &protobufs.FramesResponse{ TruncatedClockFrames: frames, }, nil @@ -124,21 +120,17 @@ func (r *RPCServer) GetFrames( if err != nil { return nil, errors.Wrap(err, "get frame info") } + defer iter.Close() frames := []*protobufs.ClockFrame{} for iter.First(); iter.Valid(); iter.Next() { frame, err := iter.TruncatedValue() if err != nil { - iter.Close() return nil, errors.Wrap(err, "get frames") } frames = append(frames, frame) } - if err := iter.Close(); err != nil { - return nil, errors.Wrap(err, "get frames") - } - return &protobufs.FramesResponse{ TruncatedClockFrames: frames, }, nil diff --git a/node/store/clock.go b/node/store/clock.go index 3a24de5..0cf6078 100644 --- a/node/store/clock.go +++ b/node/store/clock.go @@ -216,13 +216,13 @@ func (p *PebbleClockIterator) TruncatedValue() ( if err != nil { return nil, errors.Wrap(err, "get truncated clock frame iterator value") } + defer frameCloser.Close() if err := proto.Unmarshal(frameValue, frame); err != nil { return nil, errors.Wrap( errors.Wrap(err, ErrInvalidData.Error()), "get truncated clock frame iterator value", ) } - frameCloser.Close() } else { if err := proto.Unmarshal(value, frame); err != nil { return nil, errors.Wrap( @@ -252,13 +252,13 @@ func (p *PebbleClockIterator) Value() (*protobufs.ClockFrame, error) { if err != nil { return nil, errors.Wrap(err, "get clock frame iterator value") } + defer frameCloser.Close() if err := proto.Unmarshal(frameValue, frame); err != nil { return nil, errors.Wrap( errors.Wrap(err, ErrInvalidData.Error()), "get clock frame iterator value", ) } - defer frameCloser.Close() } else { if err := proto.Unmarshal(value, frame); err != nil { return nil, errors.Wrap( @@ -469,8 +469,8 @@ func (p *PebbleClockStore) GetEarliestMasterClockFrame( return nil, errors.Wrap(err, "get earliest master clock frame") } - defer closer.Close() + frameNumber := binary.BigEndian.Uint64(idxValue) frame, err := p.GetMasterClockFrame(filter, frameNumber) if err != nil { @@ -492,8 +492,8 @@ func (p *PebbleClockStore) GetLatestMasterClockFrame( return nil, errors.Wrap(err, "get latest master clock frame") } - defer closer.Close() + frameNumber := binary.BigEndian.Uint64(idxValue) frame, err := p.GetMasterClockFrame(filter, frameNumber) if err != nil { @@ -516,11 +516,11 @@ func (p *PebbleClockStore) GetMasterClockFrame( return nil, errors.Wrap(err, "get master clock frame") } + defer closer.Close() copied := make([]byte, len(value)) copy(copied[:], value[:]) - defer closer.Close() frame := &protobufs.ClockFrame{} frame.FrameNumber = frameNumber frame.Filter = filter @@ -595,10 +595,8 @@ func (p *PebbleClockStore) PutMasterClockFrame( ); err != nil { return errors.Wrap(err, "put master clock frame") } - } - - if err == nil && closer != nil { - closer.Close() + } else { + _ = closer.Close() } if err = txn.Set( @@ -625,6 +623,7 @@ func (p *PebbleClockStore) GetDataClockFrame( return nil, nil, errors.Wrap(err, "get data clock frame") } + defer closer.Close() frame := &protobufs.ClockFrame{} genesisFramePreIndex := false @@ -641,14 +640,13 @@ func (p *PebbleClockStore) GetDataClockFrame( return nil, nil, errors.Wrap(err, "get data clock frame") } + defer frameCloser.Close() if err := proto.Unmarshal(frameValue, frame); err != nil { return nil, nil, errors.Wrap( errors.Wrap(err, ErrInvalidData.Error()), "get data clock frame", ) } - closer.Close() - defer frameCloser.Close() } else { genesisFramePreIndex = frameNumber == 0 if err := proto.Unmarshal(value, frame); err != nil { @@ -657,7 +655,6 @@ func (p *PebbleClockStore) GetDataClockFrame( "get data clock frame", ) } - defer closer.Close() } if !truncate { @@ -674,15 +671,17 @@ func (p *PebbleClockStore) GetDataClockFrame( proverTrie := &tries.RollingFrecencyCritbitTrie{} trieData, closer, err := p.db.Get(clockProverTrieKey(filter, i, frameNumber)) if err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + return nil, nil, errors.Wrap(err, "get data clock frame") + } break } + defer closer.Close() if err := proverTrie.Deserialize(trieData); err != nil { - closer.Close() return nil, nil, errors.Wrap(err, "get latest data clock frame") } - closer.Close() i++ proverTries = append(proverTries, proverTrie) } @@ -726,7 +725,6 @@ func (p *PebbleClockStore) deleteAggregateProofs( for i := 0; i < len(frame.Input[516:])/74; i++ { commit := frame.Input[516+(i*74) : 516+((i+1)*74)] err := internalDeleteAggregateProof( - p.db, txn, frame.AggregateProofs[i], commit, @@ -757,7 +755,6 @@ func (p *PebbleClockStore) saveAggregateProofs( for i := 0; i < len(frame.Input[516:])/74; i++ { commit := frame.Input[516+(i*74) : 516+((i+1)*74)] err := internalPutAggregateProof( - p.db, txn, frame.AggregateProofs[i], commit, @@ -788,8 +785,8 @@ func (p *PebbleClockStore) GetEarliestDataClockFrame( return nil, errors.Wrap(err, "get earliest data clock frame") } - defer closer.Close() + frameNumber := binary.BigEndian.Uint64(idxValue) frame, _, err := p.GetDataClockFrame(filter, frameNumber, false) if err != nil { @@ -811,6 +808,7 @@ func (p *PebbleClockStore) GetLatestDataClockFrame( return nil, nil, errors.Wrap(err, "get latest data clock frame") } + defer closer.Close() frameNumber := binary.BigEndian.Uint64(idxValue) frame, tries, err := p.GetDataClockFrame(filter, frameNumber, false) @@ -822,8 +820,6 @@ func (p *PebbleClockStore) GetLatestDataClockFrame( return nil, nil, errors.Wrap(err, "get latest data clock frame") } - closer.Close() - return frame, tries, nil } @@ -843,6 +839,7 @@ func (p *PebbleClockStore) GetStagedDataClockFrame( } return nil, errors.Wrap(err, "get parent data clock frame") } + defer closer.Close() parent := &protobufs.ClockFrame{} if err := proto.Unmarshal(data, parent); err != nil { @@ -858,10 +855,6 @@ func (p *PebbleClockStore) GetStagedDataClockFrame( } } - if closer != nil { - closer.Close() - } - return parent, nil } @@ -879,9 +872,9 @@ func (p *PebbleClockStore) GetStagedDataClockFramesForFrameNumber( } return nil, errors.Wrap(err, "get staged data clock frames") } + defer iter.Close() frames := []*protobufs.ClockFrame{} - for iter.First(); iter.Valid(); iter.Next() { data := iter.Value() frame := &protobufs.ClockFrame{} @@ -899,8 +892,6 @@ func (p *PebbleClockStore) GetStagedDataClockFramesForFrameNumber( frames = append(frames, frame) } - iter.Close() - return frames, nil } @@ -992,10 +983,8 @@ func (p *PebbleClockStore) CommitDataClockFrame( ); err != nil { return errors.Wrap(err, "commit data clock frame") } - } - - if err == nil && closer != nil { - closer.Close() + } else { + _ = closer.Close() } if !backfill { @@ -1058,36 +1047,99 @@ func (p *PebbleClockStore) DeleteDataClockFrameRange( for i := fromFrameNumber; i < toFrameNumber; i++ { frames, err := p.GetStagedDataClockFramesForFrameNumber(filter, i) if err != nil { - continue + if !errors.Is(err, ErrNotFound) && !errors.Is(err, ErrInvalidData) { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } + frames = nil } + outer: for _, frame := range frames { - err = p.deleteAggregateProofs(txn, frame) - if err != nil { - continue + for _, ap := range frame.AggregateProofs { + for _, inc := range ap.InclusionCommitments { + // The commitments collide for very small frames, and as such we have to detect them early + // and avoid deleting them. Common cases for such collisions are prover announcement messages + // which do not contain the frame number, so their binary contents are equivalent between + // multiple frames. + if len(inc.Data) < 2048 { + continue outer + } + if inc.TypeUrl == protobufs.IntrinsicExecutionOutputType { + o := &protobufs.IntrinsicExecutionOutput{} + if err := proto.Unmarshal(inc.Data, o); err != nil { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } + // The commitments collide for empty frames, and as such we have to detect them early + // and avoid deleting them. + if len(o.Output) == 0 || len(o.Proof) == 0 { + continue outer + } + } + } + } + if err := p.deleteAggregateProofs(txn, frame); err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } } } - err = txn.DeleteRange( + if err := txn.DeleteRange( clockDataParentIndexKey(filter, i, bytes.Repeat([]byte{0x00}, 32)), clockDataParentIndexKey(filter, i, bytes.Repeat([]byte{0xff}, 32)), - ) - if err != nil { - continue + ); err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } } - err = txn.Delete(clockDataFrameKey(filter, i)) - if err != nil { - continue + if err := txn.Delete(clockDataFrameKey(filter, i)); err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } + } + + // The prover trie keys are not stored continuously with respect + // to the same frame number. As such, we need to manually iterate + // and discover such keys. + for t := uint16(0); t <= 0xffff; t++ { + _, closer, err := p.db.Get(clockProverTrieKey(filter, t, i)) + if err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } else { + break + } + } + _ = closer.Close() + if err := txn.Delete(clockProverTrieKey(filter, t, i)); err != nil { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } + } + + if err := txn.DeleteRange( + clockDataTotalDistanceKey(filter, i, bytes.Repeat([]byte{0x00}, 32)), + clockDataTotalDistanceKey(filter, i, bytes.Repeat([]byte{0xff}, 32)), + ); err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } } } - if err = txn.Commit(); err != nil { - txn.Abort() + if err := txn.Commit(); err != nil { return errors.Wrap(err, "delete data clock frame range") } - return errors.Wrap(err, "delete data clock frame range") + return nil } func (p *PebbleClockStore) ResetMasterClockFrames(filter []byte) error { @@ -1138,7 +1190,7 @@ func (p *PebbleClockStore) Compact( if bytes.Compare(version, config.GetVersion()) < 0 { cleared = false } - closer.Close() + defer closer.Close() } if !cleared { @@ -1190,11 +1242,10 @@ func (p *PebbleClockStore) Compact( return errors.Wrap(err, "compact") } + defer closer.Close() last := binary.BigEndian.Uint64(idxValue) - closer.Close() - for frameNumber := uint64(1); frameNumber <= last; frameNumber++ { value, closer, err := p.db.Get(clockDataFrameKey(dataFilter, frameNumber)) if err != nil { @@ -1204,6 +1255,7 @@ func (p *PebbleClockStore) Compact( return errors.Wrap(err, "compact") } + defer closer.Close() frame := &protobufs.ClockFrame{} @@ -1212,6 +1264,7 @@ func (p *PebbleClockStore) Compact( if err != nil { return errors.Wrap(err, "compact") } + defer frameCloser.Close() if err := proto.Unmarshal(frameValue, frame); err != nil { return errors.Wrap(err, "compact") } @@ -1226,9 +1279,6 @@ func (p *PebbleClockStore) Compact( make([]byte, 32), )), ) - - closer.Close() - frameCloser.Close() } else { if err := proto.Unmarshal(value, frame); err != nil { return errors.Wrap(err, "compact") @@ -1255,14 +1305,15 @@ func (p *PebbleClockStore) Compact( make([]byte, 32), )), ) + if err != nil { + return errors.Wrap(err, "compact") + } parents = append(parents, clockDataParentIndexKey(dataFilter, frameNumber, selector.FillBytes( make([]byte, 32), )), ) - - closer.Close() } for i := 0; i < len(frame.Input[516:])/74; i++ { @@ -1523,10 +1574,8 @@ func (p *PebbleClockStore) GetTotalDistance( return nil, errors.Wrap(err, "get total distance") } - defer closer.Close() dist := new(big.Int).SetBytes(value) - return dist, nil } @@ -1564,7 +1613,6 @@ func (p *PebbleClockStore) GetPeerSeniorityMap(filter []byte) ( if err = dec.Decode(&seniorityMap); err != nil { return nil, errors.Wrap(err, "get peer seniority map") } - return seniorityMap, nil } @@ -1612,9 +1660,12 @@ func (p *PebbleClockStore) SetProverTriesForFrame( clockProverTrieKey(frame.Filter, uint16(start), frame.FrameNumber), ) if err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + return errors.Wrap(err, "set prover tries for frame") + } break } - closer.Close() + _ = closer.Close() if err = p.db.Delete( clockProverTrieKey(frame.Filter, uint16(start), frame.FrameNumber), diff --git a/node/store/coin.go b/node/store/coin.go index d337a8f..e152d82 100644 --- a/node/store/coin.go +++ b/node/store/coin.go @@ -136,8 +136,8 @@ func (p *PebbleCoinStore) GetCoinsForOwner( err = errors.Wrap(err, "get coins for owner") return nil, nil, nil, err } - defer iter.Close() + frameNumbers := []uint64{} addresses := [][]byte{} coins := []*protobufs.Coin{} @@ -176,8 +176,8 @@ func (p *PebbleCoinStore) GetPreCoinProofsForOwner(owner []byte) ( err = errors.Wrap(err, "get pre coin proofs for owner") return nil, nil, err } - defer iter.Close() + frameNumbers := []uint64{} proofs := []*protobufs.PreCoinProof{} for iter.First(); iter.Valid(); iter.Next() { @@ -215,7 +215,6 @@ func (p *PebbleCoinStore) GetCoinByAddress(txn Transaction, address []byte) ( err = errors.Wrap(err, "get coin by address") return nil, err } - defer closer.Close() coin := &protobufs.Coin{} @@ -240,7 +239,6 @@ func (p *PebbleCoinStore) GetPreCoinProofByAddress(address []byte) ( err = errors.Wrap(err, "get pre coin proof by address") return nil, err } - defer closer.Close() proof := &protobufs.PreCoinProof{} @@ -386,9 +384,9 @@ func (p *PebbleCoinStore) GetLatestFrameProcessed() (uint64, error) { return 0, errors.Wrap(err, "get latest frame processed") } + defer closer.Close() frameNumber := binary.BigEndian.Uint64(v) - closer.Close() return frameNumber, nil } @@ -524,13 +522,12 @@ func (p *PebbleCoinStore) Migrate(filter []byte, genesisSeedHex string) error { } return p.internalMigrate(filter, seed) } + defer closer.Close() if !bytes.Equal(compare, seed) { return p.internalMigrate(filter, seed) } - closer.Close() - status, closer, err := p.db.Get(migrationKey()) if err != nil { if !errors.Is(err, pebble.ErrNotFound) { diff --git a/node/store/data_proof.go b/node/store/data_proof.go index b2c6336..d079b7a 100644 --- a/node/store/data_proof.go +++ b/node/store/data_proof.go @@ -139,8 +139,8 @@ func internalGetAggregateProof( return nil, errors.Wrap(err, "get aggregate proof") } - defer closer.Close() + copied := make([]byte, len(value[8:])) limit := binary.BigEndian.Uint64(value[0:8]) copy(copied, value[8:]) @@ -159,6 +159,7 @@ func internalGetAggregateProof( if err != nil { return nil, errors.Wrap(err, "get aggregate proof") } + defer iter.Close() i := uint32(0) @@ -199,14 +200,11 @@ func internalGetAggregateProof( return nil, errors.Wrap(err, "get aggregate proof") } + defer dataCloser.Close() segCopy := make([]byte, len(segValue)) copy(segCopy, segValue) chunks = append(chunks, segCopy) - - if err = dataCloser.Close(); err != nil { - return nil, errors.Wrap(err, "get aggregate proof") - } } if string(url) == protobufs.IntrinsicExecutionOutputType { @@ -236,10 +234,6 @@ func internalGetAggregateProof( i++ } - if err = iter.Close(); err != nil { - return nil, errors.Wrap(err, "get aggregate proof") - } - return aggregate, nil } @@ -263,8 +257,8 @@ func internalListAggregateProofKeys( return nil, nil, nil, errors.Wrap(err, "list aggregate proof") } - defer closer.Close() + copied := make([]byte, len(value[8:])) limit := binary.BigEndian.Uint64(value[0:8]) copy(copied, value[8:]) @@ -278,6 +272,7 @@ func internalListAggregateProofKeys( return nil, nil, nil, errors.Wrap(err, "list aggregate proof") } + defer iter.Close() i := uint32(0) commits = append(commits, dataProofInclusionKey(filter, commitment, 0)) @@ -305,10 +300,6 @@ func internalListAggregateProofKeys( i++ } - if err = iter.Close(); err != nil { - return nil, nil, nil, errors.Wrap(err, "list aggregate proof") - } - return proofs, commits, data, nil } @@ -326,17 +317,10 @@ func (p *PebbleDataProofStore) GetAggregateProof( } func internalDeleteAggregateProof( - db KVDB, txn Transaction, aggregateProof *protobufs.InclusionAggregateProof, commitment []byte, ) error { - buf := binary.BigEndian.AppendUint64( - nil, - uint64(len(aggregateProof.InclusionCommitments)), - ) - buf = append(buf, aggregateProof.Proof...) - for i, inc := range aggregateProof.InclusionCommitments { var segments [][]byte if inc.TypeUrl == protobufs.IntrinsicExecutionOutputType { @@ -378,7 +362,6 @@ func internalDeleteAggregateProof( } func internalPutAggregateProof( - db KVDB, txn Transaction, aggregateProof *protobufs.InclusionAggregateProof, commitment []byte, @@ -447,7 +430,6 @@ func (p *PebbleDataProofStore) PutAggregateProof( commitment []byte, ) error { return internalPutAggregateProof( - p.db, txn, aggregateProof, commitment, @@ -467,8 +449,8 @@ func (p *PebbleDataProofStore) GetDataTimeProof( err = errors.Wrap(err, "get data time proof") return } - defer closer.Close() + if len(data) < 24 { err = ErrInvalidData return @@ -513,13 +495,10 @@ func (p *PebbleDataProofStore) GetTotalReward( return nil, errors.Wrap(err, "get total difficulty sum") } + defer closer.Close() if len(prev) != 0 { reward.SetBytes(prev[4:]) - - if err = closer.Close(); err != nil { - return nil, errors.Wrap(err, "get total difficulty sum") - } } return reward, nil @@ -556,15 +535,12 @@ func (p *PebbleDataProofStore) PutDataTimeProof( if err != nil && (!errors.Is(err, pebble.ErrNotFound) || increment != 0) { return errors.Wrap(err, "put data time proof") } + defer closer.Close() if len(prev) != 0 { priorSum.SetBytes(prev[4:]) prevIncrement := binary.BigEndian.Uint32(prev[:4]) - if err = closer.Close(); err != nil { - return errors.Wrap(err, "put data time proof") - } - if prevIncrement != increment-1 { return errors.Wrap(errors.New("invalid increment"), "put data time proof") } @@ -609,15 +585,13 @@ func (p *PebbleDataProofStore) GetLatestDataTimeProof(peerId []byte) ( return 0, 0, nil, errors.Wrap(err, "get latest data time proof") } + defer closer.Close() if len(prev) < 4 { return 0, 0, nil, ErrInvalidData } increment = binary.BigEndian.Uint32(prev[:4]) - if err = closer.Close(); err != nil { - return 0, 0, nil, errors.Wrap(err, "get latest data time proof") - } _, parallelism, _, output, err = p.GetDataTimeProof(peerId, increment) diff --git a/node/store/inmem.go b/node/store/inmem.go index 558595c..54110a9 100644 --- a/node/store/inmem.go +++ b/node/store/inmem.go @@ -281,15 +281,13 @@ func (t *InMemKVDBTransaction) DeleteRange( if err != nil { return err } + defer iter.Close() for iter.First(); iter.Valid(); iter.Next() { t.changes = append(t.changes, InMemKVDBOperation{ op: DeleteOperation, key: iter.Key(), }) - if err != nil { - return err - } } return nil @@ -416,6 +414,7 @@ func (d *InMemKVDB) DeleteRange(start, end []byte) error { if err != nil { return err } + defer iter.Close() for iter.First(); iter.Valid(); iter.Next() { err = d.Delete(iter.Key()) diff --git a/node/store/key.go b/node/store/key.go index 1169430..a69bafc 100644 --- a/node/store/key.go +++ b/node/store/key.go @@ -270,6 +270,8 @@ func (p *PebbleKeyStore) IncludeProvingKey( staged, closer, err := p.db.Get(stagedProvingKeyKey(provingKey.PublicKey())) if err != nil && !errors.Is(err, ErrNotFound) { return errors.Wrap(err, "include proving key") + } else if err == nil { + defer closer.Close() } if staged != nil { @@ -279,9 +281,6 @@ func (p *PebbleKeyStore) IncludeProvingKey( return errors.Wrap(err, "include proving key") } } - if err := closer.Close(); err != nil { - return errors.Wrap(err, "include proving key") - } return nil } @@ -297,16 +296,13 @@ func (p *PebbleKeyStore) GetStagedProvingKey( return nil, errors.Wrap(err, "get staged proving key") } + defer closer.Close() stagedKey := &protobufs.ProvingKeyAnnouncement{} if err = proto.Unmarshal(data, stagedKey); err != nil { return nil, errors.Wrap(err, "get staged proving key") } - if err := closer.Close(); err != nil { - return nil, errors.Wrap(err, "get staged proving key") - } - return stagedKey, nil } @@ -322,12 +318,9 @@ func (p *PebbleKeyStore) GetLatestKeyBundle( return nil, errors.Wrap(err, "get latest key bundle") } + defer closer.Close() frameNumber := binary.BigEndian.Uint64(value) - if err := closer.Close(); err != nil { - return nil, errors.Wrap(err, "get latest key bundle") - } - value, closer, err = p.db.Get(keyBundleKey(provingKey, frameNumber)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { @@ -336,7 +329,6 @@ func (p *PebbleKeyStore) GetLatestKeyBundle( return nil, errors.Wrap(err, "get latest key bundle") } - defer closer.Close() announcement := &protobufs.InclusionCommitment{} @@ -440,10 +432,8 @@ func (p *PebbleKeyStore) PutKeyBundle( ); err != nil { return errors.Wrap(err, "put key bundle") } - } - - if err == nil && closer != nil { - closer.Close() + } else { + _ = closer.Close() } if err = txn.Set( diff --git a/node/store/peerstore.go b/node/store/peerstore.go index c59e260..85f0c3c 100644 --- a/node/store/peerstore.go +++ b/node/store/peerstore.go @@ -74,10 +74,10 @@ func (d *PeerstoreDatastore) Get( } return nil, err } + defer closer.Close() out := make([]byte, len(val)) copy(out[:], val[:]) - closer.Close() return val, nil } @@ -226,10 +226,10 @@ func (t *transaction) Get( } return nil, errors.Wrap(err, "get") } + defer closer.Close() out := make([]byte, len(b)) copy(out[:], b[:]) - closer.Close() return b, nil } From 9a09dc904b2148a4b2eeb450d97abfd14eeb3db8 Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Sun, 8 Dec 2024 04:19:15 +0100 Subject: [PATCH 3/6] Use default peer outbound queue size (#402) --- go-libp2p-blossomsub/pubsub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-libp2p-blossomsub/pubsub.go b/go-libp2p-blossomsub/pubsub.go index 13e323e..ff74d89 100644 --- a/go-libp2p-blossomsub/pubsub.go +++ b/go-libp2p-blossomsub/pubsub.go @@ -276,7 +276,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option disc: &discover{}, softMaxMessageSize: DefaultSoftMaxMessageSize, hardMaxMessageSize: DefaultHardMaxMessageSize, - peerOutboundQueueSize: 32, + peerOutboundQueueSize: DefaultPeerOutboundQueueSize, signID: h.ID(), signKey: nil, signPolicy: LaxSign, From a329bdab3a0c0b2a1c601c6b36b3fa2c32281b81 Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Sun, 8 Dec 2024 04:21:03 +0100 Subject: [PATCH 4/6] Limit sync candidates (#407) --- node/config/engine.go | 2 ++ node/consensus/data/consensus_frames.go | 18 +++++++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/node/config/engine.go b/node/config/engine.go index 09b237a..b428f15 100644 --- a/node/config/engine.go +++ b/node/config/engine.go @@ -94,6 +94,8 @@ type EngineConfig struct { AutoMergeCoins bool `yaml:"autoMergeCoins"` // Maximum wait time for a frame to be downloaded from a peer. SyncTimeout time.Duration `yaml:"syncTimeout"` + // Number of candidate peers per category to sync with. + SyncCandidates int `yaml:"syncCandidates"` // Values used only for testing – do not override these in production, your // node will get kicked out diff --git a/node/consensus/data/consensus_frames.go b/node/consensus/data/consensus_frames.go index bd7c5fd..68c82b6 100644 --- a/node/consensus/data/consensus_frames.go +++ b/node/consensus/data/consensus_frames.go @@ -20,7 +20,10 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/protobufs" ) -const defaultSyncTimeout = 4 * time.Second +const ( + defaultSyncTimeout = 4 * time.Second + defaultSyncCandidates = 8 +) func (e *DataClockConsensusEngine) syncWithMesh() error { e.logger.Info("collecting vdf proofs") @@ -304,11 +307,16 @@ func (e *DataClockConsensusEngine) GetAheadPeers(frameNumber uint64) []internal. } } + syncCandidates := e.config.Engine.SyncCandidates + if syncCandidates == 0 { + syncCandidates = defaultSyncCandidates + } + return slices.Concat( - internal.WeightedSampleWithoutReplacement(nearCandidates, len(nearCandidates)), - internal.WeightedSampleWithoutReplacement(reachableCandidates, len(reachableCandidates)), - internal.WeightedSampleWithoutReplacement(unknownCandidates, len(unknownCandidates)), - internal.WeightedSampleWithoutReplacement(unreachableCandidates, len(unreachableCandidates)), + internal.WeightedSampleWithoutReplacement(nearCandidates, min(len(nearCandidates), syncCandidates)), + internal.WeightedSampleWithoutReplacement(reachableCandidates, min(len(reachableCandidates), syncCandidates)), + internal.WeightedSampleWithoutReplacement(unknownCandidates, min(len(unknownCandidates), syncCandidates)), + internal.WeightedSampleWithoutReplacement(unreachableCandidates, min(len(unreachableCandidates), syncCandidates)), ) } From b728d8d76f51898a7fc09dd0e572b7c2d6790f5d Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Wed, 11 Dec 2024 02:10:49 +0100 Subject: [PATCH 5/6] Centralize configuration defaults and upgrade message limits (#410) * Apply config defaults early * Apply engine config defaults early * Apply P2P config defaults early * Remove default duplicates * Fix casing * Add sync message size configuration --- node/config/config.go | 31 +++- node/config/engine.go | 52 +++++- node/config/p2p.go | 160 +++++++++++++++++- node/consensus/data/broadcast_messaging.go | 2 +- node/consensus/data/consensus_frames.go | 18 +- .../data/data_clock_consensus_engine.go | 34 +--- node/consensus/data/peer_messaging.go | 2 +- node/main.go | 9 - node/p2p/blossomsub.go | 156 +---------------- 9 files changed, 250 insertions(+), 214 deletions(-) diff --git a/node/config/config.go b/node/config/config.go index a552e28..7afd8ee 100644 --- a/node/config/config.go +++ b/node/config/config.go @@ -22,6 +22,24 @@ import ( "gopkg.in/yaml.v2" ) +type GRPCMessageLimitsConfig struct { + MaxRecvMsgSize int `yaml:"maxRecvMsgSize"` + MaxSendMsgSize int `yaml:"maxSendMsgSize"` +} + +// WithDefaults returns a copy of the GRPCMessageLimitsConfig with any missing fields set to +// their default values. +func (c GRPCMessageLimitsConfig) WithDefaults(recv, send int) GRPCMessageLimitsConfig { + cpy := c + if cpy.MaxRecvMsgSize == 0 { + cpy.MaxRecvMsgSize = recv + } + if cpy.MaxSendMsgSize == 0 { + cpy.MaxSendMsgSize = send + } + return cpy +} + type Config struct { Key *KeyConfig `yaml:"key"` P2P *P2PConfig `yaml:"p2p"` @@ -32,6 +50,16 @@ type Config struct { LogFile string `yaml:"logFile"` } +// WithDefaults returns a copy of the config with default values filled in. +func (c Config) WithDefaults() Config { + cpy := c + p2p := cpy.P2P.WithDefaults() + cpy.P2P = &p2p + engine := cpy.Engine.WithDefaults() + cpy.Engine = &engine + return cpy +} + func NewConfig(configPath string) (*Config, error) { file, err := os.Open(configPath) if err != nil { @@ -447,7 +475,8 @@ func LoadConfig(configPath string, proverKey string, skipGenesisCheck bool) ( config.P2P.BootstrapPeers = peers } - return config, nil + withDefaults := config.WithDefaults() + return &withDefaults, nil } func SaveConfig(configPath string, config *Config) error { diff --git a/node/config/engine.go b/node/config/engine.go index b428f15..a9b6db0 100644 --- a/node/config/engine.go +++ b/node/config/engine.go @@ -2,6 +2,17 @@ package config import "time" +const ( + defaultMinimumPeersRequired = 3 + defaultDataWorkerBaseListenMultiaddr = "/ip4/127.0.0.1/tcp/%d" + defaultDataWorkerBaseListenPort = 40000 + defaultDataWorkerMemoryLimit = 1792 * 1024 * 1024 // 1.75 GiB + defaultSyncTimeout = 4 * time.Second + defaultSyncCandidates = 8 + defaultSyncMessageReceiveLimit = 1 * 1024 * 1024 + defaultSyncMessageSendLimit = 600 * 1024 * 1024 +) + type FramePublishFragmentationReedSolomonConfig struct { // The number of data shards to use for Reed-Solomon encoding and decoding. DataShards int `yaml:"dataShards"` @@ -9,7 +20,8 @@ type FramePublishFragmentationReedSolomonConfig struct { ParityShards int `yaml:"parityShards"` } -// WithDefaults sets default values for any fields that are not set. +// WithDefaults returns a copy of the FramePublishFragmentationReedSolomonConfig with any missing fields set to +// their default values. func (c FramePublishFragmentationReedSolomonConfig) WithDefaults() FramePublishFragmentationReedSolomonConfig { cpy := c if cpy.DataShards == 0 { @@ -29,7 +41,8 @@ type FramePublishFragmentationConfig struct { ReedSolomon FramePublishFragmentationReedSolomonConfig `yaml:"reedSolomon"` } -// WithDefaults sets default values for any fields that are not set. +// WithDefaults returns a copy of the FramePublishFragmentationConfig with any missing fields set to +// their default values. func (c FramePublishFragmentationConfig) WithDefaults() FramePublishFragmentationConfig { cpy := c if cpy.Algorithm == "" { @@ -53,7 +66,8 @@ type FramePublishConfig struct { BallastSize int `yaml:"ballastSize"` } -// WithDefaults sets default values for any fields that are not set. +// WithDefaults returns a copy of the FramePublishConfig with any missing fields set to +// their default values. func (c FramePublishConfig) WithDefaults() FramePublishConfig { cpy := c if cpy.Mode == "" { @@ -96,6 +110,8 @@ type EngineConfig struct { SyncTimeout time.Duration `yaml:"syncTimeout"` // Number of candidate peers per category to sync with. SyncCandidates int `yaml:"syncCandidates"` + // The configuration for the GRPC message limits. + SyncMessageLimits GRPCMessageLimitsConfig `yaml:"syncMessageLimits"` // Values used only for testing – do not override these in production, your // node will get kicked out @@ -106,3 +122,33 @@ type EngineConfig struct { // EXPERIMENTAL: The configuration for frame publishing. FramePublish FramePublishConfig `yaml:"framePublish"` } + +// WithDefaults returns a copy of the EngineConfig with any missing fields set to +// their default values. +func (c EngineConfig) WithDefaults() EngineConfig { + cpy := c + if cpy.MinimumPeersRequired == 0 { + cpy.MinimumPeersRequired = defaultMinimumPeersRequired + } + if cpy.DataWorkerBaseListenMultiaddr == "" { + cpy.DataWorkerBaseListenMultiaddr = defaultDataWorkerBaseListenMultiaddr + } + if cpy.DataWorkerBaseListenPort == 0 { + cpy.DataWorkerBaseListenPort = defaultDataWorkerBaseListenPort + } + if cpy.DataWorkerMemoryLimit == 0 { + cpy.DataWorkerMemoryLimit = defaultDataWorkerMemoryLimit + } + if cpy.SyncTimeout == 0 { + cpy.SyncTimeout = defaultSyncTimeout + } + if cpy.SyncCandidates == 0 { + cpy.SyncCandidates = defaultSyncCandidates + } + cpy.SyncMessageLimits = cpy.SyncMessageLimits.WithDefaults( + defaultSyncMessageReceiveLimit, + defaultSyncMessageSendLimit, + ) + cpy.FramePublish = cpy.FramePublish.WithDefaults() + return cpy +} diff --git a/node/config/p2p.go b/node/config/p2p.go index b6b54e5..504fa9d 100644 --- a/node/config/p2p.go +++ b/node/config/p2p.go @@ -2,6 +2,22 @@ package config import ( "time" + + blossomsub "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub" + qruntime "source.quilibrium.com/quilibrium/monorepo/node/internal/runtime" +) + +const ( + defaultLowWatermarkConnections = 160 + defaultHighWatermarkConnections = 192 + defaultGRPCServerRateLimit = 10 + defaultMinBootstrapPeers = 3 + defaultBootstrapParallelism = 10 + defaultDiscoveryParallelism = 50 + defaultDiscoveryPeerLookupLimit = 1000 + defaultPingTimeout = 5 * time.Second + defaultPingPeriod = 30 * time.Second + defaultPingAttempts = 3 ) type P2PConfig struct { @@ -43,7 +59,7 @@ type P2PConfig struct { LowWatermarkConnections int `yaml:"lowWatermarkConnections"` HighWatermarkConnections int `yaml:"highWatermarkConnections"` DirectPeers []string `yaml:"directPeers"` - GrpcServerRateLimit int `yaml:"grpcServerRateLimit"` + GRPCServerRateLimit int `yaml:"grpcServerRateLimit"` MinBootstrapPeers int `yaml:"minBootstrapPeers"` BootstrapParallelism int `yaml:"bootstrapParallelism"` DiscoveryParallelism int `yaml:"discoveryParallelism"` @@ -56,3 +72,145 @@ type P2PConfig struct { SubscriptionQueueSize int `yaml:"subscriptionQueueSize"` PeerOutboundQueueSize int `yaml:"peerOutboundQueueSize"` } + +// WithDefaults returns a copy of the P2PConfig with any missing fields set to +// their default values. +func (c P2PConfig) WithDefaults() P2PConfig { + cpy := c + if cpy.D == 0 { + cpy.D = blossomsub.BlossomSubD + } + if cpy.DLo == 0 { + cpy.DLo = blossomsub.BlossomSubDlo + } + if cpy.DHi == 0 { + cpy.DHi = blossomsub.BlossomSubDhi + } + if cpy.DScore == 0 { + cpy.DScore = blossomsub.BlossomSubDscore + } + if cpy.DOut == 0 { + cpy.DOut = blossomsub.BlossomSubDout + } + if cpy.HistoryLength == 0 { + cpy.HistoryLength = blossomsub.BlossomSubHistoryLength + } + if cpy.HistoryGossip == 0 { + cpy.HistoryGossip = blossomsub.BlossomSubHistoryGossip + } + if cpy.DLazy == 0 { + cpy.DLazy = blossomsub.BlossomSubDlazy + } + if cpy.GossipFactor == 0 { + cpy.GossipFactor = blossomsub.BlossomSubGossipFactor + } + if cpy.GossipRetransmission == 0 { + cpy.GossipRetransmission = blossomsub.BlossomSubGossipRetransmission + } + if cpy.HeartbeatInitialDelay == 0 { + cpy.HeartbeatInitialDelay = blossomsub.BlossomSubHeartbeatInitialDelay + } + if cpy.HeartbeatInterval == 0 { + cpy.HeartbeatInterval = blossomsub.BlossomSubHeartbeatInterval + } + if cpy.FanoutTTL == 0 { + cpy.FanoutTTL = blossomsub.BlossomSubFanoutTTL + } + if cpy.PrunePeers == 0 { + cpy.PrunePeers = blossomsub.BlossomSubPrunePeers + } + if cpy.PruneBackoff == 0 { + cpy.PruneBackoff = blossomsub.BlossomSubPruneBackoff + } + if cpy.UnsubscribeBackoff == 0 { + cpy.UnsubscribeBackoff = blossomsub.BlossomSubUnsubscribeBackoff + } + if cpy.Connectors == 0 { + cpy.Connectors = blossomsub.BlossomSubConnectors + } + if cpy.MaxPendingConnections == 0 { + cpy.MaxPendingConnections = blossomsub.BlossomSubMaxPendingConnections + } + if cpy.ConnectionTimeout == 0 { + cpy.ConnectionTimeout = blossomsub.BlossomSubConnectionTimeout + } + if cpy.DirectConnectTicks == 0 { + cpy.DirectConnectTicks = blossomsub.BlossomSubDirectConnectTicks + } + if cpy.DirectConnectInitialDelay == 0 { + cpy.DirectConnectInitialDelay = + blossomsub.BlossomSubDirectConnectInitialDelay + } + if cpy.OpportunisticGraftTicks == 0 { + cpy.OpportunisticGraftTicks = + blossomsub.BlossomSubOpportunisticGraftTicks + } + if cpy.OpportunisticGraftPeers == 0 { + cpy.OpportunisticGraftPeers = + blossomsub.BlossomSubOpportunisticGraftPeers + } + if cpy.GraftFloodThreshold == 0 { + cpy.GraftFloodThreshold = blossomsub.BlossomSubGraftFloodThreshold + } + if cpy.MaxIHaveLength == 0 { + cpy.MaxIHaveLength = blossomsub.BlossomSubMaxIHaveLength + } + if cpy.MaxIHaveMessages == 0 { + cpy.MaxIHaveMessages = blossomsub.BlossomSubMaxIHaveMessages + } + if cpy.MaxIDontWantMessages == 0 { + cpy.MaxIDontWantMessages = blossomsub.BlossomSubMaxIDontWantMessages + } + if cpy.IWantFollowupTime == 0 { + cpy.IWantFollowupTime = blossomsub.BlossomSubIWantFollowupTime + } + if cpy.IDontWantMessageThreshold == 0 { + cpy.IDontWantMessageThreshold = blossomsub.BlossomSubIDontWantMessageThreshold + } + if cpy.IDontWantMessageTTL == 0 { + cpy.IDontWantMessageTTL = blossomsub.BlossomSubIDontWantMessageTTL + } + if cpy.LowWatermarkConnections == 0 { + cpy.LowWatermarkConnections = defaultLowWatermarkConnections + } + if cpy.HighWatermarkConnections == 0 { + cpy.HighWatermarkConnections = defaultHighWatermarkConnections + } + if cpy.GRPCServerRateLimit == 0 { + cpy.GRPCServerRateLimit = defaultGRPCServerRateLimit + } + if cpy.MinBootstrapPeers == 0 { + cpy.MinBootstrapPeers = defaultMinBootstrapPeers + } + if cpy.BootstrapParallelism == 0 { + cpy.BootstrapParallelism = defaultBootstrapParallelism + } + if cpy.DiscoveryParallelism == 0 { + cpy.DiscoveryParallelism = defaultDiscoveryParallelism + } + if cpy.DiscoveryPeerLookupLimit == 0 { + cpy.DiscoveryPeerLookupLimit = defaultDiscoveryPeerLookupLimit + } + if cpy.PingTimeout == 0 { + cpy.PingTimeout = defaultPingTimeout + } + if cpy.PingPeriod == 0 { + cpy.PingPeriod = defaultPingPeriod + } + if cpy.PingAttempts == 0 { + cpy.PingAttempts = defaultPingAttempts + } + if cpy.ValidateQueueSize == 0 { + cpy.ValidateQueueSize = blossomsub.DefaultValidateQueueSize + } + if cpy.ValidateWorkers == 0 { + cpy.ValidateWorkers = qruntime.WorkerCount(0, false) + } + if cpy.SubscriptionQueueSize == 0 { + cpy.SubscriptionQueueSize = blossomsub.DefaultSubscriptionQueueSize + } + if cpy.PeerOutboundQueueSize == 0 { + cpy.PeerOutboundQueueSize = blossomsub.DefaultPeerOutboundQueueSize + } + return cpy +} diff --git a/node/consensus/data/broadcast_messaging.go b/node/consensus/data/broadcast_messaging.go index 312306b..5fd19fb 100644 --- a/node/consensus/data/broadcast_messaging.go +++ b/node/consensus/data/broadcast_messaging.go @@ -98,7 +98,7 @@ func (e *DataClockConsensusEngine) publishProof( } e.peerMapMx.Unlock() - cfg := e.config.Engine.FramePublish.WithDefaults() + cfg := e.config.Engine.FramePublish if cfg.BallastSize > 0 { frame = proto.Clone(frame).(*protobufs.ClockFrame) frame.Padding = make([]byte, cfg.BallastSize) diff --git a/node/consensus/data/consensus_frames.go b/node/consensus/data/consensus_frames.go index 68c82b6..99c659b 100644 --- a/node/consensus/data/consensus_frames.go +++ b/node/consensus/data/consensus_frames.go @@ -20,11 +20,6 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/protobufs" ) -const ( - defaultSyncTimeout = 4 * time.Second - defaultSyncCandidates = 8 -) - func (e *DataClockConsensusEngine) syncWithMesh() error { e.logger.Info("collecting vdf proofs") @@ -308,10 +303,6 @@ func (e *DataClockConsensusEngine) GetAheadPeers(frameNumber uint64) []internal. } syncCandidates := e.config.Engine.SyncCandidates - if syncCandidates == 0 { - syncCandidates = defaultSyncCandidates - } - return slices.Concat( internal.WeightedSampleWithoutReplacement(nearCandidates, min(len(nearCandidates), syncCandidates)), internal.WeightedSampleWithoutReplacement(reachableCandidates, min(len(reachableCandidates), syncCandidates)), @@ -350,10 +341,6 @@ func (e *DataClockConsensusEngine) syncWithPeer( }() syncTimeout := e.config.Engine.SyncTimeout - if syncTimeout == 0 { - syncTimeout = defaultSyncTimeout - } - dialCtx, cancelDial := context.WithTimeout(e.ctx, syncTimeout) defer cancelDial() cc, err := e.pubSub.GetDirectChannel(dialCtx, peerId, "sync") @@ -379,7 +366,10 @@ func (e *DataClockConsensusEngine) syncWithPeer( &protobufs.GetDataFrameRequest{ FrameNumber: latest.FrameNumber + 1, }, - grpc.MaxCallRecvMsgSize(600*1024*1024), + // The message size limits are swapped because the server is the one + // sending the data. + grpc.MaxCallRecvMsgSize(e.config.Engine.SyncMessageLimits.MaxSendMsgSize), + grpc.MaxCallSendMsgSize(e.config.Engine.SyncMessageLimits.MaxRecvMsgSize), ) cancelGet() if err != nil { diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index 801714d..69cea60 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -220,21 +220,11 @@ func NewDataClockConsensusEngine( panic(errors.New("peer info manager is nil")) } - minimumPeersRequired := cfg.Engine.MinimumPeersRequired - if minimumPeersRequired == 0 { - minimumPeersRequired = 3 - } - difficulty := cfg.Engine.Difficulty if difficulty == 0 { difficulty = 160000 } - rateLimit := cfg.P2P.GrpcServerRateLimit - if rateLimit == 0 { - rateLimit = 10 - } - clockFrameFragmentBuffer, err := fragmentation.NewClockFrameFragmentCircularBuffer( fragmentation.NewReedSolomonClockFrameFragmentBuffer, 16, @@ -272,7 +262,7 @@ func NewDataClockConsensusEngine( syncingStatus: SyncStatusNotSyncing, peerMap: map[string]*peerInfo{}, uncooperativePeersMap: map[string]*peerInfo{}, - minimumPeersRequired: minimumPeersRequired, + minimumPeersRequired: cfg.Engine.MinimumPeersRequired, report: report, frameProver: frameProver, masterTimeReel: masterTimeReel, @@ -285,7 +275,7 @@ func NewDataClockConsensusEngine( config: cfg, preMidnightMint: map[string]struct{}{}, grpcRateLimiter: NewRateLimiter( - rateLimit, + cfg.P2P.GRPCServerRateLimit, time.Minute, ), requestSyncCh: make(chan struct{}, 1), @@ -353,8 +343,8 @@ func (e *DataClockConsensusEngine) Start() <-chan error { e.pubSub.Subscribe(e.infoFilter, e.handleInfoMessage) syncServer := qgrpc.NewServer( - grpc.MaxSendMsgSize(40*1024*1024), - grpc.MaxRecvMsgSize(40*1024*1024), + grpc.MaxRecvMsgSize(e.config.Engine.SyncMessageLimits.MaxRecvMsgSize), + grpc.MaxSendMsgSize(e.config.Engine.SyncMessageLimits.MaxSendMsgSize), ) e.grpcServers = append(e.grpcServers[:0:0], syncServer) protobufs.RegisterDataServiceServer(syncServer, e) @@ -889,14 +879,6 @@ func ( zap.Uint32("client", index), ) - if e.config.Engine.DataWorkerBaseListenMultiaddr == "" { - e.config.Engine.DataWorkerBaseListenMultiaddr = "/ip4/127.0.0.1/tcp/%d" - } - - if e.config.Engine.DataWorkerBaseListenPort == 0 { - e.config.Engine.DataWorkerBaseListenPort = 40000 - } - ma, err := multiaddr.NewMultiaddr( fmt.Sprintf( e.config.Engine.DataWorkerBaseListenMultiaddr, @@ -1001,14 +983,6 @@ func (e *DataClockConsensusEngine) createParallelDataClientsFromBaseMultiaddr( zap.Int("parallelism", parallelism), ) - if e.config.Engine.DataWorkerBaseListenMultiaddr == "" { - e.config.Engine.DataWorkerBaseListenMultiaddr = "/ip4/127.0.0.1/tcp/%d" - } - - if e.config.Engine.DataWorkerBaseListenPort == 0 { - e.config.Engine.DataWorkerBaseListenPort = 40000 - } - clients := make([]protobufs.DataIPCServiceClient, parallelism) for i := 0; i < parallelism; i++ { diff --git a/node/consensus/data/peer_messaging.go b/node/consensus/data/peer_messaging.go index a8a991c..b47c94e 100644 --- a/node/consensus/data/peer_messaging.go +++ b/node/consensus/data/peer_messaging.go @@ -35,7 +35,7 @@ func (e *DataClockConsensusEngine) GetDataFrame( if !ok { return nil, status.Error(codes.Internal, "remote peer ID not found") } - if e.config.P2P.GrpcServerRateLimit != -1 { + if e.config.P2P.GRPCServerRateLimit != -1 { if err := e.grpcRateLimiter.Allow(peerID); err != nil { return nil, err } diff --git a/node/main.go b/node/main.go index ee60fa9..25b1b1d 100644 --- a/node/main.go +++ b/node/main.go @@ -400,15 +400,6 @@ func main() { return } - if nodeConfig.Engine.DataWorkerBaseListenMultiaddr == "" { - nodeConfig.Engine.DataWorkerBaseListenMultiaddr = "/ip4/127.0.0.1/tcp/%d" - } - if nodeConfig.Engine.DataWorkerBaseListenPort == 0 { - nodeConfig.Engine.DataWorkerBaseListenPort = 40000 - } - if nodeConfig.Engine.DataWorkerMemoryLimit == 0 { - nodeConfig.Engine.DataWorkerMemoryLimit = 1792 * 1024 * 1024 // 1.75GiB - } if len(nodeConfig.Engine.DataWorkerMultiaddrs) == 0 { maxProcs, numCPU := runtime.GOMAXPROCS(0), runtime.NumCPU() if maxProcs > numCPU && !nodeConfig.Engine.AllowExcessiveGOMAXPROCS { diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index cfd1ad9..a7cb340 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -45,26 +45,13 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/config" qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc" "source.quilibrium.com/quilibrium/monorepo/node/internal/observability" - qruntime "source.quilibrium.com/quilibrium/monorepo/node/internal/runtime" "source.quilibrium.com/quilibrium/monorepo/node/p2p/internal" "source.quilibrium.com/quilibrium/monorepo/node/protobufs" ) -// The default watermarks are the defaults used by libp2p.DefaultConnectionManager. -// We explicitly set them here in order to force internal consistency between the -// connection manager and the resource manager. const ( - defaultLowWatermarkConnections = 160 - defaultHighWatermarkConnections = 192 - defaultMinBootstrapPeers = 3 - defaultBootstrapParallelism = 10 - defaultDiscoveryParallelism = 50 - defaultDiscoveryPeerLookupLimit = 1000 - defaultPingTimeout = 5 * time.Second - defaultPingPeriod = 30 * time.Second - defaultPingAttempts = 3 - DecayInterval = 10 * time.Second - AppDecay = .9 + DecayInterval = 10 * time.Second + AppDecay = .9 ) type appScore struct { @@ -192,7 +179,6 @@ func NewBlossomSub( logger *zap.Logger, ) *BlossomSub { ctx := context.Background() - p2pConfig = withDefaults(p2pConfig) opts := []libp2pconfig.Option{ libp2p.ListenAddrStrings(p2pConfig.ListenMultiaddr), @@ -1043,144 +1029,6 @@ func (b *BlossomSub) SignMessage(msg []byte) ([]byte, error) { return sig, errors.Wrap(err, "sign message") } -func withDefaults(p2pConfig *config.P2PConfig) *config.P2PConfig { - cfg := *p2pConfig - p2pConfig = &cfg - if p2pConfig.D == 0 { - p2pConfig.D = blossomsub.BlossomSubD - } - if p2pConfig.DLo == 0 { - p2pConfig.DLo = blossomsub.BlossomSubDlo - } - if p2pConfig.DHi == 0 { - p2pConfig.DHi = blossomsub.BlossomSubDhi - } - if p2pConfig.DScore == 0 { - p2pConfig.DScore = blossomsub.BlossomSubDscore - } - if p2pConfig.DOut == 0 { - p2pConfig.DOut = blossomsub.BlossomSubDout - } - if p2pConfig.HistoryLength == 0 { - p2pConfig.HistoryLength = blossomsub.BlossomSubHistoryLength - } - if p2pConfig.HistoryGossip == 0 { - p2pConfig.HistoryGossip = blossomsub.BlossomSubHistoryGossip - } - if p2pConfig.DLazy == 0 { - p2pConfig.DLazy = blossomsub.BlossomSubDlazy - } - if p2pConfig.GossipFactor == 0 { - p2pConfig.GossipFactor = blossomsub.BlossomSubGossipFactor - } - if p2pConfig.GossipRetransmission == 0 { - p2pConfig.GossipRetransmission = blossomsub.BlossomSubGossipRetransmission - } - if p2pConfig.HeartbeatInitialDelay == 0 { - p2pConfig.HeartbeatInitialDelay = blossomsub.BlossomSubHeartbeatInitialDelay - } - if p2pConfig.HeartbeatInterval == 0 { - p2pConfig.HeartbeatInterval = blossomsub.BlossomSubHeartbeatInterval - } - if p2pConfig.FanoutTTL == 0 { - p2pConfig.FanoutTTL = blossomsub.BlossomSubFanoutTTL - } - if p2pConfig.PrunePeers == 0 { - p2pConfig.PrunePeers = blossomsub.BlossomSubPrunePeers - } - if p2pConfig.PruneBackoff == 0 { - p2pConfig.PruneBackoff = blossomsub.BlossomSubPruneBackoff - } - if p2pConfig.UnsubscribeBackoff == 0 { - p2pConfig.UnsubscribeBackoff = blossomsub.BlossomSubUnsubscribeBackoff - } - if p2pConfig.Connectors == 0 { - p2pConfig.Connectors = blossomsub.BlossomSubConnectors - } - if p2pConfig.MaxPendingConnections == 0 { - p2pConfig.MaxPendingConnections = blossomsub.BlossomSubMaxPendingConnections - } - if p2pConfig.ConnectionTimeout == 0 { - p2pConfig.ConnectionTimeout = blossomsub.BlossomSubConnectionTimeout - } - if p2pConfig.DirectConnectTicks == 0 { - p2pConfig.DirectConnectTicks = blossomsub.BlossomSubDirectConnectTicks - } - if p2pConfig.DirectConnectInitialDelay == 0 { - p2pConfig.DirectConnectInitialDelay = - blossomsub.BlossomSubDirectConnectInitialDelay - } - if p2pConfig.OpportunisticGraftTicks == 0 { - p2pConfig.OpportunisticGraftTicks = - blossomsub.BlossomSubOpportunisticGraftTicks - } - if p2pConfig.OpportunisticGraftPeers == 0 { - p2pConfig.OpportunisticGraftPeers = - blossomsub.BlossomSubOpportunisticGraftPeers - } - if p2pConfig.GraftFloodThreshold == 0 { - p2pConfig.GraftFloodThreshold = blossomsub.BlossomSubGraftFloodThreshold - } - if p2pConfig.MaxIHaveLength == 0 { - p2pConfig.MaxIHaveLength = blossomsub.BlossomSubMaxIHaveLength - } - if p2pConfig.MaxIHaveMessages == 0 { - p2pConfig.MaxIHaveMessages = blossomsub.BlossomSubMaxIHaveMessages - } - if p2pConfig.MaxIDontWantMessages == 0 { - p2pConfig.MaxIDontWantMessages = blossomsub.BlossomSubMaxIDontWantMessages - } - if p2pConfig.IWantFollowupTime == 0 { - p2pConfig.IWantFollowupTime = blossomsub.BlossomSubIWantFollowupTime - } - if p2pConfig.IDontWantMessageThreshold == 0 { - p2pConfig.IDontWantMessageThreshold = blossomsub.BlossomSubIDontWantMessageThreshold - } - if p2pConfig.IDontWantMessageTTL == 0 { - p2pConfig.IDontWantMessageTTL = blossomsub.BlossomSubIDontWantMessageTTL - } - if p2pConfig.LowWatermarkConnections == 0 { - p2pConfig.LowWatermarkConnections = defaultLowWatermarkConnections - } - if p2pConfig.HighWatermarkConnections == 0 { - p2pConfig.HighWatermarkConnections = defaultHighWatermarkConnections - } - if p2pConfig.MinBootstrapPeers == 0 { - p2pConfig.MinBootstrapPeers = defaultMinBootstrapPeers - } - if p2pConfig.BootstrapParallelism == 0 { - p2pConfig.BootstrapParallelism = defaultBootstrapParallelism - } - if p2pConfig.DiscoveryParallelism == 0 { - p2pConfig.DiscoveryParallelism = defaultDiscoveryParallelism - } - if p2pConfig.DiscoveryPeerLookupLimit == 0 { - p2pConfig.DiscoveryPeerLookupLimit = defaultDiscoveryPeerLookupLimit - } - if p2pConfig.PingTimeout == 0 { - p2pConfig.PingTimeout = defaultPingTimeout - } - if p2pConfig.PingPeriod == 0 { - p2pConfig.PingPeriod = defaultPingPeriod - } - if p2pConfig.PingAttempts == 0 { - p2pConfig.PingAttempts = defaultPingAttempts - } - if p2pConfig.ValidateQueueSize == 0 { - p2pConfig.ValidateQueueSize = blossomsub.DefaultValidateQueueSize - } - if p2pConfig.ValidateWorkers == 0 { - p2pConfig.ValidateWorkers = qruntime.WorkerCount(0, false) - } - if p2pConfig.SubscriptionQueueSize == 0 { - p2pConfig.SubscriptionQueueSize = blossomsub.DefaultSubscriptionQueueSize - } - if p2pConfig.PeerOutboundQueueSize == 0 { - p2pConfig.PeerOutboundQueueSize = blossomsub.DefaultPeerOutboundQueueSize - } - return p2pConfig -} - func toBlossomSubParams(p2pConfig *config.P2PConfig) blossomsub.BlossomSubParams { return blossomsub.BlossomSubParams{ D: p2pConfig.D, From 5ad9bdc468704ac10aea267bd4865ac6f525a741 Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Wed, 11 Dec 2024 02:11:09 +0100 Subject: [PATCH 6/6] Change default GOGC to 10 (#409) * Set default GOGC to 10 * Do not skip frame prover trie mutex --- node/consensus/data/message_handler.go | 2 +- node/main.go | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/node/consensus/data/message_handler.go b/node/consensus/data/message_handler.go index 3ea9d38..23e856a 100644 --- a/node/consensus/data/message_handler.go +++ b/node/consensus/data/message_handler.go @@ -110,7 +110,7 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() { continue } - if e.frameProverTries[0].Contains(e.provingKeyAddress) { + if e.FrameProverTrieContains(0, e.provingKeyAddress) { wg := &sync.WaitGroup{} for name := range e.executionEngines { name := name diff --git a/node/main.go b/node/main.go index 25b1b1d..7b43dbf 100644 --- a/node/main.go +++ b/node/main.go @@ -467,6 +467,9 @@ func main() { if _, explicitGOMEMLIMIT := os.LookupEnv("GOMEMLIMIT"); !explicitGOMEMLIMIT { rdebug.SetMemoryLimit(availableOverhead * 8 / 10) } + if _, explicitGOGC := os.LookupEnv("GOGC"); !explicitGOGC { + rdebug.SetGCPercent(10) + } } }