Mirror of https://github.com/QuilibriumNetwork/ceremonyclient.git (synced 2026-02-21 10:27:26 +08:00)

Commit 9b4248e573: Merge branch 'develop' into v2.0.5-p1
@@ -276,7 +276,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
 		disc:                  &discover{},
 		softMaxMessageSize:    DefaultSoftMaxMessageSize,
 		hardMaxMessageSize:    DefaultHardMaxMessageSize,
-		peerOutboundQueueSize: 32,
+		peerOutboundQueueSize: DefaultPeerOutboundQueueSize,
 		signID:                h.ID(),
 		signKey:               nil,
 		signPolicy:            LaxSign,
@@ -4,6 +4,7 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
+	"sync"

 	"go.uber.org/zap"
 	"golang.org/x/crypto/sha3"
@@ -162,9 +163,17 @@ func (n *Node) Start() {
 	}

 	// TODO: add config mapping to engine name/frame registration
+	wg := sync.WaitGroup{}
 	for _, e := range n.execEngines {
-		n.engine.RegisterExecutor(e, 0)
+		wg.Add(1)
+		go func(e execution.ExecutionEngine) {
+			defer wg.Done()
+			if err := <-n.engine.RegisterExecutor(e, 0); err != nil {
+				panic(err)
+			}
+		}(e)
 	}
+	wg.Wait()
 }

 func (n *Node) Stop() {
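The Start change above fans out executor registration into goroutines and blocks until every RegisterExecutor error channel has reported. A minimal standalone sketch of the same fan-out/fan-in pattern; the register helper and engine names are illustrative stand-ins, not the repo's types:

package main

import (
	"fmt"
	"sync"
)

// register simulates an operation that reports completion on an error
// channel, the way RegisterExecutor does above.
func register(name string) <-chan error {
	ch := make(chan error, 1)
	go func() { ch <- nil; close(ch) }()
	return ch
}

func main() {
	engines := []string{"token", "compute", "hypergraph"}
	wg := sync.WaitGroup{}
	for _, e := range engines {
		wg.Add(1)
		go func(e string) {
			defer wg.Done()
			// Block this worker until its registration completes or fails.
			if err := <-register(e); err != nil {
				panic(err)
			}
		}(e)
	}
	wg.Wait() // Start does not return until every registration is done.
	fmt.Println("all engines registered")
}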
@@ -22,6 +22,24 @@ import (
 	"gopkg.in/yaml.v2"
 )

+type GRPCMessageLimitsConfig struct {
+	MaxRecvMsgSize int `yaml:"maxRecvMsgSize"`
+	MaxSendMsgSize int `yaml:"maxSendMsgSize"`
+}
+
+// WithDefaults returns a copy of the GRPCMessageLimitsConfig with any missing fields set to
+// their default values.
+func (c GRPCMessageLimitsConfig) WithDefaults(recv, send int) GRPCMessageLimitsConfig {
+	cpy := c
+	if cpy.MaxRecvMsgSize == 0 {
+		cpy.MaxRecvMsgSize = recv
+	}
+	if cpy.MaxSendMsgSize == 0 {
+		cpy.MaxSendMsgSize = send
+	}
+	return cpy
+}
+
 type Config struct {
 	Key *KeyConfig `yaml:"key"`
 	P2P *P2PConfig `yaml:"p2p"`
@@ -32,6 +50,16 @@ type Config struct {
 	LogFile string `yaml:"logFile"`
 }

+// WithDefaults returns a copy of the config with default values filled in.
+func (c Config) WithDefaults() Config {
+	cpy := c
+	p2p := cpy.P2P.WithDefaults()
+	cpy.P2P = &p2p
+	engine := cpy.Engine.WithDefaults()
+	cpy.Engine = &engine
+	return cpy
+}
+
 func NewConfig(configPath string) (*Config, error) {
 	file, err := os.Open(configPath)
 	if err != nil {
@@ -447,7 +475,8 @@ func LoadConfig(configPath string, proverKey string, skipGenesisCheck bool) (
 		config.P2P.BootstrapPeers = peers
 	}

-	return config, nil
+	withDefaults := config.WithDefaults()
+	return &withDefaults, nil
 }

 func SaveConfig(configPath string, config *Config) error {
@@ -2,6 +2,17 @@ package config

 import "time"

+const (
+	defaultMinimumPeersRequired          = 3
+	defaultDataWorkerBaseListenMultiaddr = "/ip4/127.0.0.1/tcp/%d"
+	defaultDataWorkerBaseListenPort      = 40000
+	defaultDataWorkerMemoryLimit         = 1792 * 1024 * 1024 // 1.75 GiB
+	defaultSyncTimeout                   = 4 * time.Second
+	defaultSyncCandidates                = 8
+	defaultSyncMessageReceiveLimit       = 1 * 1024 * 1024
+	defaultSyncMessageSendLimit          = 600 * 1024 * 1024
+)
+
 type FramePublishFragmentationReedSolomonConfig struct {
 	// The number of data shards to use for Reed-Solomon encoding and decoding.
 	DataShards int `yaml:"dataShards"`
@@ -9,7 +20,8 @@ type FramePublishFragmentationReedSolomonConfig struct {
 	ParityShards int `yaml:"parityShards"`
 }

-// WithDefaults sets default values for any fields that are not set.
+// WithDefaults returns a copy of the FramePublishFragmentationReedSolomonConfig with any missing fields set to
+// their default values.
 func (c FramePublishFragmentationReedSolomonConfig) WithDefaults() FramePublishFragmentationReedSolomonConfig {
 	cpy := c
 	if cpy.DataShards == 0 {
@@ -29,7 +41,8 @@ type FramePublishFragmentationConfig struct {
 	ReedSolomon FramePublishFragmentationReedSolomonConfig `yaml:"reedSolomon"`
 }

-// WithDefaults sets default values for any fields that are not set.
+// WithDefaults returns a copy of the FramePublishFragmentationConfig with any missing fields set to
+// their default values.
 func (c FramePublishFragmentationConfig) WithDefaults() FramePublishFragmentationConfig {
 	cpy := c
 	if cpy.Algorithm == "" {
@@ -53,7 +66,8 @@ type FramePublishConfig struct {
 	BallastSize int `yaml:"ballastSize"`
 }

-// WithDefaults sets default values for any fields that are not set.
+// WithDefaults returns a copy of the FramePublishConfig with any missing fields set to
+// their default values.
 func (c FramePublishConfig) WithDefaults() FramePublishConfig {
 	cpy := c
 	if cpy.Mode == "" {
@@ -94,6 +108,10 @@ type EngineConfig struct {
 	AutoMergeCoins bool `yaml:"autoMergeCoins"`
+	// Maximum wait time for a frame to be downloaded from a peer.
+	SyncTimeout time.Duration `yaml:"syncTimeout"`
+	// Number of candidate peers per category to sync with.
+	SyncCandidates int `yaml:"syncCandidates"`
+	// The configuration for the GRPC message limits.
+	SyncMessageLimits GRPCMessageLimitsConfig `yaml:"syncMessageLimits"`

 	// Values used only for testing – do not override these in production, your
 	// node will get kicked out
@@ -104,3 +122,33 @@ type EngineConfig struct {
 	// EXPERIMENTAL: The configuration for frame publishing.
 	FramePublish FramePublishConfig `yaml:"framePublish"`
 }

+// WithDefaults returns a copy of the EngineConfig with any missing fields set to
+// their default values.
+func (c EngineConfig) WithDefaults() EngineConfig {
+	cpy := c
+	if cpy.MinimumPeersRequired == 0 {
+		cpy.MinimumPeersRequired = defaultMinimumPeersRequired
+	}
+	if cpy.DataWorkerBaseListenMultiaddr == "" {
+		cpy.DataWorkerBaseListenMultiaddr = defaultDataWorkerBaseListenMultiaddr
+	}
+	if cpy.DataWorkerBaseListenPort == 0 {
+		cpy.DataWorkerBaseListenPort = defaultDataWorkerBaseListenPort
+	}
+	if cpy.DataWorkerMemoryLimit == 0 {
+		cpy.DataWorkerMemoryLimit = defaultDataWorkerMemoryLimit
+	}
+	if cpy.SyncTimeout == 0 {
+		cpy.SyncTimeout = defaultSyncTimeout
+	}
+	if cpy.SyncCandidates == 0 {
+		cpy.SyncCandidates = defaultSyncCandidates
+	}
+	cpy.SyncMessageLimits = cpy.SyncMessageLimits.WithDefaults(
+		defaultSyncMessageReceiveLimit,
+		defaultSyncMessageSendLimit,
+	)
+	cpy.FramePublish = cpy.FramePublish.WithDefaults()
+	return cpy
+}
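Taken together with the LoadConfig change earlier, the config is unmarshaled from YAML and then defaults are applied exactly once, so zero values become the constants above. A hedged sketch of that flow with a cut-down stand-in type; the struct here is illustrative, only the yaml keys match the tags above:

package main

import (
	"fmt"
	"time"

	"gopkg.in/yaml.v2"
)

type engineConfig struct {
	SyncTimeout    time.Duration `yaml:"syncTimeout"`
	SyncCandidates int           `yaml:"syncCandidates"`
}

// WithDefaults mirrors the copy-and-fill pattern used by the real EngineConfig.
func (c engineConfig) WithDefaults() engineConfig {
	cpy := c
	if cpy.SyncTimeout == 0 {
		cpy.SyncTimeout = 4 * time.Second
	}
	if cpy.SyncCandidates == 0 {
		cpy.SyncCandidates = 8
	}
	return cpy
}

func main() {
	var c engineConfig
	// syncTimeout is omitted from the YAML, so WithDefaults fills it in.
	if err := yaml.Unmarshal([]byte("syncCandidates: 12\n"), &c); err != nil {
		panic(err)
	}
	c = c.WithDefaults()
	fmt.Println(c.SyncTimeout, c.SyncCandidates) // 4s 12
}

Because the methods take value receivers and return copies, the YAML file on disk never gains the defaulted values; they exist only in the loaded config.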
@@ -2,6 +2,22 @@ package config

 import (
 	"time"

+	blossomsub "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub"
+	qruntime "source.quilibrium.com/quilibrium/monorepo/node/internal/runtime"
 )

+const (
+	defaultLowWatermarkConnections  = 160
+	defaultHighWatermarkConnections = 192
+	defaultGRPCServerRateLimit      = 10
+	defaultMinBootstrapPeers        = 3
+	defaultBootstrapParallelism     = 10
+	defaultDiscoveryParallelism     = 50
+	defaultDiscoveryPeerLookupLimit = 1000
+	defaultPingTimeout              = 5 * time.Second
+	defaultPingPeriod               = 30 * time.Second
+	defaultPingAttempts             = 3
+)
+
 type P2PConfig struct {
@@ -43,7 +59,7 @@ type P2PConfig struct {
 	LowWatermarkConnections  int      `yaml:"lowWatermarkConnections"`
 	HighWatermarkConnections int      `yaml:"highWatermarkConnections"`
 	DirectPeers              []string `yaml:"directPeers"`
-	GrpcServerRateLimit      int      `yaml:"grpcServerRateLimit"`
+	GRPCServerRateLimit      int      `yaml:"grpcServerRateLimit"`
 	MinBootstrapPeers        int      `yaml:"minBootstrapPeers"`
 	BootstrapParallelism     int      `yaml:"bootstrapParallelism"`
 	DiscoveryParallelism     int      `yaml:"discoveryParallelism"`
@@ -56,3 +72,145 @@ type P2PConfig struct {
 	SubscriptionQueueSize int `yaml:"subscriptionQueueSize"`
 	PeerOutboundQueueSize int `yaml:"peerOutboundQueueSize"`
 }

+// WithDefaults returns a copy of the P2PConfig with any missing fields set to
+// their default values.
+func (c P2PConfig) WithDefaults() P2PConfig {
+	cpy := c
+	if cpy.D == 0 {
+		cpy.D = blossomsub.BlossomSubD
+	}
+	if cpy.DLo == 0 {
+		cpy.DLo = blossomsub.BlossomSubDlo
+	}
+	if cpy.DHi == 0 {
+		cpy.DHi = blossomsub.BlossomSubDhi
+	}
+	if cpy.DScore == 0 {
+		cpy.DScore = blossomsub.BlossomSubDscore
+	}
+	if cpy.DOut == 0 {
+		cpy.DOut = blossomsub.BlossomSubDout
+	}
+	if cpy.HistoryLength == 0 {
+		cpy.HistoryLength = blossomsub.BlossomSubHistoryLength
+	}
+	if cpy.HistoryGossip == 0 {
+		cpy.HistoryGossip = blossomsub.BlossomSubHistoryGossip
+	}
+	if cpy.DLazy == 0 {
+		cpy.DLazy = blossomsub.BlossomSubDlazy
+	}
+	if cpy.GossipFactor == 0 {
+		cpy.GossipFactor = blossomsub.BlossomSubGossipFactor
+	}
+	if cpy.GossipRetransmission == 0 {
+		cpy.GossipRetransmission = blossomsub.BlossomSubGossipRetransmission
+	}
+	if cpy.HeartbeatInitialDelay == 0 {
+		cpy.HeartbeatInitialDelay = blossomsub.BlossomSubHeartbeatInitialDelay
+	}
+	if cpy.HeartbeatInterval == 0 {
+		cpy.HeartbeatInterval = blossomsub.BlossomSubHeartbeatInterval
+	}
+	if cpy.FanoutTTL == 0 {
+		cpy.FanoutTTL = blossomsub.BlossomSubFanoutTTL
+	}
+	if cpy.PrunePeers == 0 {
+		cpy.PrunePeers = blossomsub.BlossomSubPrunePeers
+	}
+	if cpy.PruneBackoff == 0 {
+		cpy.PruneBackoff = blossomsub.BlossomSubPruneBackoff
+	}
+	if cpy.UnsubscribeBackoff == 0 {
+		cpy.UnsubscribeBackoff = blossomsub.BlossomSubUnsubscribeBackoff
+	}
+	if cpy.Connectors == 0 {
+		cpy.Connectors = blossomsub.BlossomSubConnectors
+	}
+	if cpy.MaxPendingConnections == 0 {
+		cpy.MaxPendingConnections = blossomsub.BlossomSubMaxPendingConnections
+	}
+	if cpy.ConnectionTimeout == 0 {
+		cpy.ConnectionTimeout = blossomsub.BlossomSubConnectionTimeout
+	}
+	if cpy.DirectConnectTicks == 0 {
+		cpy.DirectConnectTicks = blossomsub.BlossomSubDirectConnectTicks
+	}
+	if cpy.DirectConnectInitialDelay == 0 {
+		cpy.DirectConnectInitialDelay =
+			blossomsub.BlossomSubDirectConnectInitialDelay
+	}
+	if cpy.OpportunisticGraftTicks == 0 {
+		cpy.OpportunisticGraftTicks =
+			blossomsub.BlossomSubOpportunisticGraftTicks
+	}
+	if cpy.OpportunisticGraftPeers == 0 {
+		cpy.OpportunisticGraftPeers =
+			blossomsub.BlossomSubOpportunisticGraftPeers
+	}
+	if cpy.GraftFloodThreshold == 0 {
+		cpy.GraftFloodThreshold = blossomsub.BlossomSubGraftFloodThreshold
+	}
+	if cpy.MaxIHaveLength == 0 {
+		cpy.MaxIHaveLength = blossomsub.BlossomSubMaxIHaveLength
+	}
+	if cpy.MaxIHaveMessages == 0 {
+		cpy.MaxIHaveMessages = blossomsub.BlossomSubMaxIHaveMessages
+	}
+	if cpy.MaxIDontWantMessages == 0 {
+		cpy.MaxIDontWantMessages = blossomsub.BlossomSubMaxIDontWantMessages
+	}
+	if cpy.IWantFollowupTime == 0 {
+		cpy.IWantFollowupTime = blossomsub.BlossomSubIWantFollowupTime
+	}
+	if cpy.IDontWantMessageThreshold == 0 {
+		cpy.IDontWantMessageThreshold = blossomsub.BlossomSubIDontWantMessageThreshold
+	}
+	if cpy.IDontWantMessageTTL == 0 {
+		cpy.IDontWantMessageTTL = blossomsub.BlossomSubIDontWantMessageTTL
+	}
+	if cpy.LowWatermarkConnections == 0 {
+		cpy.LowWatermarkConnections = defaultLowWatermarkConnections
+	}
+	if cpy.HighWatermarkConnections == 0 {
+		cpy.HighWatermarkConnections = defaultHighWatermarkConnections
+	}
+	if cpy.GRPCServerRateLimit == 0 {
+		cpy.GRPCServerRateLimit = defaultGRPCServerRateLimit
+	}
+	if cpy.MinBootstrapPeers == 0 {
+		cpy.MinBootstrapPeers = defaultMinBootstrapPeers
+	}
+	if cpy.BootstrapParallelism == 0 {
+		cpy.BootstrapParallelism = defaultBootstrapParallelism
+	}
+	if cpy.DiscoveryParallelism == 0 {
+		cpy.DiscoveryParallelism = defaultDiscoveryParallelism
+	}
+	if cpy.DiscoveryPeerLookupLimit == 0 {
+		cpy.DiscoveryPeerLookupLimit = defaultDiscoveryPeerLookupLimit
+	}
+	if cpy.PingTimeout == 0 {
+		cpy.PingTimeout = defaultPingTimeout
+	}
+	if cpy.PingPeriod == 0 {
+		cpy.PingPeriod = defaultPingPeriod
+	}
+	if cpy.PingAttempts == 0 {
+		cpy.PingAttempts = defaultPingAttempts
+	}
+	if cpy.ValidateQueueSize == 0 {
+		cpy.ValidateQueueSize = blossomsub.DefaultValidateQueueSize
+	}
+	if cpy.ValidateWorkers == 0 {
+		cpy.ValidateWorkers = qruntime.WorkerCount(0, false)
+	}
+	if cpy.SubscriptionQueueSize == 0 {
+		cpy.SubscriptionQueueSize = blossomsub.DefaultSubscriptionQueueSize
+	}
+	if cpy.PeerOutboundQueueSize == 0 {
+		cpy.PeerOutboundQueueSize = blossomsub.DefaultPeerOutboundQueueSize
+	}
+	return cpy
+}
@@ -98,7 +98,7 @@ func (e *DataClockConsensusEngine) publishProof(
 	}
 	e.peerMapMx.Unlock()

-	cfg := e.config.Engine.FramePublish.WithDefaults()
+	cfg := e.config.Engine.FramePublish
 	if cfg.BallastSize > 0 {
 		frame = proto.Clone(frame).(*protobufs.ClockFrame)
 		frame.Padding = make([]byte, cfg.BallastSize)
@@ -20,8 +20,6 @@ import (
 	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
 )

-const defaultSyncTimeout = 4 * time.Second
-
 func (e *DataClockConsensusEngine) syncWithMesh() error {
 	e.logger.Info("collecting vdf proofs")

@@ -304,11 +302,12 @@ func (e *DataClockConsensusEngine) GetAheadPeers(frameNumber uint64) []internal.
 		}
 	}

+	syncCandidates := e.config.Engine.SyncCandidates
 	return slices.Concat(
-		internal.WeightedSampleWithoutReplacement(nearCandidates, len(nearCandidates)),
-		internal.WeightedSampleWithoutReplacement(reachableCandidates, len(reachableCandidates)),
-		internal.WeightedSampleWithoutReplacement(unknownCandidates, len(unknownCandidates)),
-		internal.WeightedSampleWithoutReplacement(unreachableCandidates, len(unreachableCandidates)),
+		internal.WeightedSampleWithoutReplacement(nearCandidates, min(len(nearCandidates), syncCandidates)),
+		internal.WeightedSampleWithoutReplacement(reachableCandidates, min(len(reachableCandidates), syncCandidates)),
+		internal.WeightedSampleWithoutReplacement(unknownCandidates, min(len(unknownCandidates), syncCandidates)),
+		internal.WeightedSampleWithoutReplacement(unreachableCandidates, min(len(unreachableCandidates), syncCandidates)),
 	)
 }

@@ -342,10 +341,6 @@ func (e *DataClockConsensusEngine) syncWithPeer(
 	}()

 	syncTimeout := e.config.Engine.SyncTimeout
-	if syncTimeout == 0 {
-		syncTimeout = defaultSyncTimeout
-	}
-
 	dialCtx, cancelDial := context.WithTimeout(e.ctx, syncTimeout)
 	defer cancelDial()
 	cc, err := e.pubSub.GetDirectChannel(dialCtx, peerId, "sync")
@@ -371,7 +366,10 @@ func (e *DataClockConsensusEngine) syncWithPeer(
 		&protobufs.GetDataFrameRequest{
 			FrameNumber: latest.FrameNumber + 1,
 		},
-		grpc.MaxCallRecvMsgSize(600*1024*1024),
+		// The message size limits are swapped because the server is the one
+		// sending the data.
+		grpc.MaxCallRecvMsgSize(e.config.Engine.SyncMessageLimits.MaxSendMsgSize),
+		grpc.MaxCallSendMsgSize(e.config.Engine.SyncMessageLimits.MaxRecvMsgSize),
 	)
 	cancelGet()
 	if err != nil {
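The swap in the call options above follows from gRPC's symmetry: what the server may send is what the client must be able to receive, and vice versa. A hedged sketch of deriving client call options from a shared limits struct; the type and function names here are illustrative:

package main

import (
	"google.golang.org/grpc"
)

type grpcMessageLimits struct {
	MaxRecvMsgSize int // server-side receive ceiling
	MaxSendMsgSize int // server-side send ceiling
}

// clientCallOptions mirrors the server limits from the client's point of
// view: the client's receive ceiling covers the server's send ceiling.
func clientCallOptions(l grpcMessageLimits) []grpc.CallOption {
	return []grpc.CallOption{
		grpc.MaxCallRecvMsgSize(l.MaxSendMsgSize),
		grpc.MaxCallSendMsgSize(l.MaxRecvMsgSize),
	}
}

func main() {
	opts := clientCallOptions(grpcMessageLimits{
		MaxRecvMsgSize: 1 * 1024 * 1024,
		MaxSendMsgSize: 600 * 1024 * 1024,
	})
	_ = opts // passed to an RPC invocation, e.g. client.GetDataFrame(ctx, req, opts...)
}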
@@ -72,6 +72,8 @@ type DataClockConsensusEngine struct {
 	cancel context.CancelFunc
 	wg     sync.WaitGroup

+	grpcServers []*grpc.Server
+
 	lastProven uint64
 	difficulty uint32
 	config     *config.Config
@@ -218,21 +220,11 @@ func NewDataClockConsensusEngine(
 		panic(errors.New("peer info manager is nil"))
 	}

-	minimumPeersRequired := cfg.Engine.MinimumPeersRequired
-	if minimumPeersRequired == 0 {
-		minimumPeersRequired = 3
-	}
-
 	difficulty := cfg.Engine.Difficulty
 	if difficulty == 0 {
 		difficulty = 160000
 	}

-	rateLimit := cfg.P2P.GrpcServerRateLimit
-	if rateLimit == 0 {
-		rateLimit = 10
-	}
-
 	clockFrameFragmentBuffer, err := fragmentation.NewClockFrameFragmentCircularBuffer(
 		fragmentation.NewReedSolomonClockFrameFragmentBuffer,
 		16,
@@ -270,7 +262,7 @@ func NewDataClockConsensusEngine(
 		syncingStatus:         SyncStatusNotSyncing,
 		peerMap:               map[string]*peerInfo{},
 		uncooperativePeersMap: map[string]*peerInfo{},
-		minimumPeersRequired:  minimumPeersRequired,
+		minimumPeersRequired:  cfg.Engine.MinimumPeersRequired,
 		report:                report,
 		frameProver:           frameProver,
 		masterTimeReel:        masterTimeReel,
@@ -283,7 +275,7 @@ func NewDataClockConsensusEngine(
 		config:                cfg,
 		preMidnightMint:       map[string]struct{}{},
 		grpcRateLimiter: NewRateLimiter(
-			rateLimit,
+			cfg.P2P.GRPCServerRateLimit,
 			time.Minute,
 		),
 		requestSyncCh: make(chan struct{}, 1),
@@ -349,38 +341,40 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 	e.pubSub.Subscribe(e.frameFragmentFilter, e.handleFrameFragmentMessage)
 	e.pubSub.Subscribe(e.txFilter, e.handleTxMessage)
 	e.pubSub.Subscribe(e.infoFilter, e.handleInfoMessage)

+	syncServer := qgrpc.NewServer(
+		grpc.MaxRecvMsgSize(e.config.Engine.SyncMessageLimits.MaxRecvMsgSize),
+		grpc.MaxSendMsgSize(e.config.Engine.SyncMessageLimits.MaxSendMsgSize),
+	)
+	e.grpcServers = append(e.grpcServers[:0:0], syncServer)
+	protobufs.RegisterDataServiceServer(syncServer, e)
 	go func() {
-		server := qgrpc.NewServer(
-			grpc.MaxSendMsgSize(40*1024*1024),
-			grpc.MaxRecvMsgSize(40*1024*1024),
-		)
-		protobufs.RegisterDataServiceServer(server, e)
 		if err := e.pubSub.StartDirectChannelListener(
 			e.pubSub.GetPeerID(),
 			"sync",
-			server,
+			syncServer,
 		); err != nil {
-			panic(err)
+			e.logger.Error("error starting sync server", zap.Error(err))
 		}
 	}()

-	go func() {
-		if e.dataTimeReel.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
-			server := qgrpc.NewServer(
-				grpc.MaxSendMsgSize(1*1024*1024),
-				grpc.MaxRecvMsgSize(1*1024*1024),
-			)
-			protobufs.RegisterDataServiceServer(server, e)
-
+	if e.FrameProverTrieContains(0, e.provingKeyAddress) {
+		workerServer := qgrpc.NewServer(
+			grpc.MaxSendMsgSize(1*1024*1024),
+			grpc.MaxRecvMsgSize(1*1024*1024),
+		)
+		e.grpcServers = append(e.grpcServers, workerServer)
+		protobufs.RegisterDataServiceServer(workerServer, e)
+		go func() {
 			if err := e.pubSub.StartDirectChannelListener(
 				e.pubSub.GetPeerID(),
 				"worker",
-				server,
+				workerServer,
 			); err != nil {
-				panic(err)
+				e.logger.Error("error starting worker server", zap.Error(err))
 			}
-		}
-	}()
+		}()
+	}

 	e.stateMx.Lock()
 	e.state = consensus.EngineStateCollecting
@@ -661,6 +655,16 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
 }

 func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
+	wg := sync.WaitGroup{}
+	wg.Add(len(e.grpcServers))
+	for _, server := range e.grpcServers {
+		go func(server *grpc.Server) {
+			defer wg.Done()
+			server.GracefulStop()
+		}(server)
+	}
+	wg.Wait()
+
 	e.logger.Info("stopping ceremony consensus engine")
 	e.cancel()
 	e.wg.Wait()
@@ -684,7 +688,6 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
 		e.logger.Warn("error publishing prover pause", zap.Error(err))
 	}

-	wg := sync.WaitGroup{}
 	wg.Add(len(e.executionEngines))
 	executionErrors := make(chan error, len(e.executionEngines))
 	for name := range e.executionEngines {
@@ -876,14 +879,6 @@ func (
 		zap.Uint32("client", index),
 	)

-	if e.config.Engine.DataWorkerBaseListenMultiaddr == "" {
-		e.config.Engine.DataWorkerBaseListenMultiaddr = "/ip4/127.0.0.1/tcp/%d"
-	}
-
-	if e.config.Engine.DataWorkerBaseListenPort == 0 {
-		e.config.Engine.DataWorkerBaseListenPort = 40000
-	}
-
 	ma, err := multiaddr.NewMultiaddr(
 		fmt.Sprintf(
 			e.config.Engine.DataWorkerBaseListenMultiaddr,
@@ -988,14 +983,6 @@ func (e *DataClockConsensusEngine) createParallelDataClientsFromBaseMultiaddr(
 		zap.Int("parallelism", parallelism),
 	)

-	if e.config.Engine.DataWorkerBaseListenMultiaddr == "" {
-		e.config.Engine.DataWorkerBaseListenMultiaddr = "/ip4/127.0.0.1/tcp/%d"
-	}
-
-	if e.config.Engine.DataWorkerBaseListenPort == 0 {
-		e.config.Engine.DataWorkerBaseListenPort = 40000
-	}
-
 	clients := make([]protobufs.DataIPCServiceClient, parallelism)

 	for i := 0; i < parallelism; i++ {
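The Stop change above parallelizes GracefulStop across every tracked server, so total shutdown latency is bounded by the slowest server rather than the sum. A minimal sketch of the pattern; the servers here are freshly constructed for illustration only:

package main

import (
	"sync"

	"google.golang.org/grpc"
)

func main() {
	servers := []*grpc.Server{grpc.NewServer(), grpc.NewServer()}

	wg := sync.WaitGroup{}
	wg.Add(len(servers))
	for _, s := range servers {
		go func(s *grpc.Server) {
			defer wg.Done()
			// GracefulStop blocks until in-flight RPCs finish; running the
			// stops concurrently makes shutdown time the max, not the sum.
			s.GracefulStop()
		}(s)
	}
	wg.Wait()
}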
@@ -1,13 +0,0 @@
-package data
-
-import "go.uber.org/zap"
-
-func (e *DataClockConsensusEngine) pruneFrames(maxFrame uint64) error {
-	e.logger.Info("pruning frames", zap.Uint64("max_frame_to_prune", maxFrame))
-	err := e.clockStore.DeleteDataClockFrameRange(e.filter, 1, maxFrame)
-	if err != nil {
-		e.logger.Error("failed to prune frames", zap.Error(err))
-		return err
-	}
-	return nil
-}
@@ -103,6 +103,10 @@ func (e *DataClockConsensusEngine) runFramePruning() {

 	e.logger.Info("frame pruning enabled, waiting for delay timeout expiry")

+	from := uint64(1)
+	maxFrames := uint64(e.config.Engine.MaxFrames)
+	batchSize := uint64(1000)
+outer:
 	for {
 		select {
 		case <-e.ctx.Done():
@@ -113,15 +117,34 @@ func (e *DataClockConsensusEngine) runFramePruning() {
 			panic(err)
 		}

-		if head.FrameNumber < uint64(e.config.Engine.MaxFrames)+1 ||
+		if head.FrameNumber <= maxFrames ||
 			head.FrameNumber <= application.PROOF_FRAME_SENIORITY_REPAIR+1 {
 			continue
 		}

-		if err := e.pruneFrames(
-			head.FrameNumber - uint64(e.config.Engine.MaxFrames),
-		); err != nil {
-			e.logger.Error("could not prune", zap.Error(err))
+		to := head.FrameNumber - maxFrames
+		for i := from; i < to; i += batchSize {
+			start, stop := i, min(i+batchSize, to)
+			if err := e.clockStore.DeleteDataClockFrameRange(e.filter, start, stop); err != nil {
+				e.logger.Error(
+					"failed to prune frames",
+					zap.Error(err),
+					zap.Uint64("from", start),
+					zap.Uint64("to", stop),
+				)
+				continue outer
+			}
+			e.logger.Info(
+				"pruned frames",
+				zap.Uint64("from", start),
+				zap.Uint64("to", stop),
+			)
+			select {
+			case <-e.ctx.Done():
+				return
+			default:
+			}
+			from = stop
 		}
 	}
 }
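The rewritten loop replaces one unbounded range delete with fixed-size batches, records progress in from, and bails back to the outer select on failure or cancellation. A standalone sketch of just the batching arithmetic with a stubbed delete; all names and numbers here are illustrative:

package main

import "fmt"

func min(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}

func main() {
	from, to, batchSize := uint64(1), uint64(3500), uint64(1000)
	for i := from; i < to; i += batchSize {
		start, stop := i, min(i+batchSize, to)
		// A real deleteRange(start, stop) would go here; on error the loop
		// above keeps `from` at the last completed batch and retries later.
		fmt.Printf("delete frames [%d, %d)\n", start, stop)
		from = stop
	}
}

Batching keeps each store transaction small and lets cancellation take effect between batches instead of only after the whole range is gone.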
@@ -110,7 +110,7 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() {
 			continue
 		}

-		if e.frameProverTries[0].Contains(e.provingKeyAddress) {
+		if e.FrameProverTrieContains(0, e.provingKeyAddress) {
 			wg := &sync.WaitGroup{}
 			for name := range e.executionEngines {
 				name := name
@@ -35,7 +35,7 @@ func (e *DataClockConsensusEngine) GetDataFrame(
 	if !ok {
 		return nil, status.Error(codes.Internal, "remote peer ID not found")
 	}
-	if e.config.P2P.GrpcServerRateLimit != -1 {
+	if e.config.P2P.GRPCServerRateLimit != -1 {
 		if err := e.grpcRateLimiter.Allow(peerID); err != nil {
 			return nil, err
 		}
@@ -69,6 +69,7 @@ func (e *MasterClockConsensusEngine) Sync(
 	if err != nil {
 		return errors.Wrap(err, "sync")
 	}
+	defer iter.Close()

 	response := []*protobufs.ClockFrame{}

@@ -81,10 +82,6 @@ func (e *MasterClockConsensusEngine) Sync(
 		response = append(response, frame)
 	}

-	if err = iter.Close(); err != nil {
-		return errors.Wrap(err, "sync")
-	}
-
 	if len(response) == 0 {
 		return nil
 	}
node/main.go
@@ -144,6 +144,11 @@ var (
 		true,
 		"when enabled, frame execution validation is skipped",
 	)
+	compactDB = flag.Bool(
+		"compact-db",
+		false,
+		"compacts the database and exits",
+	)
 )

 func signatureCheckDefault() bool {
@@ -330,6 +335,17 @@ func main() {
 		panic(err)
 	}

+	if *compactDB && *core == 0 {
+		db := store.NewPebbleDB(nodeConfig.DB)
+		if err := db.CompactAll(); err != nil {
+			panic(err)
+		}
+		if err := db.Close(); err != nil {
+			panic(err)
+		}
+		return
+	}
+
 	if *network != 0 {
 		if nodeConfig.P2P.BootstrapPeers[0] == config.BootstrapPeers[0] {
 			fmt.Println(
@@ -384,15 +400,6 @@ func main() {
 		return
 	}

-	if nodeConfig.Engine.DataWorkerBaseListenMultiaddr == "" {
-		nodeConfig.Engine.DataWorkerBaseListenMultiaddr = "/ip4/127.0.0.1/tcp/%d"
-	}
-	if nodeConfig.Engine.DataWorkerBaseListenPort == 0 {
-		nodeConfig.Engine.DataWorkerBaseListenPort = 40000
-	}
-	if nodeConfig.Engine.DataWorkerMemoryLimit == 0 {
-		nodeConfig.Engine.DataWorkerMemoryLimit = 1792 * 1024 * 1024 // 1.75GiB
-	}
 	if len(nodeConfig.Engine.DataWorkerMultiaddrs) == 0 {
 		maxProcs, numCPU := runtime.GOMAXPROCS(0), runtime.NumCPU()
 		if maxProcs > numCPU && !nodeConfig.Engine.AllowExcessiveGOMAXPROCS {
@@ -460,6 +467,9 @@ func main() {
 		if _, explicitGOMEMLIMIT := os.LookupEnv("GOMEMLIMIT"); !explicitGOMEMLIMIT {
 			rdebug.SetMemoryLimit(availableOverhead * 8 / 10)
 		}
+		if _, explicitGOGC := os.LookupEnv("GOGC"); !explicitGOGC {
+			rdebug.SetGCPercent(10)
+		}
 	}
 }
@@ -467,6 +477,7 @@ func main() {

 	if !*integrityCheck {
 		go spawnDataWorkers(nodeConfig)
+		defer stopDataWorkers()
 	}

 	kzg.Init()
@@ -510,6 +521,9 @@ func main() {

 	// runtime.GOMAXPROCS(1)

+	node.Start()
+	defer node.Stop()
+
 	if nodeConfig.ListenGRPCMultiaddr != "" {
 		srv, err := rpc.NewRPCServer(
 			nodeConfig.ListenGRPCMultiaddr,
@@ -526,20 +540,13 @@ func main() {
 		if err != nil {
 			panic(err)
 		}

-		go func() {
-			err := srv.Start()
-			if err != nil {
-				panic(err)
-			}
-		}()
+		if err := srv.Start(); err != nil {
+			panic(err)
+		}
+		defer srv.Stop()
 	}

-	node.Start()
-
 	<-done
-	stopDataWorkers()
-	node.Stop()
 }

var dataWorkers []*exec.Cmd
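A note on usage: with the flag registered above, a one-off maintenance run would look like `node -compact-db` (the binary name depends on how the node is built and launched), and the *core == 0 guard keeps spawned data-worker processes from repeating the compaction.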
@@ -45,26 +45,13 @@ import (
 	"source.quilibrium.com/quilibrium/monorepo/node/config"
 	qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc"
 	"source.quilibrium.com/quilibrium/monorepo/node/internal/observability"
-	qruntime "source.quilibrium.com/quilibrium/monorepo/node/internal/runtime"
 	"source.quilibrium.com/quilibrium/monorepo/node/p2p/internal"
 	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
 )

-// The default watermarks are the defaults used by libp2p.DefaultConnectionManager.
-// We explicitly set them here in order to force internal consistency between the
-// connection manager and the resource manager.
 const (
-	defaultLowWatermarkConnections  = 160
-	defaultHighWatermarkConnections = 192
-	defaultMinBootstrapPeers        = 3
-	defaultBootstrapParallelism     = 10
-	defaultDiscoveryParallelism     = 50
-	defaultDiscoveryPeerLookupLimit = 1000
-	defaultPingTimeout              = 5 * time.Second
-	defaultPingPeriod               = 30 * time.Second
-	defaultPingAttempts             = 3
-	DecayInterval                   = 10 * time.Second
-	AppDecay                        = .9
+	DecayInterval = 10 * time.Second
+	AppDecay      = .9
 )

 type appScore struct {
@@ -192,7 +179,6 @@ func NewBlossomSub(
 	logger *zap.Logger,
 ) *BlossomSub {
 	ctx := context.Background()
-	p2pConfig = withDefaults(p2pConfig)

 	opts := []libp2pconfig.Option{
 		libp2p.ListenAddrStrings(p2pConfig.ListenMultiaddr),
@@ -1043,144 +1029,6 @@ func (b *BlossomSub) SignMessage(msg []byte) ([]byte, error) {
 	return sig, errors.Wrap(err, "sign message")
 }

-func withDefaults(p2pConfig *config.P2PConfig) *config.P2PConfig {
-	cfg := *p2pConfig
-	p2pConfig = &cfg
-	if p2pConfig.D == 0 {
-		p2pConfig.D = blossomsub.BlossomSubD
-	}
-	if p2pConfig.DLo == 0 {
-		p2pConfig.DLo = blossomsub.BlossomSubDlo
-	}
-	if p2pConfig.DHi == 0 {
-		p2pConfig.DHi = blossomsub.BlossomSubDhi
-	}
-	if p2pConfig.DScore == 0 {
-		p2pConfig.DScore = blossomsub.BlossomSubDscore
-	}
-	if p2pConfig.DOut == 0 {
-		p2pConfig.DOut = blossomsub.BlossomSubDout
-	}
-	if p2pConfig.HistoryLength == 0 {
-		p2pConfig.HistoryLength = blossomsub.BlossomSubHistoryLength
-	}
-	if p2pConfig.HistoryGossip == 0 {
-		p2pConfig.HistoryGossip = blossomsub.BlossomSubHistoryGossip
-	}
-	if p2pConfig.DLazy == 0 {
-		p2pConfig.DLazy = blossomsub.BlossomSubDlazy
-	}
-	if p2pConfig.GossipFactor == 0 {
-		p2pConfig.GossipFactor = blossomsub.BlossomSubGossipFactor
-	}
-	if p2pConfig.GossipRetransmission == 0 {
-		p2pConfig.GossipRetransmission = blossomsub.BlossomSubGossipRetransmission
-	}
-	if p2pConfig.HeartbeatInitialDelay == 0 {
-		p2pConfig.HeartbeatInitialDelay = blossomsub.BlossomSubHeartbeatInitialDelay
-	}
-	if p2pConfig.HeartbeatInterval == 0 {
-		p2pConfig.HeartbeatInterval = blossomsub.BlossomSubHeartbeatInterval
-	}
-	if p2pConfig.FanoutTTL == 0 {
-		p2pConfig.FanoutTTL = blossomsub.BlossomSubFanoutTTL
-	}
-	if p2pConfig.PrunePeers == 0 {
-		p2pConfig.PrunePeers = blossomsub.BlossomSubPrunePeers
-	}
-	if p2pConfig.PruneBackoff == 0 {
-		p2pConfig.PruneBackoff = blossomsub.BlossomSubPruneBackoff
-	}
-	if p2pConfig.UnsubscribeBackoff == 0 {
-		p2pConfig.UnsubscribeBackoff = blossomsub.BlossomSubUnsubscribeBackoff
-	}
-	if p2pConfig.Connectors == 0 {
-		p2pConfig.Connectors = blossomsub.BlossomSubConnectors
-	}
-	if p2pConfig.MaxPendingConnections == 0 {
-		p2pConfig.MaxPendingConnections = blossomsub.BlossomSubMaxPendingConnections
-	}
-	if p2pConfig.ConnectionTimeout == 0 {
-		p2pConfig.ConnectionTimeout = blossomsub.BlossomSubConnectionTimeout
-	}
-	if p2pConfig.DirectConnectTicks == 0 {
-		p2pConfig.DirectConnectTicks = blossomsub.BlossomSubDirectConnectTicks
-	}
-	if p2pConfig.DirectConnectInitialDelay == 0 {
-		p2pConfig.DirectConnectInitialDelay =
-			blossomsub.BlossomSubDirectConnectInitialDelay
-	}
-	if p2pConfig.OpportunisticGraftTicks == 0 {
-		p2pConfig.OpportunisticGraftTicks =
-			blossomsub.BlossomSubOpportunisticGraftTicks
-	}
-	if p2pConfig.OpportunisticGraftPeers == 0 {
-		p2pConfig.OpportunisticGraftPeers =
-			blossomsub.BlossomSubOpportunisticGraftPeers
-	}
-	if p2pConfig.GraftFloodThreshold == 0 {
-		p2pConfig.GraftFloodThreshold = blossomsub.BlossomSubGraftFloodThreshold
-	}
-	if p2pConfig.MaxIHaveLength == 0 {
-		p2pConfig.MaxIHaveLength = blossomsub.BlossomSubMaxIHaveLength
-	}
-	if p2pConfig.MaxIHaveMessages == 0 {
-		p2pConfig.MaxIHaveMessages = blossomsub.BlossomSubMaxIHaveMessages
-	}
-	if p2pConfig.MaxIDontWantMessages == 0 {
-		p2pConfig.MaxIDontWantMessages = blossomsub.BlossomSubMaxIDontWantMessages
-	}
-	if p2pConfig.IWantFollowupTime == 0 {
-		p2pConfig.IWantFollowupTime = blossomsub.BlossomSubIWantFollowupTime
-	}
-	if p2pConfig.IDontWantMessageThreshold == 0 {
-		p2pConfig.IDontWantMessageThreshold = blossomsub.BlossomSubIDontWantMessageThreshold
-	}
-	if p2pConfig.IDontWantMessageTTL == 0 {
-		p2pConfig.IDontWantMessageTTL = blossomsub.BlossomSubIDontWantMessageTTL
-	}
-	if p2pConfig.LowWatermarkConnections == 0 {
-		p2pConfig.LowWatermarkConnections = defaultLowWatermarkConnections
-	}
-	if p2pConfig.HighWatermarkConnections == 0 {
-		p2pConfig.HighWatermarkConnections = defaultHighWatermarkConnections
-	}
-	if p2pConfig.MinBootstrapPeers == 0 {
-		p2pConfig.MinBootstrapPeers = defaultMinBootstrapPeers
-	}
-	if p2pConfig.BootstrapParallelism == 0 {
-		p2pConfig.BootstrapParallelism = defaultBootstrapParallelism
-	}
-	if p2pConfig.DiscoveryParallelism == 0 {
-		p2pConfig.DiscoveryParallelism = defaultDiscoveryParallelism
-	}
-	if p2pConfig.DiscoveryPeerLookupLimit == 0 {
-		p2pConfig.DiscoveryPeerLookupLimit = defaultDiscoveryPeerLookupLimit
-	}
-	if p2pConfig.PingTimeout == 0 {
-		p2pConfig.PingTimeout = defaultPingTimeout
-	}
-	if p2pConfig.PingPeriod == 0 {
-		p2pConfig.PingPeriod = defaultPingPeriod
-	}
-	if p2pConfig.PingAttempts == 0 {
-		p2pConfig.PingAttempts = defaultPingAttempts
-	}
-	if p2pConfig.ValidateQueueSize == 0 {
-		p2pConfig.ValidateQueueSize = blossomsub.DefaultValidateQueueSize
-	}
-	if p2pConfig.ValidateWorkers == 0 {
-		p2pConfig.ValidateWorkers = qruntime.WorkerCount(0, false)
-	}
-	if p2pConfig.SubscriptionQueueSize == 0 {
-		p2pConfig.SubscriptionQueueSize = blossomsub.DefaultSubscriptionQueueSize
-	}
-	if p2pConfig.PeerOutboundQueueSize == 0 {
-		p2pConfig.PeerOutboundQueueSize = blossomsub.DefaultPeerOutboundQueueSize
-	}
-	return p2pConfig
-}
-
 func toBlossomSubParams(p2pConfig *config.P2PConfig) blossomsub.BlossomSubParams {
 	return blossomsub.BlossomSubParams{
 		D: p2pConfig.D,
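The withDefaults helper removed here appears to have moved wholesale into the config package as P2PConfig.WithDefaults (see the earlier hunk in this commit); NewBlossomSub now receives a config whose defaults were already applied once at load time instead of applying them on every construction.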
@@ -6,6 +6,7 @@ import (
 	"math/big"
 	"net/http"
 	"strings"
+	"sync"
 	"time"

 	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
@@ -43,6 +44,8 @@ type RPCServer struct {
 	pubSub           p2p.PubSub
 	masterClock      *master.MasterClockConsensusEngine
 	executionEngines []execution.ExecutionEngine
+	grpcServer       *grpc.Server
+	httpServer       *http.Server
 }

 // GetFrameInfo implements protobufs.NodeServiceServer.
@@ -94,21 +97,17 @@ func (r *RPCServer) GetFrames(
 	if err != nil {
 		return nil, errors.Wrap(err, "get frames")
 	}
+	defer iter.Close()

 	frames := []*protobufs.ClockFrame{}
 	for iter.First(); iter.Valid(); iter.Next() {
 		frame, err := iter.Value()
 		if err != nil {
-			iter.Close()
 			return nil, errors.Wrap(err, "get frames")
 		}
 		frames = append(frames, frame)
 	}

-	if err := iter.Close(); err != nil {
-		return nil, errors.Wrap(err, "get frames")
-	}
-
 	return &protobufs.FramesResponse{
 		TruncatedClockFrames: frames,
 	}, nil
@@ -121,21 +120,17 @@ func (r *RPCServer) GetFrames(
 	if err != nil {
 		return nil, errors.Wrap(err, "get frame info")
 	}
+	defer iter.Close()

 	frames := []*protobufs.ClockFrame{}
 	for iter.First(); iter.Valid(); iter.Next() {
 		frame, err := iter.TruncatedValue()
 		if err != nil {
-			iter.Close()
 			return nil, errors.Wrap(err, "get frames")
 		}
 		frames = append(frames, frame)
 	}

-	if err := iter.Close(); err != nil {
-		return nil, errors.Wrap(err, "get frames")
-	}
-
 	return &protobufs.FramesResponse{
 		TruncatedClockFrames: frames,
 	}, nil
@@ -384,7 +379,33 @@ func NewRPCServer(
 	masterClock *master.MasterClockConsensusEngine,
 	executionEngines []execution.ExecutionEngine,
 ) (*RPCServer, error) {
-	return &RPCServer{
+	mg, err := multiaddr.NewMultiaddr(listenAddrGRPC)
+	if err != nil {
+		return nil, errors.Wrap(err, "new rpc server")
+	}
+	mga, err := mn.ToNetAddr(mg)
+	if err != nil {
+		return nil, errors.Wrap(err, "new rpc server")
+	}
+
+	mux := runtime.NewServeMux()
+	opts := qgrpc.ClientOptions(
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithDefaultCallOptions(
+			grpc.MaxCallRecvMsgSize(600*1024*1024),
+			grpc.MaxCallSendMsgSize(600*1024*1024),
+		),
+	)
+	if err := protobufs.RegisterNodeServiceHandlerFromEndpoint(
+		context.Background(),
+		mux,
+		mga.String(),
+		opts,
+	); err != nil {
+		return nil, err
+	}
+
+	rpcServer := &RPCServer{
 		listenAddrGRPC: listenAddrGRPC,
 		listenAddrHTTP: listenAddrHTTP,
 		logger:         logger,
@@ -395,17 +416,22 @@ func NewRPCServer(
 		pubSub:           pubSub,
 		masterClock:      masterClock,
 		executionEngines: executionEngines,
-	}, nil
+		grpcServer: qgrpc.NewServer(
+			grpc.MaxRecvMsgSize(600*1024*1024),
+			grpc.MaxSendMsgSize(600*1024*1024),
+		),
+		httpServer: &http.Server{
+			Handler: mux,
+		},
+	}
+
+	protobufs.RegisterNodeServiceServer(rpcServer.grpcServer, rpcServer)
+	reflection.Register(rpcServer.grpcServer)
+
+	return rpcServer, nil
 }

 func (r *RPCServer) Start() error {
-	s := qgrpc.NewServer(
-		grpc.MaxRecvMsgSize(600*1024*1024),
-		grpc.MaxSendMsgSize(600*1024*1024),
-	)
-	protobufs.RegisterNodeServiceServer(s, r)
-	reflection.Register(s)
-
 	mg, err := multiaddr.NewMultiaddr(r.listenAddrGRPC)
 	if err != nil {
 		return errors.Wrap(err, "start")
@@ -417,51 +443,42 @@ func (r *RPCServer) Start() error {
 	}

 	go func() {
-		if err := s.Serve(mn.NetListener(lis)); err != nil {
-			panic(err)
+		if err := r.grpcServer.Serve(mn.NetListener(lis)); err != nil {
+			r.logger.Error("serve error", zap.Error(err))
 		}
 	}()

 	if r.listenAddrHTTP != "" {
-		m, err := multiaddr.NewMultiaddr(r.listenAddrHTTP)
+		mh, err := multiaddr.NewMultiaddr(r.listenAddrHTTP)
 		if err != nil {
 			return errors.Wrap(err, "start")
 		}

-		ma, err := mn.ToNetAddr(m)
-		if err != nil {
-			return errors.Wrap(err, "start")
-		}
-
-		mga, err := mn.ToNetAddr(mg)
+		lis, err := mn.Listen(mh)
 		if err != nil {
 			return errors.Wrap(err, "start")
 		}

 		go func() {
-			mux := runtime.NewServeMux()
-			opts := qgrpc.ClientOptions(
-				grpc.WithTransportCredentials(insecure.NewCredentials()),
-				grpc.WithDefaultCallOptions(
-					grpc.MaxCallRecvMsgSize(600*1024*1024),
-					grpc.MaxCallSendMsgSize(600*1024*1024),
-				),
-			)
-
-			if err := protobufs.RegisterNodeServiceHandlerFromEndpoint(
-				context.Background(),
-				mux,
-				mga.String(),
-				opts,
-			); err != nil {
-				panic(err)
-			}
-
-			if err := http.ListenAndServe(ma.String(), mux); err != nil {
-				panic(err)
+			if err := r.httpServer.Serve(mn.NetListener(lis)); err != nil && !errors.Is(err, http.ErrServerClosed) {
+				r.logger.Error("serve error", zap.Error(err))
 			}
 		}()
 	}

 	return nil
 }

+func (r *RPCServer) Stop() {
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		r.grpcServer.GracefulStop()
+	}()
+	go func() {
+		defer wg.Done()
+		r.httpServer.Shutdown(context.Background())
+	}()
+	wg.Wait()
+}
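The new Stop pairs the blocking grpc GracefulStop with http.Server.Shutdown, which likewise waits for active requests and makes the paired Serve call return http.ErrServerClosed, the error Start above deliberately ignores. A compact sketch of the pairing; the servers here are placeholders:

package main

import (
	"context"
	"net/http"
	"sync"

	"google.golang.org/grpc"
)

func shutdown(g *grpc.Server, h *http.Server) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); g.GracefulStop() }()
	go func() {
		defer wg.Done()
		// Shutdown returns once in-flight requests drain; afterwards Serve
		// reports http.ErrServerClosed, which callers should treat as clean.
		_ = h.Shutdown(context.Background())
	}()
	wg.Wait()
}

func main() {
	shutdown(grpc.NewServer(), &http.Server{})
}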
@@ -216,13 +216,13 @@ func (p *PebbleClockIterator) TruncatedValue() (
 	if err != nil {
 		return nil, errors.Wrap(err, "get truncated clock frame iterator value")
 	}
+	defer frameCloser.Close()
 	if err := proto.Unmarshal(frameValue, frame); err != nil {
 		return nil, errors.Wrap(
 			errors.Wrap(err, ErrInvalidData.Error()),
 			"get truncated clock frame iterator value",
 		)
 	}
-	frameCloser.Close()
 } else {
 	if err := proto.Unmarshal(value, frame); err != nil {
 		return nil, errors.Wrap(
@@ -252,13 +252,13 @@ func (p *PebbleClockIterator) Value() (*protobufs.ClockFrame, error) {
 	if err != nil {
 		return nil, errors.Wrap(err, "get clock frame iterator value")
 	}
+	defer frameCloser.Close()
 	if err := proto.Unmarshal(frameValue, frame); err != nil {
 		return nil, errors.Wrap(
 			errors.Wrap(err, ErrInvalidData.Error()),
 			"get clock frame iterator value",
 		)
 	}
-	defer frameCloser.Close()
 } else {
 	if err := proto.Unmarshal(value, frame); err != nil {
 		return nil, errors.Wrap(
@@ -469,8 +469,8 @@ func (p *PebbleClockStore) GetEarliestMasterClockFrame(
 		return nil, errors.Wrap(err, "get earliest master clock frame")
 	}
-
+	defer closer.Close()

 	frameNumber := binary.BigEndian.Uint64(idxValue)
 	frame, err := p.GetMasterClockFrame(filter, frameNumber)
 	if err != nil {
@@ -492,8 +492,8 @@ func (p *PebbleClockStore) GetLatestMasterClockFrame(
 		return nil, errors.Wrap(err, "get latest master clock frame")
 	}
-
+	defer closer.Close()

 	frameNumber := binary.BigEndian.Uint64(idxValue)
 	frame, err := p.GetMasterClockFrame(filter, frameNumber)
 	if err != nil {
@@ -516,11 +516,11 @@ func (p *PebbleClockStore) GetMasterClockFrame(
 		return nil, errors.Wrap(err, "get master clock frame")
 	}
+	defer closer.Close()

 	copied := make([]byte, len(value))
 	copy(copied[:], value[:])

-	defer closer.Close()
 	frame := &protobufs.ClockFrame{}
 	frame.FrameNumber = frameNumber
 	frame.Filter = filter
@@ -595,10 +595,8 @@ func (p *PebbleClockStore) PutMasterClockFrame(
 		); err != nil {
 			return errors.Wrap(err, "put master clock frame")
 		}
 	}

 	if err == nil && closer != nil {
-		closer.Close()
-	} else {
+		_ = closer.Close()
 	}

 	if err = txn.Set(
@@ -625,6 +623,7 @@ func (p *PebbleClockStore) GetDataClockFrame(
 		return nil, nil, errors.Wrap(err, "get data clock frame")
 	}
+	defer closer.Close()

 	frame := &protobufs.ClockFrame{}
 	genesisFramePreIndex := false
@@ -641,14 +640,13 @@ func (p *PebbleClockStore) GetDataClockFrame(
 		return nil, nil, errors.Wrap(err, "get data clock frame")
 	}
+	defer frameCloser.Close()
 	if err := proto.Unmarshal(frameValue, frame); err != nil {
 		return nil, nil, errors.Wrap(
 			errors.Wrap(err, ErrInvalidData.Error()),
 			"get data clock frame",
 		)
 	}
-	closer.Close()
-	defer frameCloser.Close()
 } else {
 	genesisFramePreIndex = frameNumber == 0
 	if err := proto.Unmarshal(value, frame); err != nil {
@@ -657,7 +655,6 @@ func (p *PebbleClockStore) GetDataClockFrame(
 			"get data clock frame",
 		)
 	}
-	defer closer.Close()
 }

 if !truncate {
@@ -674,15 +671,17 @@ func (p *PebbleClockStore) GetDataClockFrame(
 	proverTrie := &tries.RollingFrecencyCritbitTrie{}
 	trieData, closer, err := p.db.Get(clockProverTrieKey(filter, i, frameNumber))
 	if err != nil {
 		if !errors.Is(err, pebble.ErrNotFound) {
 			return nil, nil, errors.Wrap(err, "get data clock frame")
 		}
 		break
 	}
-	defer closer.Close()

 	if err := proverTrie.Deserialize(trieData); err != nil {
+		closer.Close()
 		return nil, nil, errors.Wrap(err, "get latest data clock frame")
 	}

+	closer.Close()
 	i++
 	proverTries = append(proverTries, proverTrie)
 }
@@ -726,7 +725,6 @@ func (p *PebbleClockStore) deleteAggregateProofs(
 	for i := 0; i < len(frame.Input[516:])/74; i++ {
 		commit := frame.Input[516+(i*74) : 516+((i+1)*74)]
 		err := internalDeleteAggregateProof(
-			p.db,
 			txn,
 			frame.AggregateProofs[i],
 			commit,
@@ -757,7 +755,6 @@ func (p *PebbleClockStore) saveAggregateProofs(
 	for i := 0; i < len(frame.Input[516:])/74; i++ {
 		commit := frame.Input[516+(i*74) : 516+((i+1)*74)]
 		err := internalPutAggregateProof(
-			p.db,
 			txn,
 			frame.AggregateProofs[i],
 			commit,
@@ -788,8 +785,8 @@ func (p *PebbleClockStore) GetEarliestDataClockFrame(
 		return nil, errors.Wrap(err, "get earliest data clock frame")
 	}
-
+	defer closer.Close()

 	frameNumber := binary.BigEndian.Uint64(idxValue)
 	frame, _, err := p.GetDataClockFrame(filter, frameNumber, false)
 	if err != nil {
@@ -811,6 +808,7 @@ func (p *PebbleClockStore) GetLatestDataClockFrame(
 		return nil, nil, errors.Wrap(err, "get latest data clock frame")
 	}
+	defer closer.Close()

 	frameNumber := binary.BigEndian.Uint64(idxValue)
 	frame, tries, err := p.GetDataClockFrame(filter, frameNumber, false)
@@ -822,8 +820,6 @@ func (p *PebbleClockStore) GetLatestDataClockFrame(
 		return nil, nil, errors.Wrap(err, "get latest data clock frame")
 	}

-	closer.Close()
-
 	return frame, tries, nil
 }

@@ -843,6 +839,7 @@ func (p *PebbleClockStore) GetStagedDataClockFrame(
 	}
 		return nil, errors.Wrap(err, "get parent data clock frame")
 	}
+	defer closer.Close()

 	parent := &protobufs.ClockFrame{}
 	if err := proto.Unmarshal(data, parent); err != nil {
@@ -858,10 +855,6 @@ func (p *PebbleClockStore) GetStagedDataClockFrame(
 		}
 	}

-	if closer != nil {
-		closer.Close()
-	}
-
 	return parent, nil
 }

@@ -879,9 +872,9 @@ func (p *PebbleClockStore) GetStagedDataClockFramesForFrameNumber(
 	}
 		return nil, errors.Wrap(err, "get staged data clock frames")
 	}
+	defer iter.Close()

 	frames := []*protobufs.ClockFrame{}

 	for iter.First(); iter.Valid(); iter.Next() {
 		data := iter.Value()
 		frame := &protobufs.ClockFrame{}
@@ -899,8 +892,6 @@ func (p *PebbleClockStore) GetStagedDataClockFramesForFrameNumber(
 		frames = append(frames, frame)
 	}

-	iter.Close()
-
 	return frames, nil
 }

@@ -992,10 +983,8 @@ func (p *PebbleClockStore) CommitDataClockFrame(
 		); err != nil {
 			return errors.Wrap(err, "commit data clock frame")
 		}
 	}

 	if err == nil && closer != nil {
-		closer.Close()
-	} else {
+		_ = closer.Close()
 	}

 	if !backfill {
@@ -1058,36 +1047,99 @@ func (p *PebbleClockStore) DeleteDataClockFrameRange(
 	for i := fromFrameNumber; i < toFrameNumber; i++ {
 		frames, err := p.GetStagedDataClockFramesForFrameNumber(filter, i)
 		if err != nil {
-			continue
+			if !errors.Is(err, ErrNotFound) && !errors.Is(err, ErrInvalidData) {
+				_ = txn.Abort()
+				return errors.Wrap(err, "delete data clock frame range")
+			}
+			frames = nil
 		}

+	outer:
 		for _, frame := range frames {
-			err = p.deleteAggregateProofs(txn, frame)
-			if err != nil {
-				continue
+			for _, ap := range frame.AggregateProofs {
+				for _, inc := range ap.InclusionCommitments {
+					// The commitments collide for very small frames, and as such we have to detect them early
+					// and avoid deleting them. Common cases for such collisions are prover announcement messages
+					// which do not contain the frame number, so their binary contents are equivalent between
+					// multiple frames.
+					if len(inc.Data) < 2048 {
+						continue outer
+					}
+					if inc.TypeUrl == protobufs.IntrinsicExecutionOutputType {
+						o := &protobufs.IntrinsicExecutionOutput{}
+						if err := proto.Unmarshal(inc.Data, o); err != nil {
+							_ = txn.Abort()
+							return errors.Wrap(err, "delete data clock frame range")
+						}
+						// The commitments collide for empty frames, and as such we have to detect them early
+						// and avoid deleting them.
+						if len(o.Output) == 0 || len(o.Proof) == 0 {
+							continue outer
+						}
+					}
+				}
+			}
+			if err := p.deleteAggregateProofs(txn, frame); err != nil {
+				if !errors.Is(err, pebble.ErrNotFound) {
+					_ = txn.Abort()
+					return errors.Wrap(err, "delete data clock frame range")
+				}
 			}
 		}

-		err = txn.DeleteRange(
+		if err := txn.DeleteRange(
 			clockDataParentIndexKey(filter, i, bytes.Repeat([]byte{0x00}, 32)),
 			clockDataParentIndexKey(filter, i, bytes.Repeat([]byte{0xff}, 32)),
-		)
-		if err != nil {
-			continue
+		); err != nil {
+			if !errors.Is(err, pebble.ErrNotFound) {
+				_ = txn.Abort()
+				return errors.Wrap(err, "delete data clock frame range")
+			}
 		}

-		err = txn.Delete(clockDataFrameKey(filter, i))
-		if err != nil {
-			continue
+		if err := txn.Delete(clockDataFrameKey(filter, i)); err != nil {
+			if !errors.Is(err, pebble.ErrNotFound) {
+				_ = txn.Abort()
+				return errors.Wrap(err, "delete data clock frame range")
+			}
 		}

+		// The prover trie keys are not stored continuously with respect
+		// to the same frame number. As such, we need to manually iterate
+		// and discover such keys.
+		for t := uint16(0); t <= 0xffff; t++ {
+			_, closer, err := p.db.Get(clockProverTrieKey(filter, t, i))
+			if err != nil {
+				if !errors.Is(err, pebble.ErrNotFound) {
+					_ = txn.Abort()
+					return errors.Wrap(err, "delete data clock frame range")
+				} else {
+					break
+				}
+			}
+			_ = closer.Close()
+			if err := txn.Delete(clockProverTrieKey(filter, t, i)); err != nil {
+				_ = txn.Abort()
+				return errors.Wrap(err, "delete data clock frame range")
+			}
+		}
+
+		if err := txn.DeleteRange(
+			clockDataTotalDistanceKey(filter, i, bytes.Repeat([]byte{0x00}, 32)),
+			clockDataTotalDistanceKey(filter, i, bytes.Repeat([]byte{0xff}, 32)),
+		); err != nil {
+			if !errors.Is(err, pebble.ErrNotFound) {
+				_ = txn.Abort()
+				return errors.Wrap(err, "delete data clock frame range")
+			}
+		}
 	}

-	if err = txn.Commit(); err != nil {
-		txn.Abort()
-		return errors.Wrap(err, "delete data clock frame range")
-	}
-
-	return errors.Wrap(err, "delete data clock frame range")
+	if err := txn.Commit(); err != nil {
+		return errors.Wrap(err, "delete data clock frame range")
+	}
+
+	return nil
 }

 func (p *PebbleClockStore) ResetMasterClockFrames(filter []byte) error {
@@ -1138,7 +1190,7 @@ func (p *PebbleClockStore) Compact(
 	if bytes.Compare(version, config.GetVersion()) < 0 {
 		cleared = false
 	}
-	closer.Close()
+	defer closer.Close()
 }

 if !cleared {
@@ -1190,11 +1242,10 @@ func (p *PebbleClockStore) Compact(
 		return errors.Wrap(err, "compact")
 	}
+	defer closer.Close()

 	last := binary.BigEndian.Uint64(idxValue)

-	closer.Close()
-
 	for frameNumber := uint64(1); frameNumber <= last; frameNumber++ {
 		value, closer, err := p.db.Get(clockDataFrameKey(dataFilter, frameNumber))
 		if err != nil {
@@ -1204,6 +1255,7 @@ func (p *PebbleClockStore) Compact(
 		return errors.Wrap(err, "compact")
 	}
+	defer closer.Close()

 	frame := &protobufs.ClockFrame{}

@@ -1212,6 +1264,7 @@ func (p *PebbleClockStore) Compact(
 	if err != nil {
 		return errors.Wrap(err, "compact")
 	}
+	defer frameCloser.Close()
 	if err := proto.Unmarshal(frameValue, frame); err != nil {
 		return errors.Wrap(err, "compact")
 	}
@@ -1226,9 +1279,6 @@ func (p *PebbleClockStore) Compact(
 			make([]byte, 32),
 		)),
 	)

-	closer.Close()
-	frameCloser.Close()
 } else {
 	if err := proto.Unmarshal(value, frame); err != nil {
 		return errors.Wrap(err, "compact")
@@ -1255,14 +1305,15 @@ func (p *PebbleClockStore) Compact(
 			make([]byte, 32),
 		)),
 	)
 	if err != nil {
 		return errors.Wrap(err, "compact")
 	}

 	parents = append(parents,
 		clockDataParentIndexKey(dataFilter, frameNumber, selector.FillBytes(
 			make([]byte, 32),
 		)),
 	)

-	closer.Close()
 }

 for i := 0; i < len(frame.Input[516:])/74; i++ {
@@ -1523,10 +1574,8 @@ func (p *PebbleClockStore) GetTotalDistance(
 		return nil, errors.Wrap(err, "get total distance")
 	}
-
+	defer closer.Close()
 	dist := new(big.Int).SetBytes(value)

 	return dist, nil
 }

@@ -1564,7 +1613,6 @@ func (p *PebbleClockStore) GetPeerSeniorityMap(filter []byte) (
 	if err = dec.Decode(&seniorityMap); err != nil {
 		return nil, errors.Wrap(err, "get peer seniority map")
 	}
-
 	return seniorityMap, nil
 }

@@ -1612,9 +1660,12 @@ func (p *PebbleClockStore) SetProverTriesForFrame(
 		clockProverTrieKey(frame.Filter, uint16(start), frame.FrameNumber),
 	)
 	if err != nil {
+		if !errors.Is(err, pebble.ErrNotFound) {
+			return errors.Wrap(err, "set prover tries for frame")
+		}
 		break
 	}
-	closer.Close()
+	_ = closer.Close()

 	if err = p.db.Delete(
 		clockProverTrieKey(frame.Filter, uint16(start), frame.FrameNumber),
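A recurring pattern in the store changes above is swapping late or conditional closer.Close() calls for a defer placed immediately after the error check, so every return path releases the handle exactly once. A sketch of the idiom against a generic Get that returns (value, closer, error); the interfaces here are stand-ins, not the repo's store API:

package main

import (
	"fmt"
	"io"
	"strings"
)

// get mimics a KV lookup that hands back a closer pinning the value's memory.
func get(key string) ([]byte, io.Closer, error) {
	return []byte(key), io.NopCloser(strings.NewReader("")), nil
}

func lookup(key string) ([]byte, error) {
	value, closer, err := get(key)
	if err != nil {
		return nil, err
	}
	// Deferring right after the error check covers every later return path;
	// the value must be copied before it escapes the closer's lifetime.
	defer closer.Close()

	copied := make([]byte, len(value))
	copy(copied, value)
	return copied, nil
}

func main() {
	v, err := lookup("frame-1")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(v))
}

One caveat visible in the Compact hunks: a defer inside a long loop only runs at function exit, so explicit closes can still be the right choice on hot per-iteration paths.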
@ -136,8 +136,8 @@ func (p *PebbleCoinStore) GetCoinsForOwner(
		err = errors.Wrap(err, "get coins for owner")
		return nil, nil, nil, err
	}

	defer iter.Close()

	frameNumbers := []uint64{}
	addresses := [][]byte{}
	coins := []*protobufs.Coin{}
@ -176,8 +176,8 @@ func (p *PebbleCoinStore) GetPreCoinProofsForOwner(owner []byte) (
		err = errors.Wrap(err, "get pre coin proofs for owner")
		return nil, nil, err
	}

	defer iter.Close()

	frameNumbers := []uint64{}
	proofs := []*protobufs.PreCoinProof{}
	for iter.First(); iter.Valid(); iter.Next() {
@ -215,7 +215,6 @@ func (p *PebbleCoinStore) GetCoinByAddress(txn Transaction, address []byte) (
		err = errors.Wrap(err, "get coin by address")
		return nil, err
	}

	defer closer.Close()

	coin := &protobufs.Coin{}
@ -240,7 +239,6 @@ func (p *PebbleCoinStore) GetPreCoinProofByAddress(address []byte) (
		err = errors.Wrap(err, "get pre coin proof by address")
		return nil, err
	}

	defer closer.Close()

	proof := &protobufs.PreCoinProof{}
@ -386,9 +384,9 @@ func (p *PebbleCoinStore) GetLatestFrameProcessed() (uint64, error) {

		return 0, errors.Wrap(err, "get latest frame processed")
	}
	defer closer.Close()

	frameNumber := binary.BigEndian.Uint64(v)
	closer.Close()

	return frameNumber, nil
}
@ -524,13 +522,12 @@ func (p *PebbleCoinStore) Migrate(filter []byte, genesisSeedHex string) error {
		}
		return p.internalMigrate(filter, seed)
	}
	defer closer.Close()

	if !bytes.Equal(compare, seed) {
		return p.internalMigrate(filter, seed)
	}

	closer.Close()

	status, closer, err := p.db.Get(migrationKey())
	if err != nil {
		if !errors.Is(err, pebble.ErrNotFound) {

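The GetCoinsForOwner and GetPreCoinProofsForOwner hunks above move the iterator to a single defer iter.Close(), so every early return releases it. A hedged sketch of that scan shape — the iterator interface is an assumption modeled on the First/Valid/Next calls visible in the diff:

package storesketch

import "github.com/pkg/errors"

// rangeIterator mirrors the iterator calls visible in the diff.
type rangeIterator interface {
	First() bool
	Valid() bool
	Next() bool
	Value() []byte
	Close() error
}

// collectValues wraps the constructor error, defers Close for every exit
// path, and copies each value because iterator buffers are reused on Next.
func collectValues(newIter func() (rangeIterator, error)) ([][]byte, error) {
	iter, err := newIter()
	if err != nil {
		return nil, errors.Wrap(err, "collect values")
	}
	defer iter.Close()

	values := [][]byte{}
	for iter.First(); iter.Valid(); iter.Next() {
		v := make([]byte, len(iter.Value()))
		copy(v, iter.Value())
		values = append(values, v)
	}
	return values, nil
}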
@ -139,8 +139,8 @@ func internalGetAggregateProof(

		return nil, errors.Wrap(err, "get aggregate proof")
	}

	defer closer.Close()

	copied := make([]byte, len(value[8:]))
	limit := binary.BigEndian.Uint64(value[0:8])
	copy(copied, value[8:])
@ -159,6 +159,7 @@ func internalGetAggregateProof(
	if err != nil {
		return nil, errors.Wrap(err, "get aggregate proof")
	}
	defer iter.Close()

	i := uint32(0)

@ -199,14 +200,11 @@ func internalGetAggregateProof(

				return nil, errors.Wrap(err, "get aggregate proof")
			}
			defer dataCloser.Close()

			segCopy := make([]byte, len(segValue))
			copy(segCopy, segValue)
			chunks = append(chunks, segCopy)

			if err = dataCloser.Close(); err != nil {
				return nil, errors.Wrap(err, "get aggregate proof")
			}
		}

		if string(url) == protobufs.IntrinsicExecutionOutputType {
@ -236,10 +234,6 @@ func internalGetAggregateProof(
		i++
	}

	if err = iter.Close(); err != nil {
		return nil, errors.Wrap(err, "get aggregate proof")
	}

	return aggregate, nil
}

@ -263,8 +257,8 @@ func internalListAggregateProofKeys(

		return nil, nil, nil, errors.Wrap(err, "list aggregate proof")
	}

	defer closer.Close()

	copied := make([]byte, len(value[8:]))
	limit := binary.BigEndian.Uint64(value[0:8])
	copy(copied, value[8:])
@ -278,6 +272,7 @@ func internalListAggregateProofKeys(

		return nil, nil, nil, errors.Wrap(err, "list aggregate proof")
	}
	defer iter.Close()

	i := uint32(0)
	commits = append(commits, dataProofInclusionKey(filter, commitment, 0))
@ -305,10 +300,6 @@ func internalListAggregateProofKeys(
		i++
	}

	if err = iter.Close(); err != nil {
		return nil, nil, nil, errors.Wrap(err, "list aggregate proof")
	}

	return proofs, commits, data, nil
}

@ -326,17 +317,10 @@ func (p *PebbleDataProofStore) GetAggregateProof(
}

func internalDeleteAggregateProof(
	db KVDB,
	txn Transaction,
	aggregateProof *protobufs.InclusionAggregateProof,
	commitment []byte,
) error {
	buf := binary.BigEndian.AppendUint64(
		nil,
		uint64(len(aggregateProof.InclusionCommitments)),
	)
	buf = append(buf, aggregateProof.Proof...)

	for i, inc := range aggregateProof.InclusionCommitments {
		var segments [][]byte
		if inc.TypeUrl == protobufs.IntrinsicExecutionOutputType {
@ -378,7 +362,6 @@ func internalDeleteAggregateProof(
}

func internalPutAggregateProof(
	db KVDB,
	txn Transaction,
	aggregateProof *protobufs.InclusionAggregateProof,
	commitment []byte,
@ -447,7 +430,6 @@ func (p *PebbleDataProofStore) PutAggregateProof(
	commitment []byte,
) error {
	return internalPutAggregateProof(
		p.db,
		txn,
		aggregateProof,
		commitment,
@ -467,8 +449,8 @@ func (p *PebbleDataProofStore) GetDataTimeProof(
		err = errors.Wrap(err, "get data time proof")
		return
	}

	defer closer.Close()

	if len(data) < 24 {
		err = ErrInvalidData
		return
@ -513,13 +495,10 @@ func (p *PebbleDataProofStore) GetTotalReward(

		return nil, errors.Wrap(err, "get total difficulty sum")
	}
	defer closer.Close()

	if len(prev) != 0 {
		reward.SetBytes(prev[4:])

		if err = closer.Close(); err != nil {
			return nil, errors.Wrap(err, "get total difficulty sum")
		}
	}

	return reward, nil
@ -556,15 +535,12 @@ func (p *PebbleDataProofStore) PutDataTimeProof(
	if err != nil && (!errors.Is(err, pebble.ErrNotFound) || increment != 0) {
		return errors.Wrap(err, "put data time proof")
	}
	defer closer.Close()

	if len(prev) != 0 {
		priorSum.SetBytes(prev[4:])
		prevIncrement := binary.BigEndian.Uint32(prev[:4])

		if err = closer.Close(); err != nil {
			return errors.Wrap(err, "put data time proof")
		}

		if prevIncrement != increment-1 {
			return errors.Wrap(errors.New("invalid increment"), "put data time proof")
		}
@ -609,15 +585,13 @@ func (p *PebbleDataProofStore) GetLatestDataTimeProof(peerId []byte) (

		return 0, 0, nil, errors.Wrap(err, "get latest data time proof")
	}
	defer closer.Close()

	if len(prev) < 4 {
		return 0, 0, nil, ErrInvalidData
	}

	increment = binary.BigEndian.Uint32(prev[:4])
	if err = closer.Close(); err != nil {
		return 0, 0, nil, errors.Wrap(err, "get latest data time proof")
	}

	_, parallelism, _, output, err = p.GetDataTimeProof(peerId, increment)

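The copied := make(...) / copy(...) lines in internalGetAggregateProof exist because a Pebble-style Get returns a value that aliases database memory and is only valid until the closer runs. A sketch of the copy-then-close rule for the 8-byte length-prefixed records shown above (the getter interface is an assumption modeled on the calls in the diff):

package storesketch

import (
	"encoding/binary"
	"io"

	"github.com/pkg/errors"
)

type getter interface {
	Get(key []byte) (value []byte, closer io.Closer, err error)
}

// readLimitAndPayload decodes the 8-byte big-endian header and returns a
// copy of the remaining bytes, so both stay usable after the deferred Close
// releases the underlying buffer.
func readLimitAndPayload(db getter, key []byte) (uint64, []byte, error) {
	value, closer, err := db.Get(key)
	if err != nil {
		return 0, nil, errors.Wrap(err, "read record")
	}
	defer closer.Close()

	if len(value) < 8 {
		return 0, nil, errors.New("record too short")
	}
	limit := binary.BigEndian.Uint64(value[:8])
	payload := make([]byte, len(value[8:]))
	copy(payload, value[8:])
	return limit, payload, nil
}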
@ -281,15 +281,13 @@ func (t *InMemKVDBTransaction) DeleteRange(
	if err != nil {
		return err
	}
	defer iter.Close()

	for iter.First(); iter.Valid(); iter.Next() {
		t.changes = append(t.changes, InMemKVDBOperation{
			op:  DeleteOperation,
			key: iter.Key(),
		})
		if err != nil {
			return err
		}
	}

	return nil
@ -416,6 +414,7 @@ func (d *InMemKVDB) DeleteRange(start, end []byte) error {
	if err != nil {
		return err
	}
	defer iter.Close()

	for iter.First(); iter.Valid(); iter.Next() {
		err = d.Delete(iter.Key())

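Both DeleteRange hunks above add defer iter.Close() and delete inline while the iterator is open, which the in-memory store tolerates. A more conservative sketch, shown here only as an alternative shape under assumed interfaces, collects the keys first and deletes after the iterator is closed:

package storesketch

// rangeIter mirrors the iterator calls visible in the diff.
type rangeIter interface {
	First() bool
	Valid() bool
	Next() bool
	Key() []byte
	Close() error
}

// rangeDB is a hypothetical constructor/delete pair for this sketch.
type rangeDB interface {
	NewRangeIter(start, end []byte) (rangeIter, error)
	Delete(key []byte) error
}

// deleteRange snapshots the keys in [start, end) and deletes them once the
// iterator has been released, avoiding mutation during iteration.
func deleteRange(db rangeDB, start, end []byte) error {
	iter, err := db.NewRangeIter(start, end)
	if err != nil {
		return err
	}

	keys := [][]byte{}
	for iter.First(); iter.Valid(); iter.Next() {
		// Copy each key: iterator buffers are reused between steps.
		k := make([]byte, len(iter.Key()))
		copy(k, iter.Key())
		keys = append(keys, k)
	}
	if err := iter.Close(); err != nil {
		return err
	}

	for _, k := range keys {
		if err := db.Delete(k); err != nil {
			return err
		}
	}
	return nil
}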
@ -270,6 +270,8 @@ func (p *PebbleKeyStore) IncludeProvingKey(
	staged, closer, err := p.db.Get(stagedProvingKeyKey(provingKey.PublicKey()))
	if err != nil && !errors.Is(err, ErrNotFound) {
		return errors.Wrap(err, "include proving key")
	} else if err == nil {
		defer closer.Close()
	}

	if staged != nil {
@ -279,9 +281,6 @@ func (p *PebbleKeyStore) IncludeProvingKey(
			return errors.Wrap(err, "include proving key")
		}
	}
	if err := closer.Close(); err != nil {
		return errors.Wrap(err, "include proving key")
	}

	return nil
}
@ -297,16 +296,13 @@ func (p *PebbleKeyStore) GetStagedProvingKey(

		return nil, errors.Wrap(err, "get staged proving key")
	}
	defer closer.Close()

	stagedKey := &protobufs.ProvingKeyAnnouncement{}
	if err = proto.Unmarshal(data, stagedKey); err != nil {
		return nil, errors.Wrap(err, "get staged proving key")
	}

	if err := closer.Close(); err != nil {
		return nil, errors.Wrap(err, "get staged proving key")
	}

	return stagedKey, nil
}

@ -322,12 +318,9 @@ func (p *PebbleKeyStore) GetLatestKeyBundle(

		return nil, errors.Wrap(err, "get latest key bundle")
	}
	defer closer.Close()
	frameNumber := binary.BigEndian.Uint64(value)

	if err := closer.Close(); err != nil {
		return nil, errors.Wrap(err, "get latest key bundle")
	}

	value, closer, err = p.db.Get(keyBundleKey(provingKey, frameNumber))
	if err != nil {
		if errors.Is(err, pebble.ErrNotFound) {
@ -336,7 +329,6 @@ func (p *PebbleKeyStore) GetLatestKeyBundle(

		return nil, errors.Wrap(err, "get latest key bundle")
	}

	defer closer.Close()

	announcement := &protobufs.InclusionCommitment{}
@ -440,10 +432,8 @@ func (p *PebbleKeyStore) PutKeyBundle(
		); err != nil {
			return errors.Wrap(err, "put key bundle")
		}
	}

	if err == nil && closer != nil {
		closer.Close()
	} else {
	_ = closer.Close()
	}

	if err = txn.Set(

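The IncludeProvingKey hunk above defers closer.Close() only on the err == nil branch, since an ErrNotFound result returns no closer to release. A hedged reduction of that conditional-defer shape (the interface and errNotFound are stand-ins, not the store's real types):

package storesketch

import (
	"errors"
	"io"
)

var errNotFound = errors.New("not found") // stands in for the store's ErrNotFound

type condGetter interface {
	Get(key []byte) (value []byte, closer io.Closer, err error)
}

// lookupStaged returns a copy of the staged bytes if present, nil if absent.
func lookupStaged(db condGetter, key []byte) ([]byte, error) {
	staged, closer, err := db.Get(key)
	if err != nil && !errors.Is(err, errNotFound) {
		return nil, err
	} else if err == nil {
		defer closer.Close() // only defer when Get actually returned a closer
	}

	if staged == nil {
		return nil, nil
	}
	out := make([]byte, len(staged))
	copy(out, staged)
	return out, nil
}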
@ -74,10 +74,10 @@ func (d *PeerstoreDatastore) Get(
		}
		return nil, err
	}
	defer closer.Close()

	out := make([]byte, len(val))
	copy(out[:], val[:])
	closer.Close()

	return val, nil
}
@ -226,10 +226,10 @@ func (t *transaction) Get(
		}
		return nil, errors.Wrap(err, "get")
	}
	defer closer.Close()

	out := make([]byte, len(b))
	copy(out[:], b[:])
	closer.Close()

	return b, nil
}

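In both Get hunks above, closer.Close() releases the buffer backing val and b, so the out := make(...) copy is what keeps a result safe to hand back; returning the uncopied value after the closer has run exposes memory the database may reuse. A minimal sketch of the copy-then-return variant that pattern suggests (interface assumed):

package storesketch

import "io"

type valueGetter interface {
	Get(key []byte) (value []byte, closer io.Closer, err error)
}

// getCopy fetches a value and returns a private copy of it.
func getCopy(db valueGetter, key []byte) ([]byte, error) {
	val, closer, err := db.Get(key)
	if err != nil {
		return nil, err
	}
	defer closer.Close()

	// Copy before the deferred Close invalidates val.
	out := make([]byte, len(val))
	copy(out, val)
	return out, nil
}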