From 89c85422bee94653ff4cff1f505cebbd2a27f8cd Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Sun, 8 Dec 2024 04:12:26 +0100 Subject: [PATCH 1/4] Stop gRPC and HTTP servers on shutdown (#408) * Stop gRPC and HTTP servers on shutdown * Wait for executor to register --- node/app/node.go | 11 +- .../data/data_clock_consensus_engine.go | 53 +++++---- node/main.go | 19 ++-- node/rpc/node_rpc_server.go | 101 +++++++++++------- 4 files changed, 114 insertions(+), 70 deletions(-) diff --git a/node/app/node.go b/node/app/node.go index 6254cc8..e0ed4ae 100644 --- a/node/app/node.go +++ b/node/app/node.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "fmt" + "sync" "go.uber.org/zap" "golang.org/x/crypto/sha3" @@ -162,9 +163,17 @@ func (n *Node) Start() { } // TODO: add config mapping to engine name/frame registration + wg := sync.WaitGroup{} for _, e := range n.execEngines { - n.engine.RegisterExecutor(e, 0) + wg.Add(1) + go func(e execution.ExecutionEngine) { + defer wg.Done() + if err := <-n.engine.RegisterExecutor(e, 0); err != nil { + panic(err) + } + }(e) } + wg.Wait() } func (n *Node) Stop() { diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index 66a8804..801714d 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -72,6 +72,8 @@ type DataClockConsensusEngine struct { cancel context.CancelFunc wg sync.WaitGroup + grpcServers []*grpc.Server + lastProven uint64 difficulty uint32 config *config.Config @@ -349,38 +351,40 @@ func (e *DataClockConsensusEngine) Start() <-chan error { e.pubSub.Subscribe(e.frameFragmentFilter, e.handleFrameFragmentMessage) e.pubSub.Subscribe(e.txFilter, e.handleTxMessage) e.pubSub.Subscribe(e.infoFilter, e.handleInfoMessage) + + syncServer := qgrpc.NewServer( + grpc.MaxSendMsgSize(40*1024*1024), + grpc.MaxRecvMsgSize(40*1024*1024), + ) + e.grpcServers = append(e.grpcServers[:0:0], syncServer) + protobufs.RegisterDataServiceServer(syncServer, e) go func() { - server := qgrpc.NewServer( - grpc.MaxSendMsgSize(40*1024*1024), - grpc.MaxRecvMsgSize(40*1024*1024), - ) - protobufs.RegisterDataServiceServer(server, e) if err := e.pubSub.StartDirectChannelListener( e.pubSub.GetPeerID(), "sync", - server, + syncServer, ); err != nil { - panic(err) + e.logger.Error("error starting sync server", zap.Error(err)) } }() - go func() { - if e.dataTimeReel.GetFrameProverTries()[0].Contains(e.provingKeyAddress) { - server := qgrpc.NewServer( - grpc.MaxSendMsgSize(1*1024*1024), - grpc.MaxRecvMsgSize(1*1024*1024), - ) - protobufs.RegisterDataServiceServer(server, e) - + if e.FrameProverTrieContains(0, e.provingKeyAddress) { + workerServer := qgrpc.NewServer( + grpc.MaxSendMsgSize(1*1024*1024), + grpc.MaxRecvMsgSize(1*1024*1024), + ) + e.grpcServers = append(e.grpcServers, workerServer) + protobufs.RegisterDataServiceServer(workerServer, e) + go func() { if err := e.pubSub.StartDirectChannelListener( e.pubSub.GetPeerID(), "worker", - server, + workerServer, ); err != nil { - panic(err) + e.logger.Error("error starting worker server", zap.Error(err)) } - } - }() + }() + } e.stateMx.Lock() e.state = consensus.EngineStateCollecting @@ -661,6 +665,16 @@ func (e *DataClockConsensusEngine) PerformTimeProof( } func (e *DataClockConsensusEngine) Stop(force bool) <-chan error { + wg := sync.WaitGroup{} + wg.Add(len(e.grpcServers)) + for _, server := range e.grpcServers { + go func(server *grpc.Server) 
{ + defer wg.Done() + server.GracefulStop() + }(server) + } + wg.Wait() + e.logger.Info("stopping ceremony consensus engine") e.cancel() e.wg.Wait() @@ -684,7 +698,6 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error { e.logger.Warn("error publishing prover pause", zap.Error(err)) } - wg := sync.WaitGroup{} wg.Add(len(e.executionEngines)) executionErrors := make(chan error, len(e.executionEngines)) for name := range e.executionEngines { diff --git a/node/main.go b/node/main.go index 35154ce..32ce019 100644 --- a/node/main.go +++ b/node/main.go @@ -467,6 +467,7 @@ func main() { if !*integrityCheck { go spawnDataWorkers(nodeConfig) + defer stopDataWorkers() } kzg.Init() @@ -510,6 +511,9 @@ func main() { // runtime.GOMAXPROCS(1) + node.Start() + defer node.Stop() + if nodeConfig.ListenGRPCMultiaddr != "" { srv, err := rpc.NewRPCServer( nodeConfig.ListenGRPCMultiaddr, @@ -526,20 +530,13 @@ func main() { if err != nil { panic(err) } - - go func() { - err := srv.Start() - if err != nil { - panic(err) - } - }() + if err := srv.Start(); err != nil { + panic(err) + } + defer srv.Stop() } - node.Start() - <-done - stopDataWorkers() - node.Stop() } var dataWorkers []*exec.Cmd diff --git a/node/rpc/node_rpc_server.go b/node/rpc/node_rpc_server.go index 568b836..be1c5ba 100644 --- a/node/rpc/node_rpc_server.go +++ b/node/rpc/node_rpc_server.go @@ -6,6 +6,7 @@ import ( "math/big" "net/http" "strings" + "sync" "time" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" @@ -43,6 +44,8 @@ type RPCServer struct { pubSub p2p.PubSub masterClock *master.MasterClockConsensusEngine executionEngines []execution.ExecutionEngine + grpcServer *grpc.Server + httpServer *http.Server } // GetFrameInfo implements protobufs.NodeServiceServer. @@ -384,7 +387,33 @@ func NewRPCServer( masterClock *master.MasterClockConsensusEngine, executionEngines []execution.ExecutionEngine, ) (*RPCServer, error) { - return &RPCServer{ + mg, err := multiaddr.NewMultiaddr(listenAddrGRPC) + if err != nil { + return nil, errors.Wrap(err, "new rpc server") + } + mga, err := mn.ToNetAddr(mg) + if err != nil { + return nil, errors.Wrap(err, "new rpc server") + } + + mux := runtime.NewServeMux() + opts := qgrpc.ClientOptions( + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(600*1024*1024), + grpc.MaxCallSendMsgSize(600*1024*1024), + ), + ) + if err := protobufs.RegisterNodeServiceHandlerFromEndpoint( + context.Background(), + mux, + mga.String(), + opts, + ); err != nil { + return nil, err + } + + rpcServer := &RPCServer{ listenAddrGRPC: listenAddrGRPC, listenAddrHTTP: listenAddrHTTP, logger: logger, @@ -395,17 +424,22 @@ func NewRPCServer( pubSub: pubSub, masterClock: masterClock, executionEngines: executionEngines, - }, nil + grpcServer: qgrpc.NewServer( + grpc.MaxRecvMsgSize(600*1024*1024), + grpc.MaxSendMsgSize(600*1024*1024), + ), + httpServer: &http.Server{ + Handler: mux, + }, + } + + protobufs.RegisterNodeServiceServer(rpcServer.grpcServer, rpcServer) + reflection.Register(rpcServer.grpcServer) + + return rpcServer, nil } func (r *RPCServer) Start() error { - s := qgrpc.NewServer( - grpc.MaxRecvMsgSize(600*1024*1024), - grpc.MaxSendMsgSize(600*1024*1024), - ) - protobufs.RegisterNodeServiceServer(s, r) - reflection.Register(s) - mg, err := multiaddr.NewMultiaddr(r.listenAddrGRPC) if err != nil { return errors.Wrap(err, "start") @@ -417,51 +451,42 @@ func (r *RPCServer) Start() error { } go func() { - if err := s.Serve(mn.NetListener(lis)); err != nil { - 
panic(err) + if err := r.grpcServer.Serve(mn.NetListener(lis)); err != nil { + r.logger.Error("serve error", zap.Error(err)) } }() if r.listenAddrHTTP != "" { - m, err := multiaddr.NewMultiaddr(r.listenAddrHTTP) + mh, err := multiaddr.NewMultiaddr(r.listenAddrHTTP) if err != nil { return errors.Wrap(err, "start") } - ma, err := mn.ToNetAddr(m) - if err != nil { - return errors.Wrap(err, "start") - } - - mga, err := mn.ToNetAddr(mg) + lis, err := mn.Listen(mh) if err != nil { return errors.Wrap(err, "start") } go func() { - mux := runtime.NewServeMux() - opts := qgrpc.ClientOptions( - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(600*1024*1024), - grpc.MaxCallSendMsgSize(600*1024*1024), - ), - ) - - if err := protobufs.RegisterNodeServiceHandlerFromEndpoint( - context.Background(), - mux, - mga.String(), - opts, - ); err != nil { - panic(err) - } - - if err := http.ListenAndServe(ma.String(), mux); err != nil { - panic(err) + if err := r.httpServer.Serve(mn.NetListener(lis)); err != nil && !errors.Is(err, http.ErrServerClosed) { + r.logger.Error("serve error", zap.Error(err)) } }() } return nil } + +func (r *RPCServer) Stop() { + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + r.grpcServer.GracefulStop() + }() + go func() { + defer wg.Done() + r.httpServer.Shutdown(context.Background()) + }() + wg.Wait() +} From b9f1f0a6af541795350881aee752d36e477d424b Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Sun, 8 Dec 2024 04:19:04 +0100 Subject: [PATCH 2/4] Frame pruning fixes (#405) * Consistently close iterators * Prune frames in batches * Add compact on start flag --- node/consensus/data/frame_pruner.go | 13 -- node/consensus/data/main_data_loop.go | 33 ++++- node/consensus/master/peer_messaging.go | 5 +- node/main.go | 16 +++ node/rpc/node_rpc_server.go | 12 +- node/store/clock.go | 163 ++++++++++++++++-------- node/store/coin.go | 11 +- node/store/data_proof.go | 44 ++----- node/store/inmem.go | 5 +- node/store/key.go | 22 +--- node/store/peerstore.go | 4 +- 11 files changed, 177 insertions(+), 151 deletions(-) delete mode 100644 node/consensus/data/frame_pruner.go diff --git a/node/consensus/data/frame_pruner.go b/node/consensus/data/frame_pruner.go deleted file mode 100644 index dc68f58..0000000 --- a/node/consensus/data/frame_pruner.go +++ /dev/null @@ -1,13 +0,0 @@ -package data - -import "go.uber.org/zap" - -func (e *DataClockConsensusEngine) pruneFrames(maxFrame uint64) error { - e.logger.Info("pruning frames", zap.Uint64("max_frame_to_prune", maxFrame)) - err := e.clockStore.DeleteDataClockFrameRange(e.filter, 1, maxFrame) - if err != nil { - e.logger.Error("failed to prune frames", zap.Error(err)) - return err - } - return nil -} diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go index 7167b2a..d82f2b8 100644 --- a/node/consensus/data/main_data_loop.go +++ b/node/consensus/data/main_data_loop.go @@ -103,6 +103,10 @@ func (e *DataClockConsensusEngine) runFramePruning() { e.logger.Info("frame pruning enabled, waiting for delay timeout expiry") + from := uint64(1) + maxFrames := uint64(e.config.Engine.MaxFrames) + batchSize := uint64(1000) +outer: for { select { case <-e.ctx.Done(): @@ -113,15 +117,34 @@ func (e *DataClockConsensusEngine) runFramePruning() { panic(err) } - if head.FrameNumber < uint64(e.config.Engine.MaxFrames)+1 || + if head.FrameNumber <= maxFrames || head.FrameNumber <= 
application.PROOF_FRAME_SENIORITY_REPAIR+1 { continue } - if err := e.pruneFrames( - head.FrameNumber - uint64(e.config.Engine.MaxFrames), - ); err != nil { - e.logger.Error("could not prune", zap.Error(err)) + to := head.FrameNumber - maxFrames + for i := from; i < to; i += batchSize { + start, stop := i, min(i+batchSize, to) + if err := e.clockStore.DeleteDataClockFrameRange(e.filter, start, stop); err != nil { + e.logger.Error( + "failed to prune frames", + zap.Error(err), + zap.Uint64("from", start), + zap.Uint64("to", stop), + ) + continue outer + } + e.logger.Info( + "pruned frames", + zap.Uint64("from", start), + zap.Uint64("to", stop), + ) + select { + case <-e.ctx.Done(): + return + default: + } + from = stop } } } diff --git a/node/consensus/master/peer_messaging.go b/node/consensus/master/peer_messaging.go index 4eac311..1c62bd8 100644 --- a/node/consensus/master/peer_messaging.go +++ b/node/consensus/master/peer_messaging.go @@ -69,6 +69,7 @@ func (e *MasterClockConsensusEngine) Sync( if err != nil { return errors.Wrap(err, "sync") } + defer iter.Close() response := []*protobufs.ClockFrame{} @@ -81,10 +82,6 @@ func (e *MasterClockConsensusEngine) Sync( response = append(response, frame) } - if err = iter.Close(); err != nil { - return errors.Wrap(err, "sync") - } - if len(response) == 0 { return nil } diff --git a/node/main.go b/node/main.go index 32ce019..ee60fa9 100644 --- a/node/main.go +++ b/node/main.go @@ -144,6 +144,11 @@ var ( true, "when enabled, frame execution validation is skipped", ) + compactDB = flag.Bool( + "compact-db", + false, + "compacts the database and exits", + ) ) func signatureCheckDefault() bool { @@ -330,6 +335,17 @@ func main() { panic(err) } + if *compactDB && *core == 0 { + db := store.NewPebbleDB(nodeConfig.DB) + if err := db.CompactAll(); err != nil { + panic(err) + } + if err := db.Close(); err != nil { + panic(err) + } + return + } + if *network != 0 { if nodeConfig.P2P.BootstrapPeers[0] == config.BootstrapPeers[0] { fmt.Println( diff --git a/node/rpc/node_rpc_server.go b/node/rpc/node_rpc_server.go index be1c5ba..741ad87 100644 --- a/node/rpc/node_rpc_server.go +++ b/node/rpc/node_rpc_server.go @@ -97,21 +97,17 @@ func (r *RPCServer) GetFrames( if err != nil { return nil, errors.Wrap(err, "get frames") } + defer iter.Close() frames := []*protobufs.ClockFrame{} for iter.First(); iter.Valid(); iter.Next() { frame, err := iter.Value() if err != nil { - iter.Close() return nil, errors.Wrap(err, "get frames") } frames = append(frames, frame) } - if err := iter.Close(); err != nil { - return nil, errors.Wrap(err, "get frames") - } - return &protobufs.FramesResponse{ TruncatedClockFrames: frames, }, nil @@ -124,21 +120,17 @@ func (r *RPCServer) GetFrames( if err != nil { return nil, errors.Wrap(err, "get frame info") } + defer iter.Close() frames := []*protobufs.ClockFrame{} for iter.First(); iter.Valid(); iter.Next() { frame, err := iter.TruncatedValue() if err != nil { - iter.Close() return nil, errors.Wrap(err, "get frames") } frames = append(frames, frame) } - if err := iter.Close(); err != nil { - return nil, errors.Wrap(err, "get frames") - } - return &protobufs.FramesResponse{ TruncatedClockFrames: frames, }, nil diff --git a/node/store/clock.go b/node/store/clock.go index 3a24de5..0cf6078 100644 --- a/node/store/clock.go +++ b/node/store/clock.go @@ -216,13 +216,13 @@ func (p *PebbleClockIterator) TruncatedValue() ( if err != nil { return nil, errors.Wrap(err, "get truncated clock frame iterator value") } + defer frameCloser.Close() if err := 
proto.Unmarshal(frameValue, frame); err != nil { return nil, errors.Wrap( errors.Wrap(err, ErrInvalidData.Error()), "get truncated clock frame iterator value", ) } - frameCloser.Close() } else { if err := proto.Unmarshal(value, frame); err != nil { return nil, errors.Wrap( @@ -252,13 +252,13 @@ func (p *PebbleClockIterator) Value() (*protobufs.ClockFrame, error) { if err != nil { return nil, errors.Wrap(err, "get clock frame iterator value") } + defer frameCloser.Close() if err := proto.Unmarshal(frameValue, frame); err != nil { return nil, errors.Wrap( errors.Wrap(err, ErrInvalidData.Error()), "get clock frame iterator value", ) } - defer frameCloser.Close() } else { if err := proto.Unmarshal(value, frame); err != nil { return nil, errors.Wrap( @@ -469,8 +469,8 @@ func (p *PebbleClockStore) GetEarliestMasterClockFrame( return nil, errors.Wrap(err, "get earliest master clock frame") } - defer closer.Close() + frameNumber := binary.BigEndian.Uint64(idxValue) frame, err := p.GetMasterClockFrame(filter, frameNumber) if err != nil { @@ -492,8 +492,8 @@ func (p *PebbleClockStore) GetLatestMasterClockFrame( return nil, errors.Wrap(err, "get latest master clock frame") } - defer closer.Close() + frameNumber := binary.BigEndian.Uint64(idxValue) frame, err := p.GetMasterClockFrame(filter, frameNumber) if err != nil { @@ -516,11 +516,11 @@ func (p *PebbleClockStore) GetMasterClockFrame( return nil, errors.Wrap(err, "get master clock frame") } + defer closer.Close() copied := make([]byte, len(value)) copy(copied[:], value[:]) - defer closer.Close() frame := &protobufs.ClockFrame{} frame.FrameNumber = frameNumber frame.Filter = filter @@ -595,10 +595,8 @@ func (p *PebbleClockStore) PutMasterClockFrame( ); err != nil { return errors.Wrap(err, "put master clock frame") } - } - - if err == nil && closer != nil { - closer.Close() + } else { + _ = closer.Close() } if err = txn.Set( @@ -625,6 +623,7 @@ func (p *PebbleClockStore) GetDataClockFrame( return nil, nil, errors.Wrap(err, "get data clock frame") } + defer closer.Close() frame := &protobufs.ClockFrame{} genesisFramePreIndex := false @@ -641,14 +640,13 @@ func (p *PebbleClockStore) GetDataClockFrame( return nil, nil, errors.Wrap(err, "get data clock frame") } + defer frameCloser.Close() if err := proto.Unmarshal(frameValue, frame); err != nil { return nil, nil, errors.Wrap( errors.Wrap(err, ErrInvalidData.Error()), "get data clock frame", ) } - closer.Close() - defer frameCloser.Close() } else { genesisFramePreIndex = frameNumber == 0 if err := proto.Unmarshal(value, frame); err != nil { @@ -657,7 +655,6 @@ func (p *PebbleClockStore) GetDataClockFrame( "get data clock frame", ) } - defer closer.Close() } if !truncate { @@ -674,15 +671,17 @@ func (p *PebbleClockStore) GetDataClockFrame( proverTrie := &tries.RollingFrecencyCritbitTrie{} trieData, closer, err := p.db.Get(clockProverTrieKey(filter, i, frameNumber)) if err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + return nil, nil, errors.Wrap(err, "get data clock frame") + } break } + defer closer.Close() if err := proverTrie.Deserialize(trieData); err != nil { - closer.Close() return nil, nil, errors.Wrap(err, "get latest data clock frame") } - closer.Close() i++ proverTries = append(proverTries, proverTrie) } @@ -726,7 +725,6 @@ func (p *PebbleClockStore) deleteAggregateProofs( for i := 0; i < len(frame.Input[516:])/74; i++ { commit := frame.Input[516+(i*74) : 516+((i+1)*74)] err := internalDeleteAggregateProof( - p.db, txn, frame.AggregateProofs[i], commit, @@ -757,7 +755,6 @@ func (p 
*PebbleClockStore) saveAggregateProofs( for i := 0; i < len(frame.Input[516:])/74; i++ { commit := frame.Input[516+(i*74) : 516+((i+1)*74)] err := internalPutAggregateProof( - p.db, txn, frame.AggregateProofs[i], commit, @@ -788,8 +785,8 @@ func (p *PebbleClockStore) GetEarliestDataClockFrame( return nil, errors.Wrap(err, "get earliest data clock frame") } - defer closer.Close() + frameNumber := binary.BigEndian.Uint64(idxValue) frame, _, err := p.GetDataClockFrame(filter, frameNumber, false) if err != nil { @@ -811,6 +808,7 @@ func (p *PebbleClockStore) GetLatestDataClockFrame( return nil, nil, errors.Wrap(err, "get latest data clock frame") } + defer closer.Close() frameNumber := binary.BigEndian.Uint64(idxValue) frame, tries, err := p.GetDataClockFrame(filter, frameNumber, false) @@ -822,8 +820,6 @@ func (p *PebbleClockStore) GetLatestDataClockFrame( return nil, nil, errors.Wrap(err, "get latest data clock frame") } - closer.Close() - return frame, tries, nil } @@ -843,6 +839,7 @@ func (p *PebbleClockStore) GetStagedDataClockFrame( } return nil, errors.Wrap(err, "get parent data clock frame") } + defer closer.Close() parent := &protobufs.ClockFrame{} if err := proto.Unmarshal(data, parent); err != nil { @@ -858,10 +855,6 @@ func (p *PebbleClockStore) GetStagedDataClockFrame( } } - if closer != nil { - closer.Close() - } - return parent, nil } @@ -879,9 +872,9 @@ func (p *PebbleClockStore) GetStagedDataClockFramesForFrameNumber( } return nil, errors.Wrap(err, "get staged data clock frames") } + defer iter.Close() frames := []*protobufs.ClockFrame{} - for iter.First(); iter.Valid(); iter.Next() { data := iter.Value() frame := &protobufs.ClockFrame{} @@ -899,8 +892,6 @@ func (p *PebbleClockStore) GetStagedDataClockFramesForFrameNumber( frames = append(frames, frame) } - iter.Close() - return frames, nil } @@ -992,10 +983,8 @@ func (p *PebbleClockStore) CommitDataClockFrame( ); err != nil { return errors.Wrap(err, "commit data clock frame") } - } - - if err == nil && closer != nil { - closer.Close() + } else { + _ = closer.Close() } if !backfill { @@ -1058,36 +1047,99 @@ func (p *PebbleClockStore) DeleteDataClockFrameRange( for i := fromFrameNumber; i < toFrameNumber; i++ { frames, err := p.GetStagedDataClockFramesForFrameNumber(filter, i) if err != nil { - continue + if !errors.Is(err, ErrNotFound) && !errors.Is(err, ErrInvalidData) { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } + frames = nil } + outer: for _, frame := range frames { - err = p.deleteAggregateProofs(txn, frame) - if err != nil { - continue + for _, ap := range frame.AggregateProofs { + for _, inc := range ap.InclusionCommitments { + // The commitments collide for very small frames, and as such we have to detect them early + // and avoid deleting them. Common cases for such collisions are prover announcement messages + // which do not contain the frame number, so their binary contents are equivalent between + // multiple frames. + if len(inc.Data) < 2048 { + continue outer + } + if inc.TypeUrl == protobufs.IntrinsicExecutionOutputType { + o := &protobufs.IntrinsicExecutionOutput{} + if err := proto.Unmarshal(inc.Data, o); err != nil { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } + // The commitments collide for empty frames, and as such we have to detect them early + // and avoid deleting them. 
+ if len(o.Output) == 0 || len(o.Proof) == 0 { + continue outer + } + } + } + } + if err := p.deleteAggregateProofs(txn, frame); err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } } } - err = txn.DeleteRange( + if err := txn.DeleteRange( clockDataParentIndexKey(filter, i, bytes.Repeat([]byte{0x00}, 32)), clockDataParentIndexKey(filter, i, bytes.Repeat([]byte{0xff}, 32)), - ) - if err != nil { - continue + ); err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } } - err = txn.Delete(clockDataFrameKey(filter, i)) - if err != nil { - continue + if err := txn.Delete(clockDataFrameKey(filter, i)); err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } + } + + // The prover trie keys are not stored continuously with respect + // to the same frame number. As such, we need to manually iterate + // and discover such keys. + for t := uint16(0); t <= 0xffff; t++ { + _, closer, err := p.db.Get(clockProverTrieKey(filter, t, i)) + if err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } else { + break + } + } + _ = closer.Close() + if err := txn.Delete(clockProverTrieKey(filter, t, i)); err != nil { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } + } + + if err := txn.DeleteRange( + clockDataTotalDistanceKey(filter, i, bytes.Repeat([]byte{0x00}, 32)), + clockDataTotalDistanceKey(filter, i, bytes.Repeat([]byte{0xff}, 32)), + ); err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + _ = txn.Abort() + return errors.Wrap(err, "delete data clock frame range") + } } } - if err = txn.Commit(); err != nil { - txn.Abort() + if err := txn.Commit(); err != nil { return errors.Wrap(err, "delete data clock frame range") } - return errors.Wrap(err, "delete data clock frame range") + return nil } func (p *PebbleClockStore) ResetMasterClockFrames(filter []byte) error { @@ -1138,7 +1190,7 @@ func (p *PebbleClockStore) Compact( if bytes.Compare(version, config.GetVersion()) < 0 { cleared = false } - closer.Close() + defer closer.Close() } if !cleared { @@ -1190,11 +1242,10 @@ func (p *PebbleClockStore) Compact( return errors.Wrap(err, "compact") } + defer closer.Close() last := binary.BigEndian.Uint64(idxValue) - closer.Close() - for frameNumber := uint64(1); frameNumber <= last; frameNumber++ { value, closer, err := p.db.Get(clockDataFrameKey(dataFilter, frameNumber)) if err != nil { @@ -1204,6 +1255,7 @@ func (p *PebbleClockStore) Compact( return errors.Wrap(err, "compact") } + defer closer.Close() frame := &protobufs.ClockFrame{} @@ -1212,6 +1264,7 @@ func (p *PebbleClockStore) Compact( if err != nil { return errors.Wrap(err, "compact") } + defer frameCloser.Close() if err := proto.Unmarshal(frameValue, frame); err != nil { return errors.Wrap(err, "compact") } @@ -1226,9 +1279,6 @@ func (p *PebbleClockStore) Compact( make([]byte, 32), )), ) - - closer.Close() - frameCloser.Close() } else { if err := proto.Unmarshal(value, frame); err != nil { return errors.Wrap(err, "compact") @@ -1255,14 +1305,15 @@ func (p *PebbleClockStore) Compact( make([]byte, 32), )), ) + if err != nil { + return errors.Wrap(err, "compact") + } parents = append(parents, clockDataParentIndexKey(dataFilter, frameNumber, selector.FillBytes( make([]byte, 32), )), ) - - closer.Close() } 
for i := 0; i < len(frame.Input[516:])/74; i++ { @@ -1523,10 +1574,8 @@ func (p *PebbleClockStore) GetTotalDistance( return nil, errors.Wrap(err, "get total distance") } - defer closer.Close() dist := new(big.Int).SetBytes(value) - return dist, nil } @@ -1564,7 +1613,6 @@ func (p *PebbleClockStore) GetPeerSeniorityMap(filter []byte) ( if err = dec.Decode(&seniorityMap); err != nil { return nil, errors.Wrap(err, "get peer seniority map") } - return seniorityMap, nil } @@ -1612,9 +1660,12 @@ func (p *PebbleClockStore) SetProverTriesForFrame( clockProverTrieKey(frame.Filter, uint16(start), frame.FrameNumber), ) if err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + return errors.Wrap(err, "set prover tries for frame") + } break } - closer.Close() + _ = closer.Close() if err = p.db.Delete( clockProverTrieKey(frame.Filter, uint16(start), frame.FrameNumber), diff --git a/node/store/coin.go b/node/store/coin.go index d337a8f..e152d82 100644 --- a/node/store/coin.go +++ b/node/store/coin.go @@ -136,8 +136,8 @@ func (p *PebbleCoinStore) GetCoinsForOwner( err = errors.Wrap(err, "get coins for owner") return nil, nil, nil, err } - defer iter.Close() + frameNumbers := []uint64{} addresses := [][]byte{} coins := []*protobufs.Coin{} @@ -176,8 +176,8 @@ func (p *PebbleCoinStore) GetPreCoinProofsForOwner(owner []byte) ( err = errors.Wrap(err, "get pre coin proofs for owner") return nil, nil, err } - defer iter.Close() + frameNumbers := []uint64{} proofs := []*protobufs.PreCoinProof{} for iter.First(); iter.Valid(); iter.Next() { @@ -215,7 +215,6 @@ func (p *PebbleCoinStore) GetCoinByAddress(txn Transaction, address []byte) ( err = errors.Wrap(err, "get coin by address") return nil, err } - defer closer.Close() coin := &protobufs.Coin{} @@ -240,7 +239,6 @@ func (p *PebbleCoinStore) GetPreCoinProofByAddress(address []byte) ( err = errors.Wrap(err, "get pre coin proof by address") return nil, err } - defer closer.Close() proof := &protobufs.PreCoinProof{} @@ -386,9 +384,9 @@ func (p *PebbleCoinStore) GetLatestFrameProcessed() (uint64, error) { return 0, errors.Wrap(err, "get latest frame processed") } + defer closer.Close() frameNumber := binary.BigEndian.Uint64(v) - closer.Close() return frameNumber, nil } @@ -524,13 +522,12 @@ func (p *PebbleCoinStore) Migrate(filter []byte, genesisSeedHex string) error { } return p.internalMigrate(filter, seed) } + defer closer.Close() if !bytes.Equal(compare, seed) { return p.internalMigrate(filter, seed) } - closer.Close() - status, closer, err := p.db.Get(migrationKey()) if err != nil { if !errors.Is(err, pebble.ErrNotFound) { diff --git a/node/store/data_proof.go b/node/store/data_proof.go index b2c6336..d079b7a 100644 --- a/node/store/data_proof.go +++ b/node/store/data_proof.go @@ -139,8 +139,8 @@ func internalGetAggregateProof( return nil, errors.Wrap(err, "get aggregate proof") } - defer closer.Close() + copied := make([]byte, len(value[8:])) limit := binary.BigEndian.Uint64(value[0:8]) copy(copied, value[8:]) @@ -159,6 +159,7 @@ func internalGetAggregateProof( if err != nil { return nil, errors.Wrap(err, "get aggregate proof") } + defer iter.Close() i := uint32(0) @@ -199,14 +200,11 @@ func internalGetAggregateProof( return nil, errors.Wrap(err, "get aggregate proof") } + defer dataCloser.Close() segCopy := make([]byte, len(segValue)) copy(segCopy, segValue) chunks = append(chunks, segCopy) - - if err = dataCloser.Close(); err != nil { - return nil, errors.Wrap(err, "get aggregate proof") - } } if string(url) == protobufs.IntrinsicExecutionOutputType { @@ 
-236,10 +234,6 @@ func internalGetAggregateProof( i++ } - if err = iter.Close(); err != nil { - return nil, errors.Wrap(err, "get aggregate proof") - } - return aggregate, nil } @@ -263,8 +257,8 @@ func internalListAggregateProofKeys( return nil, nil, nil, errors.Wrap(err, "list aggregate proof") } - defer closer.Close() + copied := make([]byte, len(value[8:])) limit := binary.BigEndian.Uint64(value[0:8]) copy(copied, value[8:]) @@ -278,6 +272,7 @@ func internalListAggregateProofKeys( return nil, nil, nil, errors.Wrap(err, "list aggregate proof") } + defer iter.Close() i := uint32(0) commits = append(commits, dataProofInclusionKey(filter, commitment, 0)) @@ -305,10 +300,6 @@ func internalListAggregateProofKeys( i++ } - if err = iter.Close(); err != nil { - return nil, nil, nil, errors.Wrap(err, "list aggregate proof") - } - return proofs, commits, data, nil } @@ -326,17 +317,10 @@ func (p *PebbleDataProofStore) GetAggregateProof( } func internalDeleteAggregateProof( - db KVDB, txn Transaction, aggregateProof *protobufs.InclusionAggregateProof, commitment []byte, ) error { - buf := binary.BigEndian.AppendUint64( - nil, - uint64(len(aggregateProof.InclusionCommitments)), - ) - buf = append(buf, aggregateProof.Proof...) - for i, inc := range aggregateProof.InclusionCommitments { var segments [][]byte if inc.TypeUrl == protobufs.IntrinsicExecutionOutputType { @@ -378,7 +362,6 @@ func internalDeleteAggregateProof( } func internalPutAggregateProof( - db KVDB, txn Transaction, aggregateProof *protobufs.InclusionAggregateProof, commitment []byte, @@ -447,7 +430,6 @@ func (p *PebbleDataProofStore) PutAggregateProof( commitment []byte, ) error { return internalPutAggregateProof( - p.db, txn, aggregateProof, commitment, @@ -467,8 +449,8 @@ func (p *PebbleDataProofStore) GetDataTimeProof( err = errors.Wrap(err, "get data time proof") return } - defer closer.Close() + if len(data) < 24 { err = ErrInvalidData return @@ -513,13 +495,10 @@ func (p *PebbleDataProofStore) GetTotalReward( return nil, errors.Wrap(err, "get total difficulty sum") } + defer closer.Close() if len(prev) != 0 { reward.SetBytes(prev[4:]) - - if err = closer.Close(); err != nil { - return nil, errors.Wrap(err, "get total difficulty sum") - } } return reward, nil @@ -556,15 +535,12 @@ func (p *PebbleDataProofStore) PutDataTimeProof( if err != nil && (!errors.Is(err, pebble.ErrNotFound) || increment != 0) { return errors.Wrap(err, "put data time proof") } + defer closer.Close() if len(prev) != 0 { priorSum.SetBytes(prev[4:]) prevIncrement := binary.BigEndian.Uint32(prev[:4]) - if err = closer.Close(); err != nil { - return errors.Wrap(err, "put data time proof") - } - if prevIncrement != increment-1 { return errors.Wrap(errors.New("invalid increment"), "put data time proof") } @@ -609,15 +585,13 @@ func (p *PebbleDataProofStore) GetLatestDataTimeProof(peerId []byte) ( return 0, 0, nil, errors.Wrap(err, "get latest data time proof") } + defer closer.Close() if len(prev) < 4 { return 0, 0, nil, ErrInvalidData } increment = binary.BigEndian.Uint32(prev[:4]) - if err = closer.Close(); err != nil { - return 0, 0, nil, errors.Wrap(err, "get latest data time proof") - } _, parallelism, _, output, err = p.GetDataTimeProof(peerId, increment) diff --git a/node/store/inmem.go b/node/store/inmem.go index 558595c..54110a9 100644 --- a/node/store/inmem.go +++ b/node/store/inmem.go @@ -281,15 +281,13 @@ func (t *InMemKVDBTransaction) DeleteRange( if err != nil { return err } + defer iter.Close() for iter.First(); iter.Valid(); iter.Next() { 
t.changes = append(t.changes, InMemKVDBOperation{ op: DeleteOperation, key: iter.Key(), }) - if err != nil { - return err - } } return nil @@ -416,6 +414,7 @@ func (d *InMemKVDB) DeleteRange(start, end []byte) error { if err != nil { return err } + defer iter.Close() for iter.First(); iter.Valid(); iter.Next() { err = d.Delete(iter.Key()) diff --git a/node/store/key.go b/node/store/key.go index 1169430..a69bafc 100644 --- a/node/store/key.go +++ b/node/store/key.go @@ -270,6 +270,8 @@ func (p *PebbleKeyStore) IncludeProvingKey( staged, closer, err := p.db.Get(stagedProvingKeyKey(provingKey.PublicKey())) if err != nil && !errors.Is(err, ErrNotFound) { return errors.Wrap(err, "include proving key") + } else if err == nil { + defer closer.Close() } if staged != nil { @@ -279,9 +281,6 @@ func (p *PebbleKeyStore) IncludeProvingKey( return errors.Wrap(err, "include proving key") } } - if err := closer.Close(); err != nil { - return errors.Wrap(err, "include proving key") - } return nil } @@ -297,16 +296,13 @@ func (p *PebbleKeyStore) GetStagedProvingKey( return nil, errors.Wrap(err, "get staged proving key") } + defer closer.Close() stagedKey := &protobufs.ProvingKeyAnnouncement{} if err = proto.Unmarshal(data, stagedKey); err != nil { return nil, errors.Wrap(err, "get staged proving key") } - if err := closer.Close(); err != nil { - return nil, errors.Wrap(err, "get staged proving key") - } - return stagedKey, nil } @@ -322,12 +318,9 @@ func (p *PebbleKeyStore) GetLatestKeyBundle( return nil, errors.Wrap(err, "get latest key bundle") } + defer closer.Close() frameNumber := binary.BigEndian.Uint64(value) - if err := closer.Close(); err != nil { - return nil, errors.Wrap(err, "get latest key bundle") - } - value, closer, err = p.db.Get(keyBundleKey(provingKey, frameNumber)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { @@ -336,7 +329,6 @@ func (p *PebbleKeyStore) GetLatestKeyBundle( return nil, errors.Wrap(err, "get latest key bundle") } - defer closer.Close() announcement := &protobufs.InclusionCommitment{} @@ -440,10 +432,8 @@ func (p *PebbleKeyStore) PutKeyBundle( ); err != nil { return errors.Wrap(err, "put key bundle") } - } - - if err == nil && closer != nil { - closer.Close() + } else { + _ = closer.Close() } if err = txn.Set( diff --git a/node/store/peerstore.go b/node/store/peerstore.go index c59e260..85f0c3c 100644 --- a/node/store/peerstore.go +++ b/node/store/peerstore.go @@ -74,10 +74,10 @@ func (d *PeerstoreDatastore) Get( } return nil, err } + defer closer.Close() out := make([]byte, len(val)) copy(out[:], val[:]) - closer.Close() return val, nil } @@ -226,10 +226,10 @@ func (t *transaction) Get( } return nil, errors.Wrap(err, "get") } + defer closer.Close() out := make([]byte, len(b)) copy(out[:], b[:]) - closer.Close() return b, nil } From 9a09dc904b2148a4b2eeb450d97abfd14eeb3db8 Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Sun, 8 Dec 2024 04:19:15 +0100 Subject: [PATCH 3/4] Use default peer outbound queue size (#402) --- go-libp2p-blossomsub/pubsub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-libp2p-blossomsub/pubsub.go b/go-libp2p-blossomsub/pubsub.go index 13e323e..ff74d89 100644 --- a/go-libp2p-blossomsub/pubsub.go +++ b/go-libp2p-blossomsub/pubsub.go @@ -276,7 +276,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option disc: &discover{}, softMaxMessageSize: DefaultSoftMaxMessageSize, hardMaxMessageSize: DefaultHardMaxMessageSize, - 
peerOutboundQueueSize: 32, + peerOutboundQueueSize: DefaultPeerOutboundQueueSize, signID: h.ID(), signKey: nil, signPolicy: LaxSign, From a329bdab3a0c0b2a1c601c6b36b3fa2c32281b81 Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Sun, 8 Dec 2024 04:21:03 +0100 Subject: [PATCH 4/4] Limit sync candidates (#407) --- node/config/engine.go | 2 ++ node/consensus/data/consensus_frames.go | 18 +++++++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/node/config/engine.go b/node/config/engine.go index 09b237a..b428f15 100644 --- a/node/config/engine.go +++ b/node/config/engine.go @@ -94,6 +94,8 @@ type EngineConfig struct { AutoMergeCoins bool `yaml:"autoMergeCoins"` // Maximum wait time for a frame to be downloaded from a peer. SyncTimeout time.Duration `yaml:"syncTimeout"` + // Number of candidate peers per category to sync with. + SyncCandidates int `yaml:"syncCandidates"` // Values used only for testing – do not override these in production, your // node will get kicked out diff --git a/node/consensus/data/consensus_frames.go b/node/consensus/data/consensus_frames.go index bd7c5fd..68c82b6 100644 --- a/node/consensus/data/consensus_frames.go +++ b/node/consensus/data/consensus_frames.go @@ -20,7 +20,10 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/protobufs" ) -const defaultSyncTimeout = 4 * time.Second +const ( + defaultSyncTimeout = 4 * time.Second + defaultSyncCandidates = 8 +) func (e *DataClockConsensusEngine) syncWithMesh() error { e.logger.Info("collecting vdf proofs") @@ -304,11 +307,16 @@ func (e *DataClockConsensusEngine) GetAheadPeers(frameNumber uint64) []internal. } } + syncCandidates := e.config.Engine.SyncCandidates + if syncCandidates == 0 { + syncCandidates = defaultSyncCandidates + } + return slices.Concat( - internal.WeightedSampleWithoutReplacement(nearCandidates, len(nearCandidates)), - internal.WeightedSampleWithoutReplacement(reachableCandidates, len(reachableCandidates)), - internal.WeightedSampleWithoutReplacement(unknownCandidates, len(unknownCandidates)), - internal.WeightedSampleWithoutReplacement(unreachableCandidates, len(unreachableCandidates)), + internal.WeightedSampleWithoutReplacement(nearCandidates, min(len(nearCandidates), syncCandidates)), + internal.WeightedSampleWithoutReplacement(reachableCandidates, min(len(reachableCandidates), syncCandidates)), + internal.WeightedSampleWithoutReplacement(unknownCandidates, min(len(unknownCandidates), syncCandidates)), + internal.WeightedSampleWithoutReplacement(unreachableCandidates, min(len(unreachableCandidates), syncCandidates)), ) }
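
Note on the new sync candidate limit: engine.syncCandidates (default 8 via defaultSyncCandidates) caps how many peers GetAheadPeers draws from each reachability category. The repository's internal.WeightedSampleWithoutReplacement is not shown in this patch, so the standalone Go sketch below only illustrates the general idea — weighted sampling without replacement via Efraimidis–Spirakis keys, capped at k — using a hypothetical candidate type and weights; the actual implementation may differ.

package main

import (
	"fmt"
	"math"
	"math/rand"
	"sort"
)

// candidate is a hypothetical stand-in for the internal candidate type used
// by GetAheadPeers; only the weight matters for the sampling itself.
type candidate struct {
	id     string
	weight float64
}

// weightedSampleWithoutReplacement returns up to k candidates chosen with
// probability proportional to weight, using Efraimidis–Spirakis keys
// (key = u^(1/w), keep the k largest). This is only a sketch of what a
// function like internal.WeightedSampleWithoutReplacement could do.
func weightedSampleWithoutReplacement(cands []candidate, k int) []candidate {
	if k > len(cands) {
		k = len(cands)
	}
	if k <= 0 {
		return nil
	}
	type keyed struct {
		c   candidate
		key float64
	}
	keys := make([]keyed, 0, len(cands))
	for _, c := range cands {
		// Draw u in [0,1) and compute the sampling key; larger keys win,
		// and heavier weights push keys toward 1.
		u := rand.Float64()
		keys = append(keys, keyed{c, math.Pow(u, 1/c.weight)})
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i].key > keys[j].key })
	out := make([]candidate, 0, k)
	for _, kc := range keys[:k] {
		out = append(out, kc.c)
	}
	return out
}

func main() {
	// Mirrors the per-category cap: at most syncCandidates peers are kept.
	syncCandidates := 2 // engine default in the patch is 8
	near := []candidate{{"near-1", 3}, {"near-2", 1}, {"near-3", 2}}
	fmt.Println(weightedSampleWithoutReplacement(near, min(len(near), syncCandidates)))
}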