From 7afe704a2f245a2862c9f76f953814f50d61eb95 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Fri, 7 Feb 2025 16:31:43 -0600 Subject: [PATCH] if testnet don't run migration, further parallelization --- .../token/token_execution_engine.go | 262 ++++++++---------- .../intrinsics/token/token_genesis.go | 22 +- 2 files changed, 134 insertions(+), 150 deletions(-) diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go index e01dea8..6d40a96 100644 --- a/node/execution/intrinsics/token/token_execution_engine.go +++ b/node/execution/intrinsics/token/token_execution_engine.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "crypto" - "encoding/binary" "encoding/hex" "fmt" "math/big" @@ -171,27 +170,29 @@ func NewTokenExecutionEngine( } else if err != nil { panic(err) } else { - err := coinStore.Migrate( - intrinsicFilter, - config.GetGenesis().GenesisSeedHex, - ) - if err != nil { - panic(err) - } - _, err = clockStore.GetEarliestDataClockFrame(intrinsicFilter) - if err != nil && errors.Is(err, store.ErrNotFound) { - origin, inclusionProof, proverKeys, peerSeniority = CreateGenesisState( - logger, - cfg.Engine, - nil, - inclusionProver, - clockStore, - coinStore, - hypergraphStore, - hypergraph, - mpcithVerEnc, - uint(cfg.P2P.Network), + if pubSub.GetNetwork() == 0 { + err := coinStore.Migrate( + intrinsicFilter, + config.GetGenesis().GenesisSeedHex, ) + if err != nil { + panic(err) + } + _, err = clockStore.GetEarliestDataClockFrame(intrinsicFilter) + if err != nil && errors.Is(err, store.ErrNotFound) { + origin, inclusionProof, proverKeys, peerSeniority = CreateGenesisState( + logger, + cfg.Engine, + nil, + inclusionProver, + clockStore, + coinStore, + hypergraphStore, + hypergraph, + mpcithVerEnc, + uint(cfg.P2P.Network), + ) + } } } @@ -374,7 +375,7 @@ func NewTokenExecutionEngine( ) } - if e.hypergraph == nil { + if e.hypergraph == nil || len(e.hypergraph.GetVertexAdds()) == 0 { e.rebuildHypergraph() } } @@ -443,6 +444,50 @@ func NewTokenExecutionEngine( var _ execution.ExecutionEngine = (*TokenExecutionEngine)(nil) +func (e *TokenExecutionEngine) addBatchToHypergraph(batchKey [][]byte, batchValue [][]byte) { + var wg sync.WaitGroup + throttle := make(chan struct{}, runtime.NumCPU()) + batchCompressed := make([][]hypergraph.Encrypted, len(batchKey)) + for i, chunk := range batchValue { + throttle <- struct{}{} + wg.Add(1) + go func(chunk []byte, i int) { + defer func() { <-throttle }() + defer wg.Done() + e.logger.Debug( + "encrypting coin", + zap.String("address", hex.EncodeToString(batchKey[i])), + ) + data := e.mpcithVerEnc.EncryptAndCompress( + chunk, + config.GetGenesis().Beacon, + ) + compressed := []hypergraph.Encrypted{} + for _, d := range data { + compressed = append(compressed, d) + } + e.logger.Debug( + "encrypted coin", + zap.String("address", hex.EncodeToString(batchKey[i])), + ) + batchCompressed[i] = compressed + }(chunk, i) + } + wg.Wait() + + for i := range batchKey { + if err := e.hypergraph.AddVertex( + hypergraph.NewVertex( + [32]byte(application.TOKEN_ADDRESS), + [32]byte(batchKey[i]), + batchCompressed[i], + ), + ); err != nil { + panic(err) + } + } +} + func (e *TokenExecutionEngine) rebuildHypergraph() { e.logger.Info("rebuilding hypergraph") e.hypergraph = hypergraph.NewHypergraph() @@ -450,68 +495,40 @@ func (e *TokenExecutionEngine) rebuildHypergraph() { if err != nil { panic(err) } + var batchKey, batchValue [][]byte for iter.First(); iter.Valid(); iter.Next() { key := make([]byte, len(iter.Key()[2:])) copy(key, iter.Key()[2:]) - e.logger.Debug( - "encrypting coin", - zap.String("address", hex.EncodeToString(key)), - ) - data := e.mpcithVerEnc.EncryptAndCompress( - iter.Value(), - config.GetGenesis().Beacon, - ) - compressed := []hypergraph.Encrypted{} - for _, d := range data { - compressed = append(compressed, d) - } - e.logger.Debug( - "encrypted coin", - zap.String("address", hex.EncodeToString(key)), - ) - if err := e.hypergraph.AddVertex( - hypergraph.NewVertex( - [32]byte(application.TOKEN_ADDRESS), - [32]byte(key), - compressed, - ), - ); err != nil { + batchKey = append(batchKey, key) + + coin := &protobufs.Coin{} + err := proto.Unmarshal(iter.Value()[8:], coin) + if err != nil { panic(err) } + + value := []byte{} + value = append(value, iter.Value()[:8]...) + value = append(value, coin.Amount...) + // implicit + value = append(value, 0x00) + value = append(value, coin.Owner.GetImplicitAccount().GetAddress()...) + // domain len + value = append(value, 0x00) + value = append(value, coin.Intersection...) + batchValue = append(batchValue, value) + + if len(batchKey) == runtime.NumCPU() { + e.addBatchToHypergraph(batchKey, batchValue) + batchKey = [][]byte{} + batchValue = [][]byte{} + } } iter.Close() - iter, err = e.coinStore.RangePreCoinProofs() - if err != nil { - panic(err) + if len(batchKey) != 0 { + e.addBatchToHypergraph(batchKey, batchValue) } - for iter.First(); iter.Valid(); iter.Next() { - key := make([]byte, len(iter.Key()[2:])) - copy(key, iter.Key()[2:]) - e.logger.Debug( - "encrypting pre-coin proof", - zap.String("address", hex.EncodeToString(key)), - ) - data := e.mpcithVerEnc.EncryptAndCompress( - iter.Value(), - config.GetGenesis().Beacon, - ) - compressed := []hypergraph.Encrypted{} - for _, d := range data { - compressed = append(compressed, d) - } - if err := e.hypergraph.AddVertex( - hypergraph.NewVertex( - [32]byte(application.TOKEN_ADDRESS), - [32]byte(key), - compressed, - ), - ); err != nil { - panic(err) - } - } - iter.Close() - e.logger.Info("saving rebuilt hypergraph") txn, err := e.clockStore.NewTransaction(false) if err != nil { @@ -728,16 +745,22 @@ func (e *TokenExecutionEngine) ProcessFrame( txn.Abort() return nil, errors.Wrap(err, "process frame") } - coinBytes, err := proto.Marshal(o.Coin) - if err != nil { - panic(err) - } - data := []byte{} - data = binary.BigEndian.AppendUint64(data, 0) - data = append(data, coinBytes...) + value := []byte{} + value = append(value, make([]byte, 8)...) + value = append(value, o.Coin.Amount...) + // implicit + value = append(value, 0x00) + value = append( + value, + o.Coin.Owner.GetImplicitAccount().GetAddress()..., + ) + // domain len + value = append(value, 0x00) + value = append(value, o.Coin.Intersection...) + proofs := e.mpcithVerEnc.EncryptAndCompress( - data, + value, config.GetGenesis().Beacon, ) compressed := []hypergraph.Encrypted{} @@ -769,16 +792,22 @@ func (e *TokenExecutionEngine) ProcessFrame( txn.Abort() return nil, errors.Wrap(err, "process frame") } - coinBytes, err := proto.Marshal(coin) - if err != nil { - panic(err) - } - data := []byte{} - data = binary.BigEndian.AppendUint64(data, 0) - data = append(data, coinBytes...) + value := []byte{} + value = append(value, make([]byte, 8)...) + value = append(value, coin.Amount...) + // implicit + value = append(value, 0x00) + value = append( + value, + coin.Owner.GetImplicitAccount().GetAddress()..., + ) + // domain len + value = append(value, 0x00) + value = append(value, coin.Intersection...) + proofs := e.mpcithVerEnc.EncryptAndCompress( - data, + value, config.GetGenesis().Beacon, ) compressed := []hypergraph.Encrypted{} @@ -811,32 +840,7 @@ func (e *TokenExecutionEngine) ProcessFrame( txn.Abort() return nil, errors.Wrap(err, "process frame") } - proofBytes, err := proto.Marshal(o.Proof) - if err != nil { - panic(err) - } - data := []byte{} - data = binary.BigEndian.AppendUint64(data, 0) - data = append(data, proofBytes...) - proofs := e.mpcithVerEnc.EncryptAndCompress( - data, - config.GetGenesis().Beacon, - ) - compressed := []hypergraph.Encrypted{} - for _, d := range proofs { - compressed = append(compressed, d) - } - if err := hg.AddVertex( - hypergraph.NewVertex( - [32]byte(application.TOKEN_ADDRESS), - [32]byte(address), - compressed, - ), - ); err != nil { - txn.Abort() - panic(err) - } if len(o.Proof.Amount) == 32 && !bytes.Equal(o.Proof.Amount, make([]byte, 32)) && o.Proof.Commitment != nil { @@ -874,32 +878,6 @@ func (e *TokenExecutionEngine) ProcessFrame( txn.Abort() return nil, errors.Wrap(err, "process frame") } - proofBytes, err := proto.Marshal(o.DeletedProof) - if err != nil { - panic(err) - } - - data := []byte{} - data = binary.BigEndian.AppendUint64(data, 0) - data = append(data, proofBytes...) - proofs := e.mpcithVerEnc.EncryptAndCompress( - data, - config.GetGenesis().Beacon, - ) - compressed := []hypergraph.Encrypted{} - for _, d := range proofs { - compressed = append(compressed, d) - } - if err := hg.RemoveVertex( - hypergraph.NewVertex( - [32]byte(application.TOKEN_ADDRESS), - [32]byte(address), - compressed, - ), - ); err != nil { - txn.Abort() - panic(err) - } case *protobufs.TokenOutput_Announce: peerIds := []string{} for _, sig := range o.Announce.PublicKeySignaturesEd448 { diff --git a/node/execution/intrinsics/token/token_genesis.go b/node/execution/intrinsics/token/token_genesis.go index 9859716..e764070 100644 --- a/node/execution/intrinsics/token/token_genesis.go +++ b/node/execution/intrinsics/token/token_genesis.go @@ -872,16 +872,22 @@ func CreateGenesisState( if err != nil { panic(err) } - coinBytes, err := proto.Marshal(output.GetCoin()) - if err != nil { - panic(err) - } - data := []byte{} - data = binary.BigEndian.AppendUint64(data, 0) - data = append(data, coinBytes...) + value := []byte{} + value = append(value, make([]byte, 8)...) + value = append(value, output.GetCoin().Amount...) + // implicit + value = append(value, 0x00) + value = append( + value, + output.GetCoin().Owner.GetImplicitAccount().GetAddress()..., + ) + // domain len + value = append(value, 0x00) + value = append(value, output.GetCoin().Intersection...) + proofs := mpcithVerEnc.EncryptAndCompress( - data, + value, config.GetGenesis().Beacon, ) compressed := []hypergraph.Encrypted{}