From d4a5e30ba2446faa0bfaf9303a5e61d4bee866be Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Tue, 12 Nov 2024 01:45:18 -0600 Subject: [PATCH] v2.0.3-p3 --- node/config/version.go | 2 +- node/consensus/data/main_data_loop.go | 28 ++++++++++----- node/consensus/time/data_time_reel.go | 6 ++++ node/consensus/time/data_time_reel_test.go | 1 + .../token/token_execution_engine.go | 6 ++++ .../intrinsics/token/token_genesis_test.go | 18 ++++++++++ node/main.go | 34 +++++++------------ 7 files changed, 64 insertions(+), 31 deletions(-) create mode 100644 node/execution/intrinsics/token/token_genesis_test.go diff --git a/node/config/version.go b/node/config/version.go index 1502f03..37494d7 100644 --- a/node/config/version.go +++ b/node/config/version.go @@ -36,7 +36,7 @@ func FormatVersion(version []byte) string { } func GetPatchNumber() byte { - return 0x02 + return 0x03 } func GetRCNumber() byte { diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go index 6d6b295..8cb71de 100644 --- a/node/consensus/data/main_data_loop.go +++ b/node/consensus/data/main_data_loop.go @@ -41,6 +41,7 @@ func ( func (e *DataClockConsensusEngine) runLoop() { dataFrameCh := e.dataTimeReel.NewFrameCh() + runOnce := true for e.GetState() < consensus.EngineStateStopping { peerCount := e.pubSub.GetNetworkPeersCount() if peerCount < e.minimumPeersRequired { @@ -57,8 +58,26 @@ func (e *DataClockConsensusEngine) runLoop() { select { case dataFrame := <-dataFrameCh: + if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) { + if err = e.publishProof(dataFrame); err != nil { + e.logger.Error("could not publish", zap.Error(err)) + e.stateMx.Lock() + if e.state < consensus.EngineStateStopping { + e.state = consensus.EngineStateCollecting + } + e.stateMx.Unlock() + } + } latestFrame = e.processFrame(latestFrame, dataFrame) case <-time.After(20 * time.Second): + if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) && !runOnce { + continue + } + + if runOnce { + runOnce = false + } + dataFrame, err := e.dataTimeReel.Head() if err != nil { panic(err) @@ -118,15 +137,6 @@ func (e *DataClockConsensusEngine) processFrame( e.dataTimeReel.Insert(nextFrame, true) - if err = e.publishProof(nextFrame); err != nil { - e.logger.Error("could not publish", zap.Error(err)) - e.stateMx.Lock() - if e.state < consensus.EngineStateStopping { - e.state = consensus.EngineStateCollecting - } - e.stateMx.Unlock() - } - return nextFrame } else { if latestFrame.Timestamp > time.Now().UnixMilli()-30000 { diff --git a/node/consensus/time/data_time_reel.go b/node/consensus/time/data_time_reel.go index af03428..a2f3cb9 100644 --- a/node/consensus/time/data_time_reel.go +++ b/node/consensus/time/data_time_reel.go @@ -61,6 +61,7 @@ type DataTimeReel struct { newFrameCh chan *protobufs.ClockFrame badFrameCh chan *protobufs.ClockFrame done chan bool + alwaysSend bool } func NewDataTimeReel( @@ -80,6 +81,7 @@ func NewDataTimeReel( origin []byte, initialInclusionProof *crypto.InclusionAggregateProof, initialProverKeys [][]byte, + alwaysSend bool, ) *DataTimeReel { if filter == nil { panic("filter is nil") @@ -128,6 +130,7 @@ func NewDataTimeReel( newFrameCh: make(chan *protobufs.ClockFrame), badFrameCh: make(chan *protobufs.ClockFrame), done: make(chan bool), + alwaysSend: alwaysSend, } } @@ -664,6 +667,9 @@ func (d *DataTimeReel) setHead(frame *protobufs.ClockFrame, distance *big.Int) e d.head = frame d.headDistance = distance + if d.alwaysSend { + d.newFrameCh <- frame + } go func() { select { case d.newFrameCh <- frame: diff --git a/node/consensus/time/data_time_reel_test.go b/node/consensus/time/data_time_reel_test.go index d11d23b..61ce649 100644 --- a/node/consensus/time/data_time_reel_test.go +++ b/node/consensus/time/data_time_reel_test.go @@ -186,6 +186,7 @@ func TestDataTimeReel(t *testing.T) { Proof: []byte{}, }, pubKeys, + true, ) err = d.Start() diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go index fbb7d84..2994934 100644 --- a/node/execution/intrinsics/token/token_execution_engine.go +++ b/node/execution/intrinsics/token/token_execution_engine.go @@ -221,6 +221,11 @@ func NewTokenExecutionEngine( peerSeniority: NewFromMap(peerSeniority), } + alwaysSend := false + if bytes.Equal(config.GetGenesis().Beacon, pubSub.GetPublicKey()) { + alwaysSend = true + } + dataTimeReel := time.NewDataTimeReel( intrinsicFilter, logger, @@ -250,6 +255,7 @@ func NewTokenExecutionEngine( origin, inclusionProof, proverKeys, + alwaysSend, ) e.clock = data.NewDataClockConsensusEngine( diff --git a/node/execution/intrinsics/token/token_genesis_test.go b/node/execution/intrinsics/token/token_genesis_test.go new file mode 100644 index 0000000..3d9918d --- /dev/null +++ b/node/execution/intrinsics/token/token_genesis_test.go @@ -0,0 +1,18 @@ +package token_test + +import ( + "testing" + + "github.com/iden3/go-iden3-crypto/poseidon" + "github.com/stretchr/testify/assert" + "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token" +) + +func TestRebuildPeerSeniority(t *testing.T) { + m, err := token.RebuildPeerSeniority(0) + assert.NoError(t, err) + peerId := "QmcKQjpQmLpbDsiif2MuakhHFyxWvqYauPsJDaXnLav7PJ" + b, _ := poseidon.HashBytes([]byte(peerId)) + a := m[string(b.FillBytes(make([]byte, 32)))] + assert.Equal(t, a, 0) +} diff --git a/node/main.go b/node/main.go index 5e4a703..ce7c31b 100644 --- a/node/main.go +++ b/node/main.go @@ -617,6 +617,13 @@ func RunForkRepairIfNeeded( } } + if err = txn.Commit(); err != nil { + txn.Abort() + + logger.Error("could not commit data", zap.Error(err)) + return + } + logger.Info("inserting valid frame starting at position 48995") type OverrideFrames struct { FrameData []byte `json:"frameData"` @@ -631,11 +638,11 @@ func RunForkRepairIfNeeded( for _, overrideFrame := range overrideFramesJson { override := &protobufs.ClockFrame{} if err := proto.Unmarshal(overrideFrame.FrameData, override); err != nil { - txn.Abort() logger.Error("could not unmarshal frame data", zap.Error(err)) return } + txn, _ := clockStore.NewTransaction() if err := overrideHead( txn, clockStore, @@ -647,13 +654,13 @@ func RunForkRepairIfNeeded( logger.Error("could not override frame data", zap.Error(err)) return } - } - if err = txn.Commit(); err != nil { - txn.Abort() + if err = txn.Commit(); err != nil { + txn.Abort() - logger.Error("could not commit data", zap.Error(err)) - return + logger.Error("could not commit data", zap.Error(err)) + return + } } } else { fmt.Println("No repair needed.") @@ -759,11 +766,6 @@ func processFrame( return nil, errors.Wrap(err, "process frame") } - logger.Debug( - "app outputs", - zap.Int("outputs", len(app.TokenOutputs.Outputs)), - ) - proverTrieJoinRequests := [][]byte{} proverTrieLeaveRequests := [][]byte{} @@ -879,22 +881,13 @@ func processFrame( txn.Abort() return nil, errors.Wrap(err, "process frame") } - fmt.Println(hex.EncodeToString(addr)) sen, ok := (*peerSeniority)[string(addr)] if !ok { - logger.Debug( - "peer announced with no seniority", - zap.String("peer_id", peerId), - ) continue } peer := new(big.Int).SetUint64(sen.GetSeniority()) if peer.Cmp(token.GetAggregatedSeniority([]string{peerId})) != 0 { - logger.Debug( - "peer announced but is already different seniority", - zap.String("peer_id", peerIds[0]), - ) mergeable = false break } @@ -951,7 +944,6 @@ func processFrame( txn.Abort() return nil, errors.Wrap(err, "process frame") } - fmt.Println(hex.EncodeToString(addr)) if _, ok := (*peerSeniority)[string(addr)]; !ok { (*peerSeniority)[string(addr)] = token.NewPeerSeniorityItem(20, string(addr)) } else {