parallelism of mints + don't overwrite map until commit time

This commit is contained in:
Cassandra Heart 2024-11-16 18:09:52 -06:00
parent ddb67eb85b
commit 08e7dad8c9
No known key found for this signature in database
GPG Key ID: 6352152859385958
5 changed files with 202 additions and 168 deletions

View File

@ -10,11 +10,11 @@ func GetMinimumVersionCutoff() time.Time {
}
func GetMinimumVersion() []byte {
return []byte{0x02, 0x00, 0x03}
return []byte{0x02, 0x00, 0x04}
}
func GetVersion() []byte {
return []byte{0x02, 0x00, 0x03}
return []byte{0x02, 0x00, 0x04}
}
func GetVersionString() string {
@ -36,9 +36,9 @@ func FormatVersion(version []byte) string {
}
func GetPatchNumber() byte {
return 0x04
return 0x00
}
func GetRCNumber() byte {
return 0x0a
return 0x00
}

View File

@ -1,8 +1,12 @@
package application
import (
"bytes"
"crypto"
"encoding/binary"
"sync"
"github.com/iden3/go-iden3-crypto/poseidon"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
@ -148,35 +152,82 @@ func (a *TokenApplication) ApplyTransitions(
seen := map[string]struct{}{}
for _, transition := range requests {
set := make([]*protobufs.TokenRequest, len(requests))
fails := make([]*protobufs.TokenRequest, len(set))
outputsSet := make([][]*protobufs.TokenOutput, len(set))
for i, transition := range requests {
i := i
switch t := transition.Request.(type) {
case *protobufs.TokenRequest_Mint:
ring, parallelism, err := t.Mint.RingAndParallelism(
func(addr []byte) int {
if _, ok := seen[string(addr)]; ok {
return -1
}
if t == nil || t.Mint.Proofs == nil || t.Mint.Signature == nil {
continue
}
ring := -1
for i, t := range a.Tries[1:] {
if t.Contains(addr) {
ring = i
seen[string(addr)] = struct{}{}
}
}
payload := []byte("mint")
for _, p := range t.Mint.Proofs {
payload = append(payload, p...)
}
if err := t.Mint.Signature.Verify(payload); err != nil {
continue
}
return ring
},
addr, err := poseidon.HashBytes(
t.Mint.Signature.PublicKey.KeyValue,
)
if err != nil {
continue
}
parallelismMap[ring] = parallelismMap[ring] + uint64(parallelism)
if len(t.Mint.Proofs) == 1 && a.Tries[0].Contains(
addr.FillBytes(make([]byte, 32)),
) && bytes.Equal(t.Mint.Signature.PublicKey.KeyValue, a.Beacon) {
if _, ok := seen[string(t.Mint.Proofs[0][32:])]; !ok {
set[i] = transition
seen[string(t.Mint.Proofs[0][32:])] = struct{}{}
}
} else if len(t.Mint.Proofs) >= 3 && currentFrameNumber > PROOF_FRAME_CUTOFF {
frameNumber := binary.BigEndian.Uint64(t.Mint.Proofs[2])
if frameNumber < currentFrameNumber-2 {
fails[i] = transition
continue
}
ring, parallelism, err := t.Mint.RingAndParallelism(
func(addr []byte) int {
if _, ok := seen[string(addr)]; ok {
return -1
}
ring := -1
for i, t := range a.Tries[1:] {
if t.Contains(addr) {
ring = i
seen[string(addr)] = struct{}{}
}
}
return ring
},
)
if err == nil {
// fmt.Println(i, "checked ring test")
set[i] = transition
parallelismMap[ring] = parallelismMap[ring] + uint64(parallelism)
} else {
// fmt.Println(i, "failed ring test", err)
fails[i] = transition
}
}
default:
set[i] = transition
}
}
for _, transition := range requests {
successes := make([]*protobufs.TokenRequest, len(set))
for i, transition := range set {
if transition == nil {
continue
}
req:
switch t := transition.Request.(type) {
case *protobufs.TokenRequest_Announce:
@ -194,11 +245,8 @@ func (a *TokenApplication) ApplyTransitions(
)
break req
}
outputs.Outputs = append(outputs.Outputs, success...)
finalizedTransitions.Requests = append(
finalizedTransitions.Requests,
transition,
)
outputsSet[i] = success
successes[i] = transition
case *protobufs.TokenRequest_Join:
success, err := a.handleDataAnnounceProverJoin(
currentFrameNumber,
@ -212,17 +260,11 @@ func (a *TokenApplication) ApplyTransitions(
"apply transitions",
)
}
failedTransitions.Requests = append(
failedTransitions.Requests,
transition,
)
fails[i] = transition
break req
}
outputs.Outputs = append(outputs.Outputs, success...)
finalizedTransitions.Requests = append(
finalizedTransitions.Requests,
transition,
)
outputsSet[i] = success
successes[i] = transition
case *protobufs.TokenRequest_Leave:
success, err := a.handleDataAnnounceProverLeave(
currentFrameNumber,
@ -236,17 +278,11 @@ func (a *TokenApplication) ApplyTransitions(
"apply transitions",
)
}
failedTransitions.Requests = append(
failedTransitions.Requests,
transition,
)
fails[i] = transition
break req
}
outputs.Outputs = append(outputs.Outputs, success...)
finalizedTransitions.Requests = append(
finalizedTransitions.Requests,
transition,
)
outputsSet[i] = success
successes[i] = transition
case *protobufs.TokenRequest_Resume:
success, err := a.handleDataAnnounceProverResume(
currentFrameNumber,
@ -260,17 +296,11 @@ func (a *TokenApplication) ApplyTransitions(
"apply transitions",
)
}
failedTransitions.Requests = append(
failedTransitions.Requests,
transition,
)
fails[i] = transition
break req
}
outputs.Outputs = append(outputs.Outputs, success...)
finalizedTransitions.Requests = append(
finalizedTransitions.Requests,
transition,
)
outputsSet[i] = success
successes[i] = transition
case *protobufs.TokenRequest_Pause:
success, err := a.handleDataAnnounceProverPause(
currentFrameNumber,
@ -284,17 +314,11 @@ func (a *TokenApplication) ApplyTransitions(
"apply transitions",
)
}
failedTransitions.Requests = append(
failedTransitions.Requests,
transition,
)
fails[i] = transition
break req
}
outputs.Outputs = append(outputs.Outputs, success...)
finalizedTransitions.Requests = append(
finalizedTransitions.Requests,
transition,
)
outputsSet[i] = success
successes[i] = transition
case *protobufs.TokenRequest_Merge:
success, err := a.handleMerge(currentFrameNumber, lockMap, t.Merge)
if err != nil {
@ -304,17 +328,11 @@ func (a *TokenApplication) ApplyTransitions(
"apply transitions",
)
}
failedTransitions.Requests = append(
failedTransitions.Requests,
transition,
)
fails[i] = transition
break req
}
outputs.Outputs = append(outputs.Outputs, success...)
finalizedTransitions.Requests = append(
finalizedTransitions.Requests,
transition,
)
outputsSet[i] = success
successes[i] = transition
case *protobufs.TokenRequest_Split:
success, err := a.handleSplit(currentFrameNumber, lockMap, t.Split)
if err != nil {
@ -324,17 +342,11 @@ func (a *TokenApplication) ApplyTransitions(
"apply transitions",
)
}
failedTransitions.Requests = append(
failedTransitions.Requests,
transition,
)
fails[i] = transition
break req
}
outputs.Outputs = append(outputs.Outputs, success...)
finalizedTransitions.Requests = append(
finalizedTransitions.Requests,
transition,
)
outputsSet[i] = success
successes[i] = transition
case *protobufs.TokenRequest_Transfer:
success, err := a.handleTransfer(currentFrameNumber, lockMap, t.Transfer)
if err != nil {
@ -344,48 +356,88 @@ func (a *TokenApplication) ApplyTransitions(
"apply transitions",
)
}
failedTransitions.Requests = append(
failedTransitions.Requests,
transition,
)
fails[i] = transition
break req
}
outputs.Outputs = append(outputs.Outputs, success...)
finalizedTransitions.Requests = append(
finalizedTransitions.Requests,
transition,
)
outputsSet[i] = success
successes[i] = transition
}
}
wg := sync.WaitGroup{}
for _, transition := range set {
if transition == nil {
continue
}
transition := transition
switch transition.Request.(type) {
case *protobufs.TokenRequest_Mint:
success, err := a.handleMint(
currentFrameNumber,
lockMap,
t.Mint,
frame,
parallelismMap,
)
if err != nil {
if !skipFailures {
return nil, nil, nil, errors.Wrap(
err,
"apply transitions",
)
}
failedTransitions.Requests = append(
failedTransitions.Requests,
transition,
wg.Add(1)
}
}
for i, transition := range set {
if transition == nil {
continue
}
i := i
transition := transition
switch t := transition.Request.(type) {
case *protobufs.TokenRequest_Mint:
go func() {
defer wg.Done()
success, err := a.handleMint(
currentFrameNumber,
t.Mint,
frame,
parallelismMap,
)
break req
if err != nil {
fails[i] = transition
return
}
outputsSet[i] = success
successes[i] = transition
}()
}
}
wg.Wait()
finalFails := []*protobufs.TokenRequest{}
for _, fail := range fails {
if fail != nil {
finalFails = append(finalFails, fail)
}
}
if len(finalFails) != 0 && !skipFailures {
return nil, nil, nil, errors.Wrap(
err,
"apply transitions",
)
}
finalSuccesses := []*protobufs.TokenRequest{}
for _, success := range successes {
if success != nil {
finalSuccesses = append(finalSuccesses, success)
}
}
outputs.Outputs = []*protobufs.TokenOutput{}
for _, out := range outputsSet {
if out != nil {
for _, o := range out {
outputs.Outputs = append(outputs.Outputs, o)
}
outputs.Outputs = append(outputs.Outputs, success...)
finalizedTransitions.Requests = append(
finalizedTransitions.Requests,
transition,
)
}
}
a.TokenOutputs = outputs
finalizedTransitions.Requests = finalSuccesses
failedTransitions.Requests = finalFails
return a, finalizedTransitions, failedTransitions, nil
}

View File

@ -18,13 +18,13 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/tries"
)
const PROOF_FRAME_CUTOFF = 46500
const PROOF_FRAME_RING_RESET = 52000
const PROOF_FRAME_RING_RESET_2 = 53028
// for tests, these need to be var
var PROOF_FRAME_CUTOFF = uint64(46500)
var PROOF_FRAME_RING_RESET = uint64(52000)
var PROOF_FRAME_RING_RESET_2 = uint64(53028)
func (a *TokenApplication) handleMint(
currentFrameNumber uint64,
lockMap map[string]struct{},
t *protobufs.MintCoinRequest,
frame *protobufs.ClockFrame,
parallelismMap map[int]uint64,
@ -72,10 +72,6 @@ func (a *TokenApplication) handleMint(
return nil, errors.Wrap(ErrInvalidStateTransition, "handle mint")
}
if _, touched := lockMap[string(t.Proofs[0][32:])]; touched {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle mint")
}
_, pr, err := a.CoinStore.GetPreCoinProofsForOwner(t.Proofs[0][32:])
if err != nil && !errors.Is(err, store.ErrNotFound) {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle mint")
@ -87,8 +83,6 @@ func (a *TokenApplication) handleMint(
}
}
lockMap[string(t.Proofs[0][32:])] = struct{}{}
outputs := []*protobufs.TokenOutput{
&protobufs.TokenOutput{
Output: &protobufs.TokenOutput_Proof{
@ -130,28 +124,12 @@ func (a *TokenApplication) handleMint(
zap.String("peer_id", base58.Encode([]byte(peerId))),
zap.Uint64("frame_number", currentFrameNumber),
)
if _, touched := lockMap[string(t.Signature.PublicKey.KeyValue)]; touched {
a.Logger.Debug(
"already received",
zap.String("peer_id", base58.Encode([]byte(peerId))),
zap.Uint64("frame_number", currentFrameNumber),
)
return nil, errors.Wrap(ErrInvalidStateTransition, "handle mint")
}
ring := -1
for i, t := range a.Tries[1:] {
if t.Contains(altAddr.FillBytes(make([]byte, 32))) {
ring = i
}
}
if ring == -1 {
a.Logger.Debug(
"not in ring",
zap.String("peer_id", base58.Encode([]byte(peerId))),
zap.Uint64("frame_number", currentFrameNumber),
)
return nil, errors.Wrap(ErrInvalidStateTransition, "handle mint")
}
_, prfs, err := a.CoinStore.GetPreCoinProofsForOwner(
altAddr.FillBytes(make([]byte, 32)),
@ -181,7 +159,6 @@ func (a *TokenApplication) handleMint(
zap.String("peer_id", base58.Encode([]byte(peerId))),
zap.Uint64("frame_number", currentFrameNumber),
)
lockMap[string(t.Signature.PublicKey.KeyValue)] = struct{}{}
return []*protobufs.TokenOutput{&protobufs.TokenOutput{
Output: &protobufs.TokenOutput_Penalty{
Penalty: &protobufs.ProverPenalty{
@ -210,7 +187,6 @@ func (a *TokenApplication) handleMint(
zap.String("peer_id", base58.Encode([]byte(peerId))),
zap.Uint64("frame_number", currentFrameNumber),
)
lockMap[string(t.Signature.PublicKey.KeyValue)] = struct{}{}
return []*protobufs.TokenOutput{&protobufs.TokenOutput{
Output: &protobufs.TokenOutput_Penalty{
Penalty: &protobufs.ProverPenalty{
@ -271,7 +247,6 @@ func (a *TokenApplication) handleMint(
zap.String("peer_id", base58.Encode([]byte(peerId))),
zap.Uint64("frame_number", currentFrameNumber),
)
lockMap[string(t.Signature.PublicKey.KeyValue)] = struct{}{}
return []*protobufs.TokenOutput{&protobufs.TokenOutput{
Output: &protobufs.TokenOutput_Penalty{
Penalty: &protobufs.ProverPenalty{
@ -310,7 +285,6 @@ func (a *TokenApplication) handleMint(
zap.Uint64("frame_number", currentFrameNumber),
zap.Int("proof_size", len(leaf)),
)
lockMap[string(t.Signature.PublicKey.KeyValue)] = struct{}{}
return []*protobufs.TokenOutput{&protobufs.TokenOutput{
Output: &protobufs.TokenOutput_Penalty{
Penalty: &protobufs.ProverPenalty{
@ -359,7 +333,11 @@ func (a *TokenApplication) handleMint(
}
if verified && delete != nil && len(t.Proofs) > 3 && wesoVerified {
storage := PomwBasis(1, ring, currentFrameNumber)
storage.Quo(storage, big.NewInt(int64(parallelismMap[ring])))
m := parallelismMap[ring]
if m == 0 {
m = 1
}
storage.Quo(storage, big.NewInt(int64(m)))
storage.Mul(storage, big.NewInt(int64(parallelism)))
a.Logger.Debug(
@ -452,7 +430,6 @@ func (a *TokenApplication) handleMint(
})
}
}
lockMap[string(t.Signature.PublicKey.KeyValue)] = struct{}{}
return outputs, nil
}
a.Logger.Debug(

View File

@ -537,6 +537,8 @@ func (e *TokenExecutionEngine) ProcessFrame(
proverTrieJoinRequests := [][]byte{}
proverTrieLeaveRequests := [][]byte{}
mapSnapshot := ToSerializedMap(e.peerSeniority)
activeMap := NewFromMap(mapSnapshot)
for i, output := range app.TokenOutputs.Outputs {
switch o := output.Output.(type) {
@ -597,14 +599,14 @@ func (e *TokenExecutionEngine) ProcessFrame(
break
}
}
if _, ok := (*e.peerSeniority)[addr]; !ok {
(*e.peerSeniority)[addr] = PeerSeniorityItem{
if _, ok := (*activeMap)[addr]; !ok {
(*activeMap)[addr] = PeerSeniorityItem{
seniority: 10,
addr: addr,
}
} else {
(*e.peerSeniority)[addr] = PeerSeniorityItem{
seniority: (*e.peerSeniority)[addr].seniority + 10,
(*activeMap)[addr] = PeerSeniorityItem{
seniority: (*activeMap)[addr].seniority + 10,
addr: addr,
}
}
@ -650,7 +652,7 @@ func (e *TokenExecutionEngine) ProcessFrame(
return nil, errors.Wrap(err, "process frame")
}
sen, ok := (*e.peerSeniority)[string(addr)]
sen, ok := (*activeMap)[string(addr)]
if !ok {
logger(
"peer announced with no seniority",
@ -707,7 +709,7 @@ func (e *TokenExecutionEngine) ProcessFrame(
logger("combined aggregate and 1.4.19-21 seniority", zap.Uint64("seniority", total))
(*e.peerSeniority)[string(addr)] = PeerSeniorityItem{
(*activeMap)[string(addr)] = PeerSeniorityItem{
seniority: aggregated + additional,
addr: string(addr),
}
@ -721,7 +723,7 @@ func (e *TokenExecutionEngine) ProcessFrame(
return nil, errors.Wrap(err, "process frame")
}
(*e.peerSeniority)[string(addr)] = PeerSeniorityItem{
(*activeMap)[string(addr)] = PeerSeniorityItem{
seniority: 0,
addr: string(addr),
}
@ -735,7 +737,7 @@ func (e *TokenExecutionEngine) ProcessFrame(
return nil, errors.Wrap(err, "process frame")
}
sen, ok := (*e.peerSeniority)[string(addr)]
sen, ok := (*activeMap)[string(addr)]
if !ok {
logger(
"peer announced with no seniority",
@ -778,7 +780,7 @@ func (e *TokenExecutionEngine) ProcessFrame(
}
total := GetAggregatedSeniority([]string{peerIds[0]}).Uint64() + additional
logger("combined aggregate and 1.4.19-21 seniority", zap.Uint64("seniority", total))
(*e.peerSeniority)[string(addr)] = PeerSeniorityItem{
(*activeMap)[string(addr)] = PeerSeniorityItem{
seniority: total,
addr: string(addr),
}
@ -790,14 +792,14 @@ func (e *TokenExecutionEngine) ProcessFrame(
return nil, errors.Wrap(err, "process frame")
}
if _, ok := (*e.peerSeniority)[string(addr)]; !ok {
(*e.peerSeniority)[string(addr)] = PeerSeniorityItem{
if _, ok := (*activeMap)[string(addr)]; !ok {
(*activeMap)[string(addr)] = PeerSeniorityItem{
seniority: 20,
addr: string(addr),
}
} else {
(*e.peerSeniority)[string(addr)] = PeerSeniorityItem{
seniority: (*e.peerSeniority)[string(addr)].seniority + 20,
(*activeMap)[string(addr)] = PeerSeniorityItem{
seniority: (*activeMap)[string(addr)].seniority + 20,
addr: string(addr),
}
}
@ -823,14 +825,14 @@ func (e *TokenExecutionEngine) ProcessFrame(
}
case *protobufs.TokenOutput_Penalty:
addr := string(o.Penalty.Account.GetImplicitAccount().Address)
if _, ok := (*e.peerSeniority)[addr]; !ok {
(*e.peerSeniority)[addr] = PeerSeniorityItem{
if _, ok := (*activeMap)[addr]; !ok {
(*activeMap)[addr] = PeerSeniorityItem{
seniority: 0,
addr: addr,
}
proverTrieLeaveRequests = append(proverTrieLeaveRequests, []byte(addr))
} else {
if (*e.peerSeniority)[addr].seniority > o.Penalty.Quantity {
if (*activeMap)[addr].seniority > o.Penalty.Quantity {
for _, t := range app.Tries {
if t.Contains([]byte(addr)) {
v := t.Get([]byte(addr))
@ -841,12 +843,12 @@ func (e *TokenExecutionEngine) ProcessFrame(
break
}
}
(*e.peerSeniority)[addr] = PeerSeniorityItem{
seniority: (*e.peerSeniority)[addr].seniority - o.Penalty.Quantity,
(*activeMap)[addr] = PeerSeniorityItem{
seniority: (*activeMap)[addr].seniority - o.Penalty.Quantity,
addr: addr,
}
} else {
(*e.peerSeniority)[addr] = PeerSeniorityItem{
(*activeMap)[addr] = PeerSeniorityItem{
seniority: 0,
addr: addr,
}
@ -859,23 +861,23 @@ func (e *TokenExecutionEngine) ProcessFrame(
joinAddrs := tries.NewMinHeap[PeerSeniorityItem]()
leaveAddrs := tries.NewMinHeap[PeerSeniorityItem]()
for _, addr := range proverTrieJoinRequests {
if _, ok := (*e.peerSeniority)[string(addr)]; !ok {
if _, ok := (*activeMap)[string(addr)]; !ok {
joinAddrs.Push(PeerSeniorityItem{
addr: string(addr),
seniority: 0,
})
} else {
joinAddrs.Push((*e.peerSeniority)[string(addr)])
joinAddrs.Push((*activeMap)[string(addr)])
}
}
for _, addr := range proverTrieLeaveRequests {
if _, ok := (*e.peerSeniority)[string(addr)]; !ok {
if _, ok := (*activeMap)[string(addr)]; !ok {
leaveAddrs.Push(PeerSeniorityItem{
addr: string(addr),
seniority: 0,
})
} else {
leaveAddrs.Push((*e.peerSeniority)[string(addr)])
leaveAddrs.Push((*activeMap)[string(addr)])
}
}
@ -891,7 +893,7 @@ func (e *TokenExecutionEngine) ProcessFrame(
err = e.clockStore.PutPeerSeniorityMap(
txn,
e.intrinsicFilter,
ToSerializedMap(e.peerSeniority),
ToSerializedMap(activeMap),
)
if err != nil {
txn.Abort()
@ -904,6 +906,8 @@ func (e *TokenExecutionEngine) ProcessFrame(
return nil, errors.Wrap(err, "process frame")
}
e.peerSeniority = activeMap
if frame.FrameNumber == application.PROOF_FRAME_RING_RESET ||
frame.FrameNumber == application.PROOF_FRAME_RING_RESET_2 {
e.logger.Info("performing ring reset")

View File

@ -29,6 +29,7 @@ func (t *MintCoinRequest) RingAndParallelism(
if err := t.Signature.Verify(payload); err != nil {
return -1, 0, errors.New("invalid")
}
pk, err := pcrypto.UnmarshalEd448PublicKey(
t.Signature.PublicKey.KeyValue,
)