Merge branch 'develop' into develop-2.1

Cassandra Heart 2024-12-09 05:44:53 -06:00
commit b49ba62d9a
No known key found for this signature in database
GPG Key ID: 6352152859385958
16 changed files with 307 additions and 227 deletions

View File

@ -276,7 +276,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
disc: &discover{},
softMaxMessageSize: DefaultSoftMaxMessageSize,
hardMaxMessageSize: DefaultHardMaxMessageSize,
peerOutboundQueueSize: 32,
peerOutboundQueueSize: DefaultPeerOutboundQueueSize,
signID: h.ID(),
signKey: nil,
signPolicy: LaxSign,

View File

@ -4,6 +4,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"sync"
"go.uber.org/zap"
"golang.org/x/crypto/sha3"
@ -162,9 +163,17 @@ func (n *Node) Start() {
}
// TODO: add config mapping to engine name/frame registration
wg := sync.WaitGroup{}
for _, e := range n.execEngines {
n.engine.RegisterExecutor(e, 0)
wg.Add(1)
go func(e execution.ExecutionEngine) {
defer wg.Done()
if err := <-n.engine.RegisterExecutor(e, 0); err != nil {
panic(err)
}
}(e)
}
wg.Wait()
}
func (n *Node) Stop() {

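A minimal standalone sketch of the fan-out registration pattern this hunk introduces: each executor is registered in its own goroutine and the WaitGroup holds Start until every registration channel resolves. The Engine, Executor, and channel-returning RegisterExecutor below are placeholder types that only mirror the shapes used above, not the node's real ones.

```go
package main

import (
	"fmt"
	"sync"
)

type Executor struct{ Name string }

type Engine struct{}

// RegisterExecutor mimics the shape assumed by the diff: registration is
// asynchronous and completion (or failure) is reported on a channel.
func (e *Engine) RegisterExecutor(x Executor, frame uint64) <-chan error {
	errCh := make(chan error, 1)
	go func() {
		// real registration work would happen here
		errCh <- nil
		close(errCh)
	}()
	return errCh
}

func main() {
	engine := &Engine{}
	executors := []Executor{{Name: "token"}, {Name: "compute"}}

	wg := sync.WaitGroup{}
	for _, x := range executors {
		wg.Add(1)
		go func(x Executor) {
			defer wg.Done()
			// Block until this executor's registration resolves.
			if err := <-engine.RegisterExecutor(x, 0); err != nil {
				panic(err)
			}
		}(x)
	}
	wg.Wait()
	fmt.Println("all executors registered")
}
```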
View File

@ -94,6 +94,8 @@ type EngineConfig struct {
AutoMergeCoins bool `yaml:"autoMergeCoins"`
// Maximum wait time for a frame to be downloaded from a peer.
SyncTimeout time.Duration `yaml:"syncTimeout"`
// Number of candidate peers per category to sync with.
SyncCandidates int `yaml:"syncCandidates"`
// Values used only for testing; do not override these in production, or your
// node will get kicked out

View File

@ -20,7 +20,10 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
const defaultSyncTimeout = 4 * time.Second
const (
defaultSyncTimeout = 4 * time.Second
defaultSyncCandidates = 8
)
func (e *DataClockConsensusEngine) syncWithMesh() error {
e.logger.Info("collecting vdf proofs")
@ -304,11 +307,16 @@ func (e *DataClockConsensusEngine) GetAheadPeers(frameNumber uint64) []internal.
}
}
syncCandidates := e.config.Engine.SyncCandidates
if syncCandidates == 0 {
syncCandidates = defaultSyncCandidates
}
return slices.Concat(
internal.WeightedSampleWithoutReplacement(nearCandidates, len(nearCandidates)),
internal.WeightedSampleWithoutReplacement(reachableCandidates, len(reachableCandidates)),
internal.WeightedSampleWithoutReplacement(unknownCandidates, len(unknownCandidates)),
internal.WeightedSampleWithoutReplacement(unreachableCandidates, len(unreachableCandidates)),
internal.WeightedSampleWithoutReplacement(nearCandidates, min(len(nearCandidates), syncCandidates)),
internal.WeightedSampleWithoutReplacement(reachableCandidates, min(len(reachableCandidates), syncCandidates)),
internal.WeightedSampleWithoutReplacement(unknownCandidates, min(len(unknownCandidates), syncCandidates)),
internal.WeightedSampleWithoutReplacement(unreachableCandidates, min(len(unreachableCandidates), syncCandidates)),
)
}

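A hedged illustration of the capped sampling above: each candidate category is trimmed to syncCandidates (which falls back to defaultSyncCandidates when the config value is zero) before the categories are concatenated. The weightedSampleWithoutReplacement below is a simple Efraimidis-Spirakis style sketch with a hypothetical Candidate type; it is not the implementation of internal.WeightedSampleWithoutReplacement.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"slices"
	"sort"
)

type Candidate struct {
	ID     string
	Weight float64
}

// weightedSampleWithoutReplacement draws up to k items, favoring higher
// weights, using the u^(1/w) key trick.
func weightedSampleWithoutReplacement(cands []Candidate, k int) []Candidate {
	type keyed struct {
		c   Candidate
		key float64
	}
	keys := make([]keyed, 0, len(cands))
	for _, c := range cands {
		keys = append(keys, keyed{c, math.Pow(rand.Float64(), 1/c.Weight)})
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i].key > keys[j].key })
	if k > len(keys) {
		k = len(keys)
	}
	out := make([]Candidate, 0, k)
	for _, kc := range keys[:k] {
		out = append(out, kc.c)
	}
	return out
}

func main() {
	near := []Candidate{{"a", 3}, {"b", 1}, {"c", 2}}
	reachable := []Candidate{{"d", 5}, {"e", 1}}

	syncCandidates := 8 // zero in config falls back to defaultSyncCandidates
	picked := slices.Concat(
		weightedSampleWithoutReplacement(near, min(len(near), syncCandidates)),
		weightedSampleWithoutReplacement(reachable, min(len(reachable), syncCandidates)),
	)
	fmt.Println(picked)
}
```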
View File

@ -72,6 +72,8 @@ type DataClockConsensusEngine struct {
cancel context.CancelFunc
wg sync.WaitGroup
grpcServers []*grpc.Server
lastProven uint64
difficulty uint32
config *config.Config
@ -349,38 +351,40 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
e.pubSub.Subscribe(e.frameFragmentFilter, e.handleFrameFragmentMessage)
e.pubSub.Subscribe(e.txFilter, e.handleTxMessage)
e.pubSub.Subscribe(e.infoFilter, e.handleInfoMessage)
syncServer := qgrpc.NewServer(
grpc.MaxSendMsgSize(40*1024*1024),
grpc.MaxRecvMsgSize(40*1024*1024),
)
e.grpcServers = append(e.grpcServers[:0:0], syncServer)
protobufs.RegisterDataServiceServer(syncServer, e)
go func() {
server := qgrpc.NewServer(
grpc.MaxSendMsgSize(40*1024*1024),
grpc.MaxRecvMsgSize(40*1024*1024),
)
protobufs.RegisterDataServiceServer(server, e)
if err := e.pubSub.StartDirectChannelListener(
e.pubSub.GetPeerID(),
"sync",
server,
syncServer,
); err != nil {
panic(err)
e.logger.Error("error starting sync server", zap.Error(err))
}
}()
go func() {
if e.dataTimeReel.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
server := qgrpc.NewServer(
grpc.MaxSendMsgSize(1*1024*1024),
grpc.MaxRecvMsgSize(1*1024*1024),
)
protobufs.RegisterDataServiceServer(server, e)
if e.FrameProverTrieContains(0, e.provingKeyAddress) {
workerServer := qgrpc.NewServer(
grpc.MaxSendMsgSize(1*1024*1024),
grpc.MaxRecvMsgSize(1*1024*1024),
)
e.grpcServers = append(e.grpcServers, workerServer)
protobufs.RegisterDataServiceServer(workerServer, e)
go func() {
if err := e.pubSub.StartDirectChannelListener(
e.pubSub.GetPeerID(),
"worker",
server,
workerServer,
); err != nil {
panic(err)
e.logger.Error("error starting worker server", zap.Error(err))
}
}
}()
}()
}
e.stateMx.Lock()
e.state = consensus.EngineStateCollecting
@ -661,6 +665,16 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
}
func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
wg := sync.WaitGroup{}
wg.Add(len(e.grpcServers))
for _, server := range e.grpcServers {
go func(server *grpc.Server) {
defer wg.Done()
server.GracefulStop()
}(server)
}
wg.Wait()
e.logger.Info("stopping ceremony consensus engine")
e.cancel()
e.wg.Wait()
@ -684,7 +698,6 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
e.logger.Warn("error publishing prover pause", zap.Error(err))
}
wg := sync.WaitGroup{}
wg.Add(len(e.executionEngines))
executionErrors := make(chan error, len(e.executionEngines))
for name := range e.executionEngines {

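A short sketch of the shutdown pattern added to Stop: every gRPC server tracked in grpcServers is drained in parallel with GracefulStop before the engine tears down its own goroutines. The servers here are bare placeholders with no registered services.

```go
package main

import (
	"sync"

	"google.golang.org/grpc"
)

// stopAll drains every tracked gRPC server in parallel; GracefulStop blocks
// until in-flight RPCs complete.
func stopAll(servers []*grpc.Server) {
	var wg sync.WaitGroup
	wg.Add(len(servers))
	for _, server := range servers {
		go func(server *grpc.Server) {
			defer wg.Done()
			server.GracefulStop()
		}(server)
	}
	wg.Wait()
}

func main() {
	servers := []*grpc.Server{grpc.NewServer(), grpc.NewServer()}
	stopAll(servers)
}
```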
View File

@ -1,13 +0,0 @@
package data
import "go.uber.org/zap"
func (e *DataClockConsensusEngine) pruneFrames(maxFrame uint64) error {
e.logger.Info("pruning frames", zap.Uint64("max_frame_to_prune", maxFrame))
err := e.clockStore.DeleteDataClockFrameRange(e.filter, 1, maxFrame)
if err != nil {
e.logger.Error("failed to prune frames", zap.Error(err))
return err
}
return nil
}

View File

@ -103,6 +103,10 @@ func (e *DataClockConsensusEngine) runFramePruning() {
e.logger.Info("frame pruning enabled, waiting for delay timeout expiry")
from := uint64(1)
maxFrames := uint64(e.config.Engine.MaxFrames)
batchSize := uint64(1000)
outer:
for {
select {
case <-e.ctx.Done():
@ -113,15 +117,34 @@ func (e *DataClockConsensusEngine) runFramePruning() {
panic(err)
}
if head.FrameNumber < uint64(e.config.Engine.MaxFrames)+1 ||
if head.FrameNumber <= maxFrames ||
head.FrameNumber <= application.PROOF_FRAME_SENIORITY_REPAIR+1 {
continue
}
if err := e.pruneFrames(
head.FrameNumber - uint64(e.config.Engine.MaxFrames),
); err != nil {
e.logger.Error("could not prune", zap.Error(err))
to := head.FrameNumber - maxFrames
for i := from; i < to; i += batchSize {
start, stop := i, min(i+batchSize, to)
if err := e.clockStore.DeleteDataClockFrameRange(e.filter, start, stop); err != nil {
e.logger.Error(
"failed to prune frames",
zap.Error(err),
zap.Uint64("from", start),
zap.Uint64("to", stop),
)
continue outer
}
e.logger.Info(
"pruned frames",
zap.Uint64("from", start),
zap.Uint64("to", stop),
)
select {
case <-e.ctx.Done():
return
default:
}
from = stop
}
}
}

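A standalone, simplified sketch of the batched pruning loop above: delete [from, to) in fixed-size chunks, abandon the current pass on error, and check for cancellation between batches. deleteRange stands in for clockStore.DeleteDataClockFrameRange, and the labeled continue back to the outer select loop is replaced here by an early return.

```go
package main

import (
	"context"
	"fmt"
)

// pruneBatches deletes frames in [from, to) in fixed-size chunks and returns
// the frame number the next pass should resume from. On a deletion error the
// current pass is abandoned (the caller retries later); cancellation is
// checked between batches.
func pruneBatches(
	ctx context.Context,
	from, to, batchSize uint64,
	deleteRange func(start, stop uint64) error,
) uint64 {
	for i := from; i < to; i += batchSize {
		start, stop := i, min(i+batchSize, to)
		if err := deleteRange(start, stop); err != nil {
			return from
		}
		select {
		case <-ctx.Done():
			return stop
		default:
		}
		from = stop
	}
	return from
}

func main() {
	next := pruneBatches(context.Background(), 1, 4500, 1000,
		func(start, stop uint64) error {
			fmt.Printf("pruned frames [%d, %d)\n", start, stop)
			return nil
		},
	)
	fmt.Println("next pass resumes at frame", next)
}
```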
View File

@ -69,6 +69,7 @@ func (e *MasterClockConsensusEngine) Sync(
if err != nil {
return errors.Wrap(err, "sync")
}
defer iter.Close()
response := []*protobufs.ClockFrame{}
@ -81,10 +82,6 @@ func (e *MasterClockConsensusEngine) Sync(
response = append(response, frame)
}
if err = iter.Close(); err != nil {
return errors.Wrap(err, "sync")
}
if len(response) == 0 {
return nil
}

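A small sketch of the iterator idiom the Sync handler now follows: defer the Close immediately after a successful open so early error returns also release the iterator. The ClockIterator interface below only mirrors the calls visible in the hunk; it is not the store's actual iterator type.

```go
package main

import "fmt"

type Frame struct{ Number uint64 }

// ClockIterator mirrors the usage in the diff, not the real store type.
type ClockIterator interface {
	First() bool
	Valid() bool
	Next() bool
	Value() (*Frame, error)
	Close() error
}

// collect defers Close right after the iterator is handed in, so the error
// path below does not need an explicit Close call.
func collect(iter ClockIterator) ([]*Frame, error) {
	defer iter.Close()

	frames := []*Frame{}
	for iter.First(); iter.Valid(); iter.Next() {
		frame, err := iter.Value()
		if err != nil {
			return nil, err
		}
		frames = append(frames, frame)
	}
	return frames, nil
}

// sliceIter is a toy in-memory iterator used only to exercise collect.
type sliceIter struct {
	frames []*Frame
	idx    int
}

func (s *sliceIter) First() bool            { s.idx = 0; return s.Valid() }
func (s *sliceIter) Valid() bool            { return s.idx < len(s.frames) }
func (s *sliceIter) Next() bool             { s.idx++; return s.Valid() }
func (s *sliceIter) Value() (*Frame, error) { return s.frames[s.idx], nil }
func (s *sliceIter) Close() error           { return nil }

func main() {
	frames, err := collect(&sliceIter{frames: []*Frame{{1}, {2}, {3}}})
	fmt.Println(len(frames), err)
}
```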
View File

@ -144,6 +144,11 @@ var (
true,
"when enabled, frame execution validation is skipped",
)
compactDB = flag.Bool(
"compact-db",
false,
"compacts the database and exits",
)
)
func signatureCheckDefault() bool {
@ -330,6 +335,17 @@ func main() {
panic(err)
}
if *compactDB && *core == 0 {
db := store.NewPebbleDB(nodeConfig.DB)
if err := db.CompactAll(); err != nil {
panic(err)
}
if err := db.Close(); err != nil {
panic(err)
}
return
}
if *network != 0 {
if nodeConfig.P2P.BootstrapPeers[0] == config.BootstrapPeers[0] {
fmt.Println(
@ -467,6 +483,7 @@ func main() {
if !*integrityCheck {
go spawnDataWorkers(nodeConfig)
defer stopDataWorkers()
}
kzg.Init()
@ -510,6 +527,9 @@ func main() {
// runtime.GOMAXPROCS(1)
node.Start()
defer node.Stop()
if nodeConfig.ListenGRPCMultiaddr != "" {
srv, err := rpc.NewRPCServer(
nodeConfig.ListenGRPCMultiaddr,
@ -526,20 +546,13 @@ func main() {
if err != nil {
panic(err)
}
go func() {
err := srv.Start()
if err != nil {
panic(err)
}
}()
if err := srv.Start(); err != nil {
panic(err)
}
defer srv.Stop()
}
node.Start()
<-done
stopDataWorkers()
node.Stop()
}
var dataWorkers []*exec.Cmd

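A sketch of the flow the new --compact-db flag enables: when the flag is set on the control process (core 0), compact the database and exit instead of starting the node. compactAll is a placeholder for store.NewPebbleDB(nodeConfig.DB) followed by CompactAll, and the core flag's type and help text here are illustrative.

```go
package main

import (
	"flag"
	"fmt"
)

var (
	compactDB = flag.Bool(
		"compact-db",
		false,
		"compacts the database and exits",
	)
	core = flag.Int("core", 0, "data worker core index (0 = control process); illustrative")
)

// compactAll stands in for opening the Pebble store and compacting every key
// range.
func compactAll() error {
	return nil
}

func main() {
	flag.Parse()

	if *compactDB && *core == 0 {
		if err := compactAll(); err != nil {
			panic(err)
		}
		fmt.Println("compaction complete")
		return
	}

	// ... normal node startup would continue here ...
}
```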
View File

@ -6,6 +6,7 @@ import (
"math/big"
"net/http"
"strings"
"sync"
"time"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
@ -43,6 +44,8 @@ type RPCServer struct {
pubSub p2p.PubSub
masterClock *master.MasterClockConsensusEngine
executionEngines []execution.ExecutionEngine
grpcServer *grpc.Server
httpServer *http.Server
}
// GetFrameInfo implements protobufs.NodeServiceServer.
@ -94,21 +97,17 @@ func (r *RPCServer) GetFrames(
if err != nil {
return nil, errors.Wrap(err, "get frames")
}
defer iter.Close()
frames := []*protobufs.ClockFrame{}
for iter.First(); iter.Valid(); iter.Next() {
frame, err := iter.Value()
if err != nil {
iter.Close()
return nil, errors.Wrap(err, "get frames")
}
frames = append(frames, frame)
}
if err := iter.Close(); err != nil {
return nil, errors.Wrap(err, "get frames")
}
return &protobufs.FramesResponse{
TruncatedClockFrames: frames,
}, nil
@ -121,21 +120,17 @@ func (r *RPCServer) GetFrames(
if err != nil {
return nil, errors.Wrap(err, "get frame info")
}
defer iter.Close()
frames := []*protobufs.ClockFrame{}
for iter.First(); iter.Valid(); iter.Next() {
frame, err := iter.TruncatedValue()
if err != nil {
iter.Close()
return nil, errors.Wrap(err, "get frames")
}
frames = append(frames, frame)
}
if err := iter.Close(); err != nil {
return nil, errors.Wrap(err, "get frames")
}
return &protobufs.FramesResponse{
TruncatedClockFrames: frames,
}, nil
@ -384,7 +379,33 @@ func NewRPCServer(
masterClock *master.MasterClockConsensusEngine,
executionEngines []execution.ExecutionEngine,
) (*RPCServer, error) {
return &RPCServer{
mg, err := multiaddr.NewMultiaddr(listenAddrGRPC)
if err != nil {
return nil, errors.Wrap(err, "new rpc server")
}
mga, err := mn.ToNetAddr(mg)
if err != nil {
return nil, errors.Wrap(err, "new rpc server")
}
mux := runtime.NewServeMux()
opts := qgrpc.ClientOptions(
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(600*1024*1024),
grpc.MaxCallSendMsgSize(600*1024*1024),
),
)
if err := protobufs.RegisterNodeServiceHandlerFromEndpoint(
context.Background(),
mux,
mga.String(),
opts,
); err != nil {
return nil, err
}
rpcServer := &RPCServer{
listenAddrGRPC: listenAddrGRPC,
listenAddrHTTP: listenAddrHTTP,
logger: logger,
@ -395,17 +416,22 @@ func NewRPCServer(
pubSub: pubSub,
masterClock: masterClock,
executionEngines: executionEngines,
}, nil
grpcServer: qgrpc.NewServer(
grpc.MaxRecvMsgSize(600*1024*1024),
grpc.MaxSendMsgSize(600*1024*1024),
),
httpServer: &http.Server{
Handler: mux,
},
}
protobufs.RegisterNodeServiceServer(rpcServer.grpcServer, rpcServer)
reflection.Register(rpcServer.grpcServer)
return rpcServer, nil
}
func (r *RPCServer) Start() error {
s := qgrpc.NewServer(
grpc.MaxRecvMsgSize(600*1024*1024),
grpc.MaxSendMsgSize(600*1024*1024),
)
protobufs.RegisterNodeServiceServer(s, r)
reflection.Register(s)
mg, err := multiaddr.NewMultiaddr(r.listenAddrGRPC)
if err != nil {
return errors.Wrap(err, "start")
@ -417,51 +443,42 @@ func (r *RPCServer) Start() error {
}
go func() {
if err := s.Serve(mn.NetListener(lis)); err != nil {
panic(err)
if err := r.grpcServer.Serve(mn.NetListener(lis)); err != nil {
r.logger.Error("serve error", zap.Error(err))
}
}()
if r.listenAddrHTTP != "" {
m, err := multiaddr.NewMultiaddr(r.listenAddrHTTP)
mh, err := multiaddr.NewMultiaddr(r.listenAddrHTTP)
if err != nil {
return errors.Wrap(err, "start")
}
ma, err := mn.ToNetAddr(m)
if err != nil {
return errors.Wrap(err, "start")
}
mga, err := mn.ToNetAddr(mg)
lis, err := mn.Listen(mh)
if err != nil {
return errors.Wrap(err, "start")
}
go func() {
mux := runtime.NewServeMux()
opts := qgrpc.ClientOptions(
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(600*1024*1024),
grpc.MaxCallSendMsgSize(600*1024*1024),
),
)
if err := protobufs.RegisterNodeServiceHandlerFromEndpoint(
context.Background(),
mux,
mga.String(),
opts,
); err != nil {
panic(err)
}
if err := http.ListenAndServe(ma.String(), mux); err != nil {
panic(err)
if err := r.httpServer.Serve(mn.NetListener(lis)); err != nil && !errors.Is(err, http.ErrServerClosed) {
r.logger.Error("serve error", zap.Error(err))
}
}()
}
return nil
}
func (r *RPCServer) Stop() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
r.grpcServer.GracefulStop()
}()
go func() {
defer wg.Done()
r.httpServer.Shutdown(context.Background())
}()
wg.Wait()
}

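A hedged sketch of the server lifecycle this file moves to: the gRPC server and the grpc-gateway HTTP server are built once in the constructor, Start only binds listeners and logs serve errors instead of panicking, and Stop drains both concurrently. Registration of the actual NodeService handlers and the multiaddr plumbing are omitted.

```go
package main

import (
	"context"
	"errors"
	"log"
	"net"
	"net/http"
	"sync"

	"google.golang.org/grpc"
)

type rpcServer struct {
	grpcServer *grpc.Server
	httpServer *http.Server
}

// newRPCServer builds both servers up front; handler registration is elided.
func newRPCServer() *rpcServer {
	return &rpcServer{
		grpcServer: grpc.NewServer(),
		httpServer: &http.Server{Handler: http.NewServeMux()},
	}
}

// Start only attaches listeners; serve errors are logged, not fatal.
func (r *rpcServer) Start(grpcLis, httpLis net.Listener) {
	go func() {
		if err := r.grpcServer.Serve(grpcLis); err != nil {
			log.Printf("grpc serve error: %v", err)
		}
	}()
	go func() {
		if err := r.httpServer.Serve(httpLis); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Printf("http serve error: %v", err)
		}
	}()
}

// Stop drains both servers concurrently, mirroring the Stop method above.
func (r *rpcServer) Stop() {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		r.grpcServer.GracefulStop()
	}()
	go func() {
		defer wg.Done()
		_ = r.httpServer.Shutdown(context.Background())
	}()
	wg.Wait()
}

func main() {
	grpcLis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	httpLis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}

	srv := newRPCServer()
	srv.Start(grpcLis, httpLis)
	srv.Stop()
}
```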
View File

@ -223,13 +223,13 @@ func (p *PebbleClockIterator) TruncatedValue() (
if err != nil {
return nil, errors.Wrap(err, "get truncated clock frame iterator value")
}
defer frameCloser.Close()
if err := proto.Unmarshal(frameValue, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get truncated clock frame iterator value",
)
}
frameCloser.Close()
} else {
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
@ -259,13 +259,13 @@ func (p *PebbleClockIterator) Value() (*protobufs.ClockFrame, error) {
if err != nil {
return nil, errors.Wrap(err, "get clock frame iterator value")
}
defer frameCloser.Close()
if err := proto.Unmarshal(frameValue, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value",
)
}
defer frameCloser.Close()
} else {
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
@ -485,8 +485,8 @@ func (p *PebbleClockStore) GetEarliestMasterClockFrame(
return nil, errors.Wrap(err, "get earliest master clock frame")
}
defer closer.Close()
frameNumber := binary.BigEndian.Uint64(idxValue)
frame, err := p.GetMasterClockFrame(filter, frameNumber)
if err != nil {
@ -508,8 +508,8 @@ func (p *PebbleClockStore) GetLatestMasterClockFrame(
return nil, errors.Wrap(err, "get latest master clock frame")
}
defer closer.Close()
frameNumber := binary.BigEndian.Uint64(idxValue)
frame, err := p.GetMasterClockFrame(filter, frameNumber)
if err != nil {
@ -532,11 +532,11 @@ func (p *PebbleClockStore) GetMasterClockFrame(
return nil, errors.Wrap(err, "get master clock frame")
}
defer closer.Close()
copied := make([]byte, len(value))
copy(copied[:], value[:])
defer closer.Close()
frame := &protobufs.ClockFrame{}
frame.FrameNumber = frameNumber
frame.Filter = filter
@ -611,10 +611,8 @@ func (p *PebbleClockStore) PutMasterClockFrame(
); err != nil {
return errors.Wrap(err, "put master clock frame")
}
}
if err == nil && closer != nil {
closer.Close()
} else {
_ = closer.Close()
}
if err = txn.Set(
@ -641,6 +639,7 @@ func (p *PebbleClockStore) GetDataClockFrame(
return nil, nil, errors.Wrap(err, "get data clock frame")
}
defer closer.Close()
frame := &protobufs.ClockFrame{}
genesisFramePreIndex := false
@ -657,14 +656,13 @@ func (p *PebbleClockStore) GetDataClockFrame(
return nil, nil, errors.Wrap(err, "get data clock frame")
}
defer frameCloser.Close()
if err := proto.Unmarshal(frameValue, frame); err != nil {
return nil, nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get data clock frame",
)
}
closer.Close()
defer frameCloser.Close()
} else {
genesisFramePreIndex = frameNumber == 0
if err := proto.Unmarshal(value, frame); err != nil {
@ -673,7 +671,6 @@ func (p *PebbleClockStore) GetDataClockFrame(
"get data clock frame",
)
}
defer closer.Close()
}
if !truncate {
@ -690,15 +687,17 @@ func (p *PebbleClockStore) GetDataClockFrame(
proverTrie := &tries.RollingFrecencyCritbitTrie{}
trieData, closer, err := p.db.Get(clockProverTrieKey(filter, i, frameNumber))
if err != nil {
if !errors.Is(err, pebble.ErrNotFound) {
return nil, nil, errors.Wrap(err, "get data clock frame")
}
break
}
defer closer.Close()
if err := proverTrie.Deserialize(trieData); err != nil {
closer.Close()
return nil, nil, errors.Wrap(err, "get latest data clock frame")
}
closer.Close()
i++
proverTries = append(proverTries, proverTrie)
}
@ -742,7 +741,6 @@ func (p *PebbleClockStore) deleteAggregateProofs(
for i := 0; i < len(frame.Input[516:])/74; i++ {
commit := frame.Input[516+(i*74) : 516+((i+1)*74)]
err := internalDeleteAggregateProof(
p.db,
txn,
frame.AggregateProofs[i],
commit,
@ -773,7 +771,6 @@ func (p *PebbleClockStore) saveAggregateProofs(
for i := 0; i < len(frame.Input[516:])/74; i++ {
commit := frame.Input[516+(i*74) : 516+((i+1)*74)]
err := internalPutAggregateProof(
p.db,
txn,
frame.AggregateProofs[i],
commit,
@ -804,8 +801,8 @@ func (p *PebbleClockStore) GetEarliestDataClockFrame(
return nil, errors.Wrap(err, "get earliest data clock frame")
}
defer closer.Close()
frameNumber := binary.BigEndian.Uint64(idxValue)
frame, _, err := p.GetDataClockFrame(filter, frameNumber, false)
if err != nil {
@ -827,6 +824,7 @@ func (p *PebbleClockStore) GetLatestDataClockFrame(
return nil, nil, errors.Wrap(err, "get latest data clock frame")
}
defer closer.Close()
frameNumber := binary.BigEndian.Uint64(idxValue)
frame, tries, err := p.GetDataClockFrame(filter, frameNumber, false)
@ -838,8 +836,6 @@ func (p *PebbleClockStore) GetLatestDataClockFrame(
return nil, nil, errors.Wrap(err, "get latest data clock frame")
}
closer.Close()
return frame, tries, nil
}
@ -859,6 +855,7 @@ func (p *PebbleClockStore) GetStagedDataClockFrame(
}
return nil, errors.Wrap(err, "get parent data clock frame")
}
defer closer.Close()
parent := &protobufs.ClockFrame{}
if err := proto.Unmarshal(data, parent); err != nil {
@ -874,10 +871,6 @@ func (p *PebbleClockStore) GetStagedDataClockFrame(
}
}
if closer != nil {
closer.Close()
}
return parent, nil
}
@ -895,9 +888,9 @@ func (p *PebbleClockStore) GetStagedDataClockFramesForFrameNumber(
}
return nil, errors.Wrap(err, "get staged data clock frames")
}
defer iter.Close()
frames := []*protobufs.ClockFrame{}
for iter.First(); iter.Valid(); iter.Next() {
data := iter.Value()
frame := &protobufs.ClockFrame{}
@ -915,8 +908,6 @@ func (p *PebbleClockStore) GetStagedDataClockFramesForFrameNumber(
frames = append(frames, frame)
}
iter.Close()
return frames, nil
}
@ -1008,10 +999,8 @@ func (p *PebbleClockStore) CommitDataClockFrame(
); err != nil {
return errors.Wrap(err, "commit data clock frame")
}
}
if err == nil && closer != nil {
closer.Close()
} else {
_ = closer.Close()
}
if !backfill {
@ -1074,36 +1063,99 @@ func (p *PebbleClockStore) DeleteDataClockFrameRange(
for i := fromFrameNumber; i < toFrameNumber; i++ {
frames, err := p.GetStagedDataClockFramesForFrameNumber(filter, i)
if err != nil {
continue
if !errors.Is(err, ErrNotFound) && !errors.Is(err, ErrInvalidData) {
_ = txn.Abort()
return errors.Wrap(err, "delete data clock frame range")
}
frames = nil
}
outer:
for _, frame := range frames {
err = p.deleteAggregateProofs(txn, frame)
if err != nil {
continue
for _, ap := range frame.AggregateProofs {
for _, inc := range ap.InclusionCommitments {
// The commitments collide for very small frames, and as such we have to detect them early
// and avoid deleting them. Common cases for such collisions are prover announcement messages
// which do not contain the frame number, so their binary contents are equivalent between
// multiple frames.
if len(inc.Data) < 2048 {
continue outer
}
if inc.TypeUrl == protobufs.IntrinsicExecutionOutputType {
o := &protobufs.IntrinsicExecutionOutput{}
if err := proto.Unmarshal(inc.Data, o); err != nil {
_ = txn.Abort()
return errors.Wrap(err, "delete data clock frame range")
}
// The commitments collide for empty frames, and as such we have to detect them early
// and avoid deleting them.
if len(o.Output) == 0 || len(o.Proof) == 0 {
continue outer
}
}
}
}
if err := p.deleteAggregateProofs(txn, frame); err != nil {
if !errors.Is(err, pebble.ErrNotFound) {
_ = txn.Abort()
return errors.Wrap(err, "delete data clock frame range")
}
}
}
err = txn.DeleteRange(
if err := txn.DeleteRange(
clockDataParentIndexKey(filter, i, bytes.Repeat([]byte{0x00}, 32)),
clockDataParentIndexKey(filter, i, bytes.Repeat([]byte{0xff}, 32)),
)
if err != nil {
continue
); err != nil {
if !errors.Is(err, pebble.ErrNotFound) {
_ = txn.Abort()
return errors.Wrap(err, "delete data clock frame range")
}
}
err = txn.Delete(clockDataFrameKey(filter, i))
if err != nil {
continue
if err := txn.Delete(clockDataFrameKey(filter, i)); err != nil {
if !errors.Is(err, pebble.ErrNotFound) {
_ = txn.Abort()
return errors.Wrap(err, "delete data clock frame range")
}
}
// The prover trie keys are not stored contiguously with respect
// to the same frame number. As such, we need to manually iterate
// and discover such keys.
for t := uint16(0); t <= 0xffff; t++ {
_, closer, err := p.db.Get(clockProverTrieKey(filter, t, i))
if err != nil {
if !errors.Is(err, pebble.ErrNotFound) {
_ = txn.Abort()
return errors.Wrap(err, "delete data clock frame range")
} else {
break
}
}
_ = closer.Close()
if err := txn.Delete(clockProverTrieKey(filter, t, i)); err != nil {
_ = txn.Abort()
return errors.Wrap(err, "delete data clock frame range")
}
}
if err := txn.DeleteRange(
clockDataTotalDistanceKey(filter, i, bytes.Repeat([]byte{0x00}, 32)),
clockDataTotalDistanceKey(filter, i, bytes.Repeat([]byte{0xff}, 32)),
); err != nil {
if !errors.Is(err, pebble.ErrNotFound) {
_ = txn.Abort()
return errors.Wrap(err, "delete data clock frame range")
}
}
}
if err = txn.Commit(); err != nil {
txn.Abort()
if err := txn.Commit(); err != nil {
return errors.Wrap(err, "delete data clock frame range")
}
return errors.Wrap(err, "delete data clock frame range")
return nil
}
func (p *PebbleClockStore) ResetMasterClockFrames(filter []byte) error {
@ -1154,7 +1206,7 @@ func (p *PebbleClockStore) Compact(
if bytes.Compare(version, config.GetVersion()) < 0 {
cleared = false
}
closer.Close()
defer closer.Close()
}
if !cleared {
@ -1206,11 +1258,10 @@ func (p *PebbleClockStore) Compact(
return errors.Wrap(err, "compact")
}
defer closer.Close()
last := binary.BigEndian.Uint64(idxValue)
closer.Close()
for frameNumber := uint64(1); frameNumber <= last; frameNumber++ {
value, closer, err := p.db.Get(clockDataFrameKey(dataFilter, frameNumber))
if err != nil {
@ -1220,6 +1271,7 @@ func (p *PebbleClockStore) Compact(
return errors.Wrap(err, "compact")
}
defer closer.Close()
frame := &protobufs.ClockFrame{}
@ -1228,6 +1280,7 @@ func (p *PebbleClockStore) Compact(
if err != nil {
return errors.Wrap(err, "compact")
}
defer frameCloser.Close()
if err := proto.Unmarshal(frameValue, frame); err != nil {
return errors.Wrap(err, "compact")
}
@ -1242,9 +1295,6 @@ func (p *PebbleClockStore) Compact(
make([]byte, 32),
)),
)
closer.Close()
frameCloser.Close()
} else {
if err := proto.Unmarshal(value, frame); err != nil {
return errors.Wrap(err, "compact")
@ -1271,14 +1321,15 @@ func (p *PebbleClockStore) Compact(
make([]byte, 32),
)),
)
if err != nil {
return errors.Wrap(err, "compact")
}
parents = append(parents,
clockDataParentIndexKey(dataFilter, frameNumber, selector.FillBytes(
make([]byte, 32),
)),
)
closer.Close()
}
for i := 0; i < len(frame.Input[516:])/74; i++ {
@ -1539,10 +1590,8 @@ func (p *PebbleClockStore) GetTotalDistance(
return nil, errors.Wrap(err, "get total distance")
}
defer closer.Close()
dist := new(big.Int).SetBytes(value)
return dist, nil
}
@ -1580,7 +1629,6 @@ func (p *PebbleClockStore) GetPeerSeniorityMap(filter []byte) (
if err = dec.Decode(&seniorityMap); err != nil {
return nil, errors.Wrap(err, "get peer seniority map")
}
return seniorityMap, nil
}
@ -1628,9 +1676,12 @@ func (p *PebbleClockStore) SetProverTriesForFrame(
clockProverTrieKey(frame.Filter, uint16(start), frame.FrameNumber),
)
if err != nil {
if !errors.Is(err, pebble.ErrNotFound) {
return errors.Wrap(err, "set prover tries for frame")
}
break
}
closer.Close()
_ = closer.Close()
if err = p.db.Delete(
clockProverTrieKey(frame.Filter, uint16(start), frame.FrameNumber),

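A compact sketch of the Get/closer idiom this file converges on, assuming the cockroachdb/pebble API used elsewhere in the store: close the returned closer via defer so every return path releases the underlying buffer, and copy the value out when it has to outlive the read.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

// getCopy reads a key, releases the Pebble-owned buffer via defer, and
// returns a copy that stays valid after the closer runs.
func getCopy(db *pebble.DB, key []byte) ([]byte, error) {
	value, closer, err := db.Get(key)
	if err != nil {
		if errors.Is(err, pebble.ErrNotFound) {
			return nil, nil
		}
		return nil, err
	}
	defer closer.Close()

	copied := make([]byte, len(value))
	copy(copied, value)
	return copied, nil
}

func main() {
	db, err := pebble.Open("", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		panic(err)
	}
	defer db.Close()

	_ = db.Set([]byte("frame"), []byte("payload"), pebble.Sync)
	v, err := getCopy(db, []byte("frame"))
	fmt.Println(string(v), err)
}
```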
View File

@ -142,8 +142,8 @@ func (p *PebbleCoinStore) GetCoinsForOwner(
err = errors.Wrap(err, "get coins for owner")
return nil, nil, nil, err
}
defer iter.Close()
frameNumbers := []uint64{}
addresses := [][]byte{}
coins := []*protobufs.Coin{}
@ -182,8 +182,8 @@ func (p *PebbleCoinStore) GetPreCoinProofsForOwner(owner []byte) (
err = errors.Wrap(err, "get pre coin proofs for owner")
return nil, nil, err
}
defer iter.Close()
frameNumbers := []uint64{}
proofs := []*protobufs.PreCoinProof{}
for iter.First(); iter.Valid(); iter.Next() {
@ -221,7 +221,6 @@ func (p *PebbleCoinStore) GetCoinByAddress(txn Transaction, address []byte) (
err = errors.Wrap(err, "get coin by address")
return nil, err
}
defer closer.Close()
coin := &protobufs.Coin{}
@ -246,7 +245,6 @@ func (p *PebbleCoinStore) GetPreCoinProofByAddress(address []byte) (
err = errors.Wrap(err, "get pre coin proof by address")
return nil, err
}
defer closer.Close()
proof := &protobufs.PreCoinProof{}
@ -424,9 +422,9 @@ func (p *PebbleCoinStore) GetLatestFrameProcessed() (uint64, error) {
return 0, errors.Wrap(err, "get latest frame processed")
}
defer closer.Close()
frameNumber := binary.BigEndian.Uint64(v)
closer.Close()
return frameNumber, nil
}
@ -562,13 +560,12 @@ func (p *PebbleCoinStore) Migrate(filter []byte, genesisSeedHex string) error {
}
return p.internalMigrate(filter, seed)
}
defer closer.Close()
if !bytes.Equal(compare, seed) {
return p.internalMigrate(filter, seed)
}
closer.Close()
status, closer, err := p.db.Get(migrationKey())
if err != nil {
if !errors.Is(err, pebble.ErrNotFound) {

View File

@ -139,8 +139,8 @@ func internalGetAggregateProof(
return nil, errors.Wrap(err, "get aggregate proof")
}
defer closer.Close()
copied := make([]byte, len(value[8:]))
limit := binary.BigEndian.Uint64(value[0:8])
copy(copied, value[8:])
@ -159,6 +159,7 @@ func internalGetAggregateProof(
if err != nil {
return nil, errors.Wrap(err, "get aggregate proof")
}
defer iter.Close()
i := uint32(0)
@ -199,14 +200,11 @@ func internalGetAggregateProof(
return nil, errors.Wrap(err, "get aggregate proof")
}
defer dataCloser.Close()
segCopy := make([]byte, len(segValue))
copy(segCopy, segValue)
chunks = append(chunks, segCopy)
if err = dataCloser.Close(); err != nil {
return nil, errors.Wrap(err, "get aggregate proof")
}
}
if string(url) == protobufs.IntrinsicExecutionOutputType {
@ -236,10 +234,6 @@ func internalGetAggregateProof(
i++
}
if err = iter.Close(); err != nil {
return nil, errors.Wrap(err, "get aggregate proof")
}
return aggregate, nil
}
@ -263,8 +257,8 @@ func internalListAggregateProofKeys(
return nil, nil, nil, errors.Wrap(err, "list aggregate proof")
}
defer closer.Close()
copied := make([]byte, len(value[8:]))
limit := binary.BigEndian.Uint64(value[0:8])
copy(copied, value[8:])
@ -278,6 +272,7 @@ func internalListAggregateProofKeys(
return nil, nil, nil, errors.Wrap(err, "list aggregate proof")
}
defer iter.Close()
i := uint32(0)
commits = append(commits, dataProofInclusionKey(filter, commitment, 0))
@ -305,10 +300,6 @@ func internalListAggregateProofKeys(
i++
}
if err = iter.Close(); err != nil {
return nil, nil, nil, errors.Wrap(err, "list aggregate proof")
}
return proofs, commits, data, nil
}
@ -326,17 +317,10 @@ func (p *PebbleDataProofStore) GetAggregateProof(
}
func internalDeleteAggregateProof(
db KVDB,
txn Transaction,
aggregateProof *protobufs.InclusionAggregateProof,
commitment []byte,
) error {
buf := binary.BigEndian.AppendUint64(
nil,
uint64(len(aggregateProof.InclusionCommitments)),
)
buf = append(buf, aggregateProof.Proof...)
for i, inc := range aggregateProof.InclusionCommitments {
var segments [][]byte
if inc.TypeUrl == protobufs.IntrinsicExecutionOutputType {
@ -378,7 +362,6 @@ func internalDeleteAggregateProof(
}
func internalPutAggregateProof(
db KVDB,
txn Transaction,
aggregateProof *protobufs.InclusionAggregateProof,
commitment []byte,
@ -447,7 +430,6 @@ func (p *PebbleDataProofStore) PutAggregateProof(
commitment []byte,
) error {
return internalPutAggregateProof(
p.db,
txn,
aggregateProof,
commitment,
@ -467,8 +449,8 @@ func (p *PebbleDataProofStore) GetDataTimeProof(
err = errors.Wrap(err, "get data time proof")
return
}
defer closer.Close()
if len(data) < 24 {
err = ErrInvalidData
return
@ -513,13 +495,10 @@ func (p *PebbleDataProofStore) GetTotalReward(
return nil, errors.Wrap(err, "get total difficulty sum")
}
defer closer.Close()
if len(prev) != 0 {
reward.SetBytes(prev[4:])
if err = closer.Close(); err != nil {
return nil, errors.Wrap(err, "get total difficulty sum")
}
}
return reward, nil
@ -556,15 +535,12 @@ func (p *PebbleDataProofStore) PutDataTimeProof(
if err != nil && (!errors.Is(err, pebble.ErrNotFound) || increment != 0) {
return errors.Wrap(err, "put data time proof")
}
defer closer.Close()
if len(prev) != 0 {
priorSum.SetBytes(prev[4:])
prevIncrement := binary.BigEndian.Uint32(prev[:4])
if err = closer.Close(); err != nil {
return errors.Wrap(err, "put data time proof")
}
if prevIncrement != increment-1 {
return errors.Wrap(errors.New("invalid increment"), "put data time proof")
}
@ -609,15 +585,13 @@ func (p *PebbleDataProofStore) GetLatestDataTimeProof(peerId []byte) (
return 0, 0, nil, errors.Wrap(err, "get latest data time proof")
}
defer closer.Close()
if len(prev) < 4 {
return 0, 0, nil, ErrInvalidData
}
increment = binary.BigEndian.Uint32(prev[:4])
if err = closer.Close(); err != nil {
return 0, 0, nil, errors.Wrap(err, "get latest data time proof")
}
_, parallelism, _, output, err = p.GetDataTimeProof(peerId, increment)

View File

@ -281,15 +281,13 @@ func (t *InMemKVDBTransaction) DeleteRange(
if err != nil {
return err
}
defer iter.Close()
for iter.First(); iter.Valid(); iter.Next() {
t.changes = append(t.changes, InMemKVDBOperation{
op: DeleteOperation,
key: iter.Key(),
})
if err != nil {
return err
}
}
return nil
@ -416,6 +414,7 @@ func (d *InMemKVDB) DeleteRange(start, end []byte) error {
if err != nil {
return err
}
defer iter.Close()
for iter.First(); iter.Valid(); iter.Next() {
err = d.Delete(iter.Key())

View File

@ -270,6 +270,8 @@ func (p *PebbleKeyStore) IncludeProvingKey(
staged, closer, err := p.db.Get(stagedProvingKeyKey(provingKey.PublicKey()))
if err != nil && !errors.Is(err, ErrNotFound) {
return errors.Wrap(err, "include proving key")
} else if err == nil {
defer closer.Close()
}
if staged != nil {
@ -279,9 +281,6 @@ func (p *PebbleKeyStore) IncludeProvingKey(
return errors.Wrap(err, "include proving key")
}
}
if err := closer.Close(); err != nil {
return errors.Wrap(err, "include proving key")
}
return nil
}
@ -297,16 +296,13 @@ func (p *PebbleKeyStore) GetStagedProvingKey(
return nil, errors.Wrap(err, "get staged proving key")
}
defer closer.Close()
stagedKey := &protobufs.ProvingKeyAnnouncement{}
if err = proto.Unmarshal(data, stagedKey); err != nil {
return nil, errors.Wrap(err, "get staged proving key")
}
if err := closer.Close(); err != nil {
return nil, errors.Wrap(err, "get staged proving key")
}
return stagedKey, nil
}
@ -322,12 +318,9 @@ func (p *PebbleKeyStore) GetLatestKeyBundle(
return nil, errors.Wrap(err, "get latest key bundle")
}
defer closer.Close()
frameNumber := binary.BigEndian.Uint64(value)
if err := closer.Close(); err != nil {
return nil, errors.Wrap(err, "get latest key bundle")
}
value, closer, err = p.db.Get(keyBundleKey(provingKey, frameNumber))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
@ -336,7 +329,6 @@ func (p *PebbleKeyStore) GetLatestKeyBundle(
return nil, errors.Wrap(err, "get latest key bundle")
}
defer closer.Close()
announcement := &protobufs.InclusionCommitment{}
@ -440,10 +432,8 @@ func (p *PebbleKeyStore) PutKeyBundle(
); err != nil {
return errors.Wrap(err, "put key bundle")
}
}
if err == nil && closer != nil {
closer.Close()
} else {
_ = closer.Close()
}
if err = txn.Set(

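A sketch of the tolerant-read shape used in IncludeProvingKey: a missing key is not treated as an error, and the closer is only deferred when the Get actually succeeded. getOptional is a hypothetical helper; the store calls p.db.Get with its own key builders, and its ErrNotFound may wrap pebble's.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

// getOptional returns nil for a missing key and only defers the closer when
// the read succeeded.
func getOptional(db *pebble.DB, key []byte) ([]byte, error) {
	staged, closer, err := db.Get(key)
	if err != nil && !errors.Is(err, pebble.ErrNotFound) {
		return nil, err
	} else if err == nil {
		defer closer.Close()
	}

	if staged == nil {
		return nil, nil
	}
	out := make([]byte, len(staged))
	copy(out, staged)
	return out, nil
}

func main() {
	db, err := pebble.Open("", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		panic(err)
	}
	defer db.Close()

	v, err := getOptional(db, []byte("missing"))
	fmt.Println(v, err)
}
```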
View File

@ -74,10 +74,10 @@ func (d *PeerstoreDatastore) Get(
}
return nil, err
}
defer closer.Close()
out := make([]byte, len(val))
copy(out[:], val[:])
closer.Close()
return val, nil
}
@ -226,10 +226,10 @@ func (t *transaction) Get(
}
return nil, errors.Wrap(err, "get")
}
defer closer.Close()
out := make([]byte, len(b))
copy(out[:], b[:])
closer.Close()
return b, nil
}