This commit is contained in:
Cassandra Heart 2024-11-04 18:09:35 -06:00
parent 8ee28eb2a7
commit a9225b2508
No known key found for this signature in database
GPG Key ID: 6352152859385958
21 changed files with 2233 additions and 1907 deletions

View File

@ -40,5 +40,5 @@ func GetPatchNumber() byte {
}
func GetRCNumber() byte {
return 0x00
return 0x01
}

View File

@ -15,11 +15,31 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
func (e *DataClockConsensusEngine) handleMessage(
func (e *DataClockConsensusEngine) handleFrameMessage(
message *pb.Message,
) error {
go func() {
e.messageProcessorCh <- message
e.frameMessageProcessorCh <- message
}()
return nil
}
func (e *DataClockConsensusEngine) handleTxMessage(
message *pb.Message,
) error {
go func() {
e.txMessageProcessorCh <- message
}()
return nil
}
func (e *DataClockConsensusEngine) handleInfoMessage(
message *pb.Message,
) error {
go func() {
e.infoMessageProcessorCh <- message
}()
return nil
@ -71,7 +91,7 @@ func (e *DataClockConsensusEngine) publishProof(
),
})
e.peerMapMx.Unlock()
if err := e.publishMessage(e.filter, list); err != nil {
if err := e.publishMessage(e.infoFilter, list); err != nil {
e.logger.Debug("error publishing message", zap.Error(err))
}
@ -80,7 +100,7 @@ func (e *DataClockConsensusEngine) publishProof(
return nil
}
func (e *DataClockConsensusEngine) insertMessage(
func (e *DataClockConsensusEngine) insertTxMessage(
filter []byte,
message proto.Message,
) error {
@ -124,7 +144,7 @@ func (e *DataClockConsensusEngine) insertMessage(
}
go func() {
e.messageProcessorCh <- m
e.txMessageProcessorCh <- m
}()
return nil

View File

@ -97,6 +97,8 @@ type DataClockConsensusEngine struct {
frameChan chan *protobufs.ClockFrame
executionEngines map[string]execution.ExecutionEngine
filter []byte
txFilter []byte
infoFilter []byte
input []byte
parentSelector []byte
syncingStatus SyncStatusType
@ -108,16 +110,13 @@ type DataClockConsensusEngine struct {
stagedTransactionsMx sync.Mutex
peerMapMx sync.RWMutex
peerAnnounceMapMx sync.Mutex
proverTrieJoinRequests map[string]string
proverTrieLeaveRequests map[string]string
proverTriePauseRequests map[string]string
proverTrieResumeRequests map[string]string
proverTrieRequestsMx sync.Mutex
lastKeyBundleAnnouncementFrame uint64
peerSeniority *peerSeniority
peerMap map[string]*peerInfo
uncooperativePeersMap map[string]*peerInfo
messageProcessorCh chan *pb.Message
frameMessageProcessorCh chan *pb.Message
txMessageProcessorCh chan *pb.Message
infoMessageProcessorCh chan *pb.Message
report *protobufs.SelfTestReport
}
@ -259,7 +258,9 @@ func NewDataClockConsensusEngine(
dataTimeReel: dataTimeReel,
peerInfoManager: peerInfoManager,
peerSeniority: newFromMap(peerSeniority),
messageProcessorCh: make(chan *pb.Message),
frameMessageProcessorCh: make(chan *pb.Message),
txMessageProcessorCh: make(chan *pb.Message),
infoMessageProcessorCh: make(chan *pb.Message),
config: config,
preMidnightMint: map[string]struct{}{},
}
@ -271,6 +272,8 @@ func NewDataClockConsensusEngine(
)
e.filter = filter
e.txFilter = append([]byte{0x00}, e.filter...)
e.infoFilter = append([]byte{0x00, 0x00}, e.filter...)
e.input = seed
e.provingKey = signer
e.provingKeyType = keyType
@ -299,10 +302,14 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
panic(err)
}
go e.runMessageHandler()
go e.runFrameMessageHandler()
go e.runTxMessageHandler()
go e.runInfoMessageHandler()
e.logger.Info("subscribing to pubsub messages")
e.pubSub.Subscribe(e.filter, e.handleMessage)
e.pubSub.Subscribe(e.filter, e.handleFrameMessage)
e.pubSub.Subscribe(e.txFilter, e.handleTxMessage)
e.pubSub.Subscribe(e.infoFilter, e.handleInfoMessage)
go func() {
server := grpc.NewServer(
grpc.MaxSendMsgSize(600*1024*1024),
@ -422,7 +429,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
zap.Uint64("frame_number", frame.FrameNumber),
)
if err := e.publishMessage(e.filter, list); err != nil {
if err := e.publishMessage(e.infoFilter, list); err != nil {
e.logger.Debug("error publishing message", zap.Error(err))
}
@ -435,7 +442,6 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
}()
go e.runLoop()
go e.rebroadcastLoop()
go func() {
time.Sleep(30 * time.Second)
e.logger.Info("checking for snapshots to play forward")
@ -579,6 +585,7 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
e.logger.Error("failed to reconnect", zap.Error(err))
}
}
clients[i] = client
continue
}
@ -589,7 +596,7 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
e.logger.Error("failed to reconnect", zap.Error(err))
continue
}
e.publishMessage(e.filter, &protobufs.TokenRequest{
e.publishMessage(e.txFilter, &protobufs.TokenRequest{
Request: &protobufs.TokenRequest_Mint{
Mint: &protobufs.MintCoinRequest{
Proofs: [][]byte{resp.Output},
@ -624,14 +631,18 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
panic(err)
}
e.publishMessage(e.filter, &protobufs.AnnounceProverPause{
Filter: e.filter,
FrameNumber: e.GetFrame().FrameNumber,
PublicKeySignatureEd448: &protobufs.Ed448Signature{
PublicKey: &protobufs.Ed448PublicKey{
KeyValue: e.pubSub.GetPublicKey(),
e.publishMessage(e.txFilter, &protobufs.TokenRequest{
Request: &protobufs.TokenRequest_Pause{
Pause: &protobufs.AnnounceProverPause{
Filter: e.filter,
FrameNumber: e.GetFrame().FrameNumber,
PublicKeySignatureEd448: &protobufs.Ed448Signature{
PublicKey: &protobufs.Ed448PublicKey{
KeyValue: e.pubSub.GetPublicKey(),
},
Signature: sig,
},
},
Signature: sig,
},
})
@ -777,8 +788,8 @@ func (e *DataClockConsensusEngine) createParallelDataClientsFromListAndIndex(
insecure.NewCredentials(),
),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(600*1024*1024),
grpc.MaxCallRecvMsgSize(600*1024*1024),
grpc.MaxCallSendMsgSize(10*1024*1024),
grpc.MaxCallRecvMsgSize(10*1024*1024),
),
)
if err != nil {
@ -973,13 +984,17 @@ func (e *DataClockConsensusEngine) announceProverJoin() {
panic(err)
}
e.publishMessage(e.filter, &protobufs.AnnounceProverJoin{
Filter: bytes.Repeat([]byte{0xff}, 32),
FrameNumber: head.FrameNumber,
PublicKeySignatureEd448: &protobufs.Ed448Signature{
Signature: sig,
PublicKey: &protobufs.Ed448PublicKey{
KeyValue: e.provingKeyBytes,
e.publishMessage(e.txFilter, &protobufs.TokenRequest{
Request: &protobufs.TokenRequest_Join{
Join: &protobufs.AnnounceProverJoin{
Filter: bytes.Repeat([]byte{0xff}, 32),
FrameNumber: head.FrameNumber,
PublicKeySignatureEd448: &protobufs.Ed448Signature{
Signature: sig,
PublicKey: &protobufs.Ed448PublicKey{
KeyValue: e.provingKeyBytes,
},
},
},
},
})

View File

@ -2,8 +2,6 @@ package data
import (
"bytes"
"crypto/rand"
"slices"
"time"
"go.uber.org/zap"
@ -58,317 +56,71 @@ func (e *DataClockConsensusEngine) runLoop() {
select {
case dataFrame := <-dataFrameCh:
e.logger.Info(
"current frame head",
zap.Uint64("frame_number", dataFrame.FrameNumber),
)
if !e.IsInProverTrie(e.provingKeyBytes) {
if latestFrame, err = e.collect(dataFrame); err != nil {
e.logger.Error("could not collect", zap.Error(err))
}
}
if latestFrame != nil &&
dataFrame.FrameNumber > latestFrame.FrameNumber {
latestFrame = dataFrame
}
if e.latestFrameReceived < latestFrame.FrameNumber {
e.latestFrameReceived = latestFrame.FrameNumber
}
trie := e.GetFrameProverTries()[0]
selBI, _ := dataFrame.GetSelector()
sel := make([]byte, 32)
sel = selBI.FillBytes(sel)
if bytes.Equal(
trie.FindNearest(sel).External.Key,
e.provingKeyAddress,
) {
var nextFrame *protobufs.ClockFrame
if nextFrame, err = e.prove(latestFrame); err != nil {
e.logger.Error("could not prove", zap.Error(err))
e.state = consensus.EngineStateCollecting
continue
}
e.proverTrieRequestsMx.Lock()
joinAddrs := tries.NewMinHeap[peerSeniorityItem]()
leaveAddrs := tries.NewMinHeap[peerSeniorityItem]()
for _, addr := range e.proverTrieJoinRequests {
if _, ok := (*e.peerSeniority)[addr]; !ok {
joinAddrs.Push(peerSeniorityItem{
addr: addr,
seniority: 0,
})
} else {
joinAddrs.Push((*e.peerSeniority)[addr])
}
}
for _, addr := range e.proverTrieLeaveRequests {
if _, ok := (*e.peerSeniority)[addr]; !ok {
leaveAddrs.Push(peerSeniorityItem{
addr: addr,
seniority: 0,
})
} else {
leaveAddrs.Push((*e.peerSeniority)[addr])
}
}
for _, addr := range e.proverTrieResumeRequests {
if _, ok := e.proverTriePauseRequests[addr]; ok {
delete(e.proverTriePauseRequests, addr)
}
}
joinReqs := make([]peerSeniorityItem, len(joinAddrs.All()))
copy(joinReqs, joinAddrs.All())
slices.Reverse(joinReqs)
leaveReqs := make([]peerSeniorityItem, len(leaveAddrs.All()))
copy(leaveReqs, leaveAddrs.All())
slices.Reverse(leaveReqs)
e.proverTrieJoinRequests = make(map[string]string)
e.proverTrieLeaveRequests = make(map[string]string)
e.proverTrieRequestsMx.Unlock()
e.frameProverTriesMx.Lock()
for _, addr := range joinReqs {
rings := len(e.frameProverTries)
last := e.frameProverTries[rings-1]
set := last.FindNearestAndApproximateNeighbors(make([]byte, 32))
if len(set) == 1024 {
e.frameProverTries = append(
e.frameProverTries,
&tries.RollingFrecencyCritbitTrie{},
)
last = e.frameProverTries[rings]
}
last.Add([]byte(addr.addr), nextFrame.FrameNumber)
}
for _, addr := range leaveReqs {
for _, t := range e.frameProverTries {
if bytes.Equal(
t.FindNearest([]byte(addr.addr)).External.Key,
[]byte(addr.addr),
) {
t.Remove([]byte(addr.addr))
break
}
}
}
e.frameProverTriesMx.Unlock()
e.dataTimeReel.Insert(nextFrame, true)
if err = e.publishProof(nextFrame); err != nil {
e.logger.Error("could not publish", zap.Error(err))
e.state = consensus.EngineStateCollecting
}
break
} else {
if !e.IsInProverTrie(e.provingKeyBytes) {
e.announceProverJoin()
}
}
latestFrame = e.processFrame(latestFrame, dataFrame)
case <-time.After(20 * time.Second):
dataFrame, err := e.dataTimeReel.Head()
if err != nil {
panic(err)
}
e.logger.Info(
"current frame head",
zap.Uint64("frame_number", dataFrame.FrameNumber),
)
if !e.IsInProverTrie(e.provingKeyBytes) {
if latestFrame, err = e.collect(dataFrame); err != nil {
e.logger.Error("could not collect", zap.Error(err))
}
}
if latestFrame == nil ||
latestFrame.FrameNumber < dataFrame.FrameNumber {
latestFrame, err = e.dataTimeReel.Head()
if err != nil {
panic(err)
}
}
if e.latestFrameReceived < latestFrame.FrameNumber {
e.latestFrameReceived = latestFrame.FrameNumber
}
trie := e.GetFrameProverTries()[0]
selBI, _ := dataFrame.GetSelector()
sel := make([]byte, 32)
sel = selBI.FillBytes(sel)
if bytes.Equal(
trie.FindNearest(sel).External.Key,
e.provingKeyAddress,
) {
if bytes.Equal(
trie.FindNearest(e.provingKeyAddress).External.Key,
e.provingKeyAddress,
) {
var nextFrame *protobufs.ClockFrame
if nextFrame, err = e.prove(latestFrame); err != nil {
e.logger.Error("could not prove", zap.Error(err))
e.state = consensus.EngineStateCollecting
continue
}
e.proverTrieRequestsMx.Lock()
joinAddrs := tries.NewMinHeap[peerSeniorityItem]()
leaveAddrs := tries.NewMinHeap[peerSeniorityItem]()
for _, addr := range e.proverTrieJoinRequests {
if _, ok := (*e.peerSeniority)[addr]; !ok {
joinAddrs.Push(peerSeniorityItem{
addr: addr,
seniority: 0,
})
} else {
joinAddrs.Push((*e.peerSeniority)[addr])
}
}
for _, addr := range e.proverTrieLeaveRequests {
if _, ok := (*e.peerSeniority)[addr]; !ok {
leaveAddrs.Push(peerSeniorityItem{
addr: addr,
seniority: 0,
})
} else {
leaveAddrs.Push((*e.peerSeniority)[addr])
}
}
for _, addr := range e.proverTrieResumeRequests {
if _, ok := e.proverTriePauseRequests[addr]; ok {
delete(e.proverTriePauseRequests, addr)
}
}
joinReqs := make([]peerSeniorityItem, len(joinAddrs.All()))
copy(joinReqs, joinAddrs.All())
slices.Reverse(joinReqs)
leaveReqs := make([]peerSeniorityItem, len(leaveAddrs.All()))
copy(leaveReqs, leaveAddrs.All())
slices.Reverse(leaveReqs)
e.proverTrieJoinRequests = make(map[string]string)
e.proverTrieLeaveRequests = make(map[string]string)
e.proverTrieRequestsMx.Unlock()
e.frameProverTriesMx.Lock()
for _, addr := range joinReqs {
rings := len(e.frameProverTries)
last := e.frameProverTries[rings-1]
set := last.FindNearestAndApproximateNeighbors(make([]byte, 32))
if len(set) == 8 {
e.frameProverTries = append(
e.frameProverTries,
&tries.RollingFrecencyCritbitTrie{},
)
last = e.frameProverTries[rings]
}
last.Add([]byte(addr.addr), nextFrame.FrameNumber)
}
for _, addr := range leaveReqs {
for _, t := range e.frameProverTries {
if bytes.Equal(
t.FindNearest([]byte(addr.addr)).External.Key,
[]byte(addr.addr),
) {
t.Remove([]byte(addr.addr))
break
}
}
}
e.frameProverTriesMx.Unlock()
e.dataTimeReel.Insert(nextFrame, true)
if err = e.publishProof(nextFrame); err != nil {
e.logger.Error("could not publish", zap.Error(err))
e.state = consensus.EngineStateCollecting
}
break
}
}
latestFrame = e.processFrame(latestFrame, dataFrame)
}
}
}
}
func (e *DataClockConsensusEngine) rebroadcastLoop() {
if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
time.Sleep(120 * time.Second)
for {
_, err := e.dataTimeReel.Head()
if err != nil {
e.logger.Info("no frames to rebroadcast yet, waiting...")
time.Sleep(10 * time.Second)
continue
}
max, _, err := e.clockStore.GetLatestDataClockFrame(e.filter)
frames := []*protobufs.ClockFrame{}
sent := false
for i := uint64(1); i < max.FrameNumber; i++ {
if e.state == consensus.EngineStateStopped ||
e.state == consensus.EngineStateStopping {
e.logger.Info("shutting down rebroadcaster")
return
}
frame, _, err := e.clockStore.GetDataClockFrame(e.filter, i, false)
if err != nil {
frames = []*protobufs.ClockFrame{}
e.logger.Error("error while iterating", zap.Error(err))
break
}
if frame == nil {
frames = []*protobufs.ClockFrame{}
e.logger.Error("too far ahead", zap.Error(err))
break
}
frames = append(frames, frame)
if i%50 == 0 {
e.logger.Info(
"rebroadcasting frames",
zap.Uint64("from", frames[0].FrameNumber),
zap.Uint64("to", frames[len(frames)-1].FrameNumber),
)
e.publishMessage(e.filter, &protobufs.FrameRebroadcast{
From: frames[0].FrameNumber,
To: frames[len(frames)-1].FrameNumber,
ClockFrames: frames,
})
time.Sleep(60 * time.Second)
sent = true
frames = []*protobufs.ClockFrame{}
}
}
if !sent && len(frames) != 0 {
e.logger.Info(
"rebroadcasting frames",
zap.Uint64("from", frames[0].FrameNumber),
zap.Uint64("to", frames[len(frames)-1].FrameNumber),
)
b := make([]byte, 24)
rand.Read(b)
e.publishMessage(e.filter, &protobufs.FrameRebroadcast{
From: frames[0].FrameNumber,
To: frames[len(frames)-1].FrameNumber,
ClockFrames: frames,
Random: b,
})
time.Sleep(60 * time.Second)
}
func (e *DataClockConsensusEngine) processFrame(
latestFrame *protobufs.ClockFrame,
dataFrame *protobufs.ClockFrame,
) *protobufs.ClockFrame {
e.logger.Info(
"current frame head",
zap.Uint64("frame_number", dataFrame.FrameNumber),
)
var err error
if !e.IsInProverTrie(e.provingKeyBytes) {
if latestFrame, err = e.collect(dataFrame); err != nil {
e.logger.Error("could not collect", zap.Error(err))
}
}
if latestFrame != nil &&
dataFrame.FrameNumber > latestFrame.FrameNumber {
latestFrame = dataFrame
}
if e.latestFrameReceived < latestFrame.FrameNumber {
e.latestFrameReceived = latestFrame.FrameNumber
}
trie := e.GetFrameProverTries()[0]
selBI, _ := dataFrame.GetSelector()
sel := make([]byte, 32)
sel = selBI.FillBytes(sel)
if bytes.Equal(
trie.FindNearest(sel).External.Key,
e.provingKeyAddress,
) {
var nextFrame *protobufs.ClockFrame
if nextFrame, err = e.prove(latestFrame); err != nil {
e.logger.Error("could not prove", zap.Error(err))
e.state = consensus.EngineStateCollecting
return latestFrame
}
e.dataTimeReel.Insert(nextFrame, true)
if err = e.publishProof(nextFrame); err != nil {
e.logger.Error("could not publish", zap.Error(err))
e.state = consensus.EngineStateCollecting
}
return nextFrame
} else {
if !e.IsInProverTrie(e.provingKeyBytes) {
e.announceProverJoin()
}
return dataFrame
}
}

View File

@ -17,11 +17,47 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
func (e *DataClockConsensusEngine) runMessageHandler() {
func (e *DataClockConsensusEngine) runFrameMessageHandler() {
for {
select {
case message := <-e.messageProcessorCh:
e.logger.Debug("handling message")
case message := <-e.frameMessageProcessorCh:
e.logger.Debug("handling frame message")
msg := &protobufs.Message{}
if err := proto.Unmarshal(message.Data, msg); err != nil {
e.logger.Debug("bad message")
continue
}
any := &anypb.Any{}
if err := proto.Unmarshal(msg.Payload, any); err != nil {
e.logger.Error("error while unmarshaling", zap.Error(err))
continue
}
e.logger.Debug("message type", zap.String("type", any.TypeUrl))
go func() {
switch any.TypeUrl {
case protobufs.ClockFrameType:
if err := e.handleClockFrameData(
message.From,
msg.Address,
any,
false,
); err != nil {
return
}
}
}()
}
}
}
func (e *DataClockConsensusEngine) runTxMessageHandler() {
for {
select {
case message := <-e.txMessageProcessorCh:
e.logger.Debug("handling tx message")
msg := &protobufs.Message{}
if err := proto.Unmarshal(message.Data, msg); err != nil {
@ -86,26 +122,31 @@ func (e *DataClockConsensusEngine) runMessageHandler() {
continue
}
e.logger.Debug("message type", zap.String("type", any.TypeUrl))
}
}
}
func (e *DataClockConsensusEngine) runInfoMessageHandler() {
for {
select {
case message := <-e.infoMessageProcessorCh:
e.logger.Debug("handling info message")
msg := &protobufs.Message{}
if err := proto.Unmarshal(message.Data, msg); err != nil {
e.logger.Debug("bad message")
continue
}
any := &anypb.Any{}
if err := proto.Unmarshal(msg.Payload, any); err != nil {
e.logger.Error("error while unmarshaling", zap.Error(err))
continue
}
e.logger.Debug("message type", zap.String("type", any.TypeUrl))
go func() {
switch any.TypeUrl {
case protobufs.FrameRebroadcastType:
if err := e.handleRebroadcast(
message.From,
msg.Address,
any,
); err != nil {
return
}
case protobufs.ClockFrameType:
if err := e.handleClockFrameData(
message.From,
msg.Address,
any,
false,
); err != nil {
return
}
case protobufs.DataPeerListAnnounceType:
if err := e.handleDataPeerListAnnounce(
message.From,
@ -114,101 +155,12 @@ func (e *DataClockConsensusEngine) runMessageHandler() {
); err != nil {
return
}
case protobufs.AnnounceProverJoinType:
if err := e.handleDataAnnounceProverJoin(
message.From,
msg.Address,
any,
); err != nil {
return
}
case protobufs.AnnounceProverLeaveType:
if !e.IsInProverTrie(message.From) {
return
}
if err := e.handleDataAnnounceProverLeave(
message.From,
msg.Address,
any,
); err != nil {
return
}
case protobufs.AnnounceProverPauseType:
if !e.IsInProverTrie(message.From) {
return
}
// Limit score to penalize frequent restarts
e.pubSub.AddPeerScore(message.From, -100)
if err := e.handleDataAnnounceProverPause(
message.From,
msg.Address,
any,
); err != nil {
return
}
case protobufs.AnnounceProverResumeType:
if err := e.handleDataAnnounceProverResume(
message.From,
msg.Address,
any,
); err != nil {
return
}
}
}()
}
}
}
func (e *DataClockConsensusEngine) handleRebroadcast(
peerID []byte,
address []byte,
any *anypb.Any,
) error {
if bytes.Equal(peerID, e.pubSub.GetPeerID()) {
return nil
}
frames := &protobufs.FrameRebroadcast{}
if err := any.UnmarshalTo(frames); err != nil {
return errors.Wrap(err, "handle clock frame data")
}
head, err := e.dataTimeReel.Head()
if err != nil {
return nil
}
e.logger.Debug(
"received rebroadcast",
zap.Uint64("from", frames.From),
zap.Uint64("to", frames.To),
)
if head.FrameNumber+1 < frames.From {
return nil
}
if head.FrameNumber > frames.To {
return nil
}
for _, frame := range frames.ClockFrames {
if head.FrameNumber >= frame.FrameNumber {
continue
}
e.logger.Info("receiving synchronization data")
if err := e.handleClockFrame(peerID, address, frame); err != nil {
// if they're sending invalid clock frames, nuke them.
e.pubSub.AddPeerScore(peerID, -100000)
return errors.Wrap(err, "handle rebroadcast")
}
}
return nil
}
func (e *DataClockConsensusEngine) handleClockFrame(
peerID []byte,
address []byte,
@ -404,197 +356,6 @@ func (e *DataClockConsensusEngine) handleDataPeerListAnnounce(
return nil
}
func (e *DataClockConsensusEngine) getAddressFromSignature(
sig *protobufs.Ed448Signature,
) ([]byte, error) {
if sig.PublicKey == nil || sig.PublicKey.KeyValue == nil {
return nil, errors.New("invalid data")
}
addrBI, err := poseidon.HashBytes(sig.PublicKey.KeyValue)
if err != nil {
return nil, errors.Wrap(err, "get address from signature")
}
return addrBI.FillBytes(make([]byte, 32)), nil
}
func (e *DataClockConsensusEngine) handleDataAnnounceProverJoin(
peerID []byte,
address []byte,
any *anypb.Any,
) error {
if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
announce := &protobufs.AnnounceProverJoin{}
if err := any.UnmarshalTo(announce); err != nil {
return errors.Wrap(err, "handle data announce prover join")
}
if announce.PublicKeySignatureEd448 == nil || announce.Filter == nil {
return errors.Wrap(
errors.New("invalid data"),
"handle data announce prover join",
)
}
address, err := e.getAddressFromSignature(announce.PublicKeySignatureEd448)
if err != nil {
return errors.Wrap(err, "handle data announce prover join")
}
msg := []byte("join")
msg = binary.BigEndian.AppendUint64(msg, announce.FrameNumber)
msg = append(msg, announce.Filter...)
if err := announce.GetPublicKeySignatureEd448().Verify(msg); err != nil {
return errors.Wrap(err, "handle data announce prover join")
}
e.proverTrieRequestsMx.Lock()
if len(announce.Filter) != len(e.filter) {
return errors.Wrap(
errors.New("filter width mismatch"),
"handle data announce prover join",
)
}
e.proverTrieJoinRequests[string(address)] = string(announce.Filter)
e.proverTrieRequestsMx.Unlock()
}
return nil
}
func (e *DataClockConsensusEngine) handleDataAnnounceProverLeave(
peerID []byte,
address []byte,
any *anypb.Any,
) error {
if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
announce := &protobufs.AnnounceProverLeave{}
if err := any.UnmarshalTo(announce); err != nil {
return errors.Wrap(err, "handle data announce prover leave")
}
if announce.PublicKeySignatureEd448 == nil || announce.Filter == nil {
return errors.Wrap(
errors.New("invalid data"),
"handle data announce prover leave",
)
}
e.proverTrieRequestsMx.Lock()
if len(announce.Filter) != len(e.filter) {
return errors.Wrap(
errors.New("filter width mismatch"),
"handle data announce prover leave",
)
}
msg := []byte("leave")
msg = binary.BigEndian.AppendUint64(msg, announce.FrameNumber)
msg = append(msg, announce.Filter...)
if err := announce.GetPublicKeySignatureEd448().Verify(msg); err != nil {
return errors.Wrap(err, "handle data announce prover leave")
}
address, err := e.getAddressFromSignature(announce.PublicKeySignatureEd448)
if err != nil {
return errors.Wrap(err, "handle data announce prover leave")
}
e.proverTrieLeaveRequests[string(address)] = string(announce.Filter)
e.proverTrieRequestsMx.Unlock()
}
return nil
}
func (e *DataClockConsensusEngine) handleDataAnnounceProverPause(
peerID []byte,
address []byte,
any *anypb.Any,
) error {
if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
announce := &protobufs.AnnounceProverPause{}
if err := any.UnmarshalTo(announce); err != nil {
return errors.Wrap(err, "handle data announce prover pause")
}
if announce.PublicKeySignatureEd448 == nil || announce.Filter == nil {
return errors.Wrap(
errors.New("invalid data"),
"handle data announce prover leave",
)
}
e.proverTrieRequestsMx.Lock()
if len(announce.Filter) != len(e.filter) {
return errors.Wrap(
errors.New("filter width mismatch"),
"handle data announce prover pause",
)
}
msg := []byte("pause")
msg = binary.BigEndian.AppendUint64(msg, announce.FrameNumber)
msg = append(msg, announce.Filter...)
if err := announce.GetPublicKeySignatureEd448().Verify(msg); err != nil {
return errors.Wrap(err, "handle data announce prover pause")
}
address, err := e.getAddressFromSignature(announce.PublicKeySignatureEd448)
if err != nil {
return errors.Wrap(err, "handle data announce prover pause")
}
e.proverTriePauseRequests[string(address)] = string(announce.Filter)
e.proverTrieRequestsMx.Unlock()
}
return nil
}
func (e *DataClockConsensusEngine) handleDataAnnounceProverResume(
peerID []byte,
address []byte,
any *anypb.Any,
) error {
if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
announce := &protobufs.AnnounceProverResume{}
if err := any.UnmarshalTo(announce); err != nil {
return errors.Wrap(err, "handle data announce prover resume")
}
if announce.PublicKeySignatureEd448 == nil || announce.Filter == nil {
return errors.Wrap(
errors.New("invalid data"),
"handle data announce prover resume",
)
}
e.proverTrieRequestsMx.Lock()
if len(announce.Filter) != len(e.filter) {
return errors.Wrap(
errors.New("filter width mismatch"),
"handle data announce prover resume",
)
}
address, err := e.getAddressFromSignature(announce.PublicKeySignatureEd448)
if err != nil {
return errors.Wrap(err, "handle data announce prover resume")
}
msg := []byte("resume")
msg = binary.BigEndian.AppendUint64(msg, announce.FrameNumber)
msg = append(msg, announce.Filter...)
if err := announce.GetPublicKeySignatureEd448().Verify(msg); err != nil {
return errors.Wrap(err, "handle data announce prover resume")
}
e.proverTrieResumeRequests[string(address)] = string(announce.Filter)
e.proverTrieRequestsMx.Unlock()
}
return nil
}
func (e *DataClockConsensusEngine) handleTokenRequest(
transition *protobufs.TokenRequest,
) error {
@ -649,6 +410,62 @@ func (e *DataClockConsensusEngine) handleTokenRequest(
}
}
}
case *protobufs.TokenRequest_Announce:
switch r := transition.Request.(type) {
case *protobufs.TokenRequest_Announce:
checkannounce:
for i := range t.Announce.GetPublicKeySignaturesEd448() {
for j := range r.Announce.GetPublicKeySignaturesEd448() {
if bytes.Equal(
t.Announce.GetPublicKeySignaturesEd448()[i].PublicKey.KeyValue,
r.Announce.GetPublicKeySignaturesEd448()[j].PublicKey.KeyValue,
) {
found = true
break checkannounce
}
}
}
}
case *protobufs.TokenRequest_Join:
switch r := transition.Request.(type) {
case *protobufs.TokenRequest_Join:
if bytes.Equal(
t.Join.GetPublicKeySignatureEd448().PublicKey.KeyValue,
r.Join.GetPublicKeySignatureEd448().PublicKey.KeyValue,
) {
found = true
}
}
case *protobufs.TokenRequest_Leave:
switch r := transition.Request.(type) {
case *protobufs.TokenRequest_Leave:
if bytes.Equal(
t.Leave.GetPublicKeySignatureEd448().PublicKey.KeyValue,
r.Leave.GetPublicKeySignatureEd448().PublicKey.KeyValue,
) {
found = true
}
}
case *protobufs.TokenRequest_Pause:
switch r := transition.Request.(type) {
case *protobufs.TokenRequest_Pause:
if bytes.Equal(
t.Pause.GetPublicKeySignatureEd448().PublicKey.KeyValue,
r.Pause.GetPublicKeySignatureEd448().PublicKey.KeyValue,
) {
found = true
}
}
case *protobufs.TokenRequest_Resume:
switch r := transition.Request.(type) {
case *protobufs.TokenRequest_Resume:
if bytes.Equal(
t.Resume.GetPublicKeySignatureEd448().PublicKey.KeyValue,
r.Resume.GetPublicKeySignatureEd448().PublicKey.KeyValue,
) {
found = true
}
}
}
}

View File

@ -513,7 +513,7 @@ func (e *DataClockConsensusEngine) handleMint(
txn.Abort()
return nil, errors.Wrap(err, "handle mint")
}
err = e.insertMessage(
err = e.insertTxMessage(
e.filter,
&protobufs.TokenRequest{
Request: &protobufs.TokenRequest_Mint{

View File

@ -130,7 +130,9 @@ func TestHandlePreMidnightMint(t *testing.T) {
dataTimeReel: &qtime.DataTimeReel{},
peerInfoManager: nil,
peerSeniority: newFromMap(map[string]uint64{}),
messageProcessorCh: make(chan *pb.Message),
frameMessageProcessorCh: make(chan *pb.Message),
txMessageProcessorCh: make(chan *pb.Message),
infoMessageProcessorCh: make(chan *pb.Message),
config: nil,
preMidnightMint: map[string]struct{}{},
}
@ -626,7 +628,7 @@ func TestHandlePreMidnightMint(t *testing.T) {
}
}
req := <-d.messageProcessorCh
req := <-d.txMessageProcessorCh
assert.NotNil(t, req)
message := &protobufs.Message{}

View File

@ -38,7 +38,10 @@ type DataTimeReel struct {
logger *zap.Logger
clockStore store.ClockStore
frameProver crypto.FrameProver
exec func(txn store.Transaction, frame *protobufs.ClockFrame) error
exec func(txn store.Transaction, frame *protobufs.ClockFrame) (
[]*tries.RollingFrecencyCritbitTrie,
error,
)
origin []byte
initialInclusionProof *crypto.InclusionAggregateProof
@ -62,7 +65,10 @@ func NewDataTimeReel(
clockStore store.ClockStore,
engineConfig *config.EngineConfig,
frameProver crypto.FrameProver,
exec func(txn store.Transaction, frame *protobufs.ClockFrame) error,
exec func(txn store.Transaction, frame *protobufs.ClockFrame) (
[]*tries.RollingFrecencyCritbitTrie,
error,
),
origin []byte,
initialInclusionProof *crypto.InclusionAggregateProof,
initialProverKeys [][]byte,
@ -607,26 +613,29 @@ func (d *DataTimeReel) setHead(frame *protobufs.ClockFrame, distance *big.Int) {
panic(err)
}
var tries []*tries.RollingFrecencyCritbitTrie
if tries, err = d.exec(txn, frame); err != nil {
d.logger.Debug("invalid frame execution, unwinding", zap.Error(err))
txn.Abort()
return
}
if err := d.clockStore.CommitDataClockFrame(
d.filter,
frame.FrameNumber,
selector.FillBytes(make([]byte, 32)),
d.proverTries,
tries,
txn,
false,
); err != nil {
panic(err)
}
if err = d.exec(txn, frame); err != nil {
d.logger.Debug("invalid frame execution, unwinding", zap.Error(err))
txn.Abort()
return
}
if err = txn.Commit(); err != nil {
panic(err)
}
d.proverTries = tries
d.head = frame
d.headDistance = distance

View File

@ -169,7 +169,12 @@ func TestDataTimeReel(t *testing.T) {
Difficulty: 10,
},
prover,
func(txn store.Transaction, frame *protobufs.ClockFrame) error { return nil },
func(txn store.Transaction, frame *protobufs.ClockFrame) (
[]*tries.RollingFrecencyCritbitTrie,
error,
) {
return []*tries.RollingFrecencyCritbitTrie{proverTrie}, nil
},
bytes.Repeat([]byte{0x00}, 516),
&qcrypto.InclusionAggregateProof{
InclusionCommitments: []*qcrypto.InclusionCommitment{},

View File

@ -132,6 +132,102 @@ func (a *TokenApplication) ApplyTransitions(
finalizedTransitions.Requests,
transition,
)
case *protobufs.TokenRequest_Join:
success, err := a.handleDataAnnounceProverJoin(
currentFrameNumber,
lockMap,
t.Join,
)
if err != nil {
if !skipFailures {
return nil, nil, nil, errors.Wrap(
err,
"apply transitions",
)
}
failedTransitions.Requests = append(
failedTransitions.Requests,
transition,
)
break req
}
outputs.Outputs = append(outputs.Outputs, success...)
finalizedTransitions.Requests = append(
finalizedTransitions.Requests,
transition,
)
case *protobufs.TokenRequest_Leave:
success, err := a.handleDataAnnounceProverLeave(
currentFrameNumber,
lockMap,
t.Leave,
)
if err != nil {
if !skipFailures {
return nil, nil, nil, errors.Wrap(
err,
"apply transitions",
)
}
failedTransitions.Requests = append(
failedTransitions.Requests,
transition,
)
break req
}
outputs.Outputs = append(outputs.Outputs, success...)
finalizedTransitions.Requests = append(
finalizedTransitions.Requests,
transition,
)
case *protobufs.TokenRequest_Resume:
success, err := a.handleDataAnnounceProverResume(
currentFrameNumber,
lockMap,
t.Resume,
)
if err != nil {
if !skipFailures {
return nil, nil, nil, errors.Wrap(
err,
"apply transitions",
)
}
failedTransitions.Requests = append(
failedTransitions.Requests,
transition,
)
break req
}
outputs.Outputs = append(outputs.Outputs, success...)
finalizedTransitions.Requests = append(
finalizedTransitions.Requests,
transition,
)
case *protobufs.TokenRequest_Pause:
success, err := a.handleDataAnnounceProverPause(
currentFrameNumber,
lockMap,
t.Pause,
)
if err != nil {
if !skipFailures {
return nil, nil, nil, errors.Wrap(
err,
"apply transitions",
)
}
failedTransitions.Requests = append(
failedTransitions.Requests,
transition,
)
break req
}
outputs.Outputs = append(outputs.Outputs, success...)
finalizedTransitions.Requests = append(
finalizedTransitions.Requests,
transition,
)
case *protobufs.TokenRequest_Merge:
success, err := a.handleMerge(currentFrameNumber, lockMap, t.Merge)
if err != nil {

View File

@ -25,7 +25,7 @@ func (a *TokenApplication) handleMerge(
}
addresses := [][]byte{}
for _, c := range t.Coins {
if c.Address == nil {
if c.Address == nil || len(c.Address) != 32 {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle merge")
}

View File

@ -121,10 +121,11 @@ func (a *TokenApplication) handleMint(
}
ring := -1
addrBytes := addr.FillBytes(make([]byte, 32))
for i, t := range a.Tries {
proverSet := int64((len(a.Tries) - 1) * 1024)
for i, t := range a.Tries[1:] {
n := t.FindNearest(addrBytes)
if n != nil && bytes.Equal(n.External.Key, addrBytes) {
ring = i
ring = i - 1
}
}
if ring == -1 {
@ -181,9 +182,11 @@ func (a *TokenApplication) handleMint(
ringFactor := big.NewInt(2)
ringFactor.Exp(ringFactor, big.NewInt(int64(ring)), nil)
storage := big.NewInt(int64(1024 / (256 / scale)))
storage := big.NewInt(int64(512))
unitFactor := big.NewInt(8000000000)
storage.Mul(storage, unitFactor)
storage.Quo(storage, big.NewInt(proverSet))
storage.Quo(storage, ringFactor)
outputs = append(

View File

@ -20,6 +20,10 @@ func (a *TokenApplication) handleAnnounce(
return nil, errors.Wrap(ErrInvalidStateTransition, "handle announce")
}
for i, p := range t.PublicKeySignaturesEd448 {
if _, touched := lockMap[string(p.PublicKey.KeyValue)]; touched {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle announce")
}
if p.PublicKey == nil || p.Signature == nil ||
p.PublicKey.KeyValue == nil {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle announce")
@ -40,6 +44,10 @@ func (a *TokenApplication) handleAnnounce(
return nil, errors.Wrap(ErrInvalidStateTransition, "handle announce")
}
for _, p := range t.PublicKeySignaturesEd448 {
lockMap[string(p.PublicKey.KeyValue)] = struct{}{}
}
outputs := []*protobufs.TokenOutput{}
return outputs, nil

View File

@ -0,0 +1,79 @@
package application
import (
"encoding/binary"
"github.com/iden3/go-iden3-crypto/poseidon"
"github.com/pkg/errors"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
func (a *TokenApplication) getAddressFromSignature(
sig *protobufs.Ed448Signature,
) ([]byte, error) {
if sig.PublicKey == nil || sig.PublicKey.KeyValue == nil {
return nil, errors.New("invalid data")
}
addrBI, err := poseidon.HashBytes(sig.PublicKey.KeyValue)
if err != nil {
return nil, errors.Wrap(err, "get address from signature")
}
return addrBI.FillBytes(make([]byte, 32)), nil
}
func (a *TokenApplication) handleDataAnnounceProverJoin(
currentFrameNumber uint64,
lockMap map[string]struct{},
t *protobufs.AnnounceProverJoin,
) (
[]*protobufs.TokenOutput,
error,
) {
payload := []byte("join")
if t == nil || t.PublicKeySignatureEd448 == nil {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle join")
}
if t.PublicKeySignatureEd448.PublicKey == nil ||
t.PublicKeySignatureEd448.Signature == nil ||
t.PublicKeySignatureEd448.PublicKey.KeyValue == nil ||
t.Filter == nil || len(t.Filter) != 32 ||
t.FrameNumber < currentFrameNumber-1 || t.FrameNumber > currentFrameNumber {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle join")
}
if _, touched := lockMap[string(
t.PublicKeySignatureEd448.PublicKey.KeyValue,
)]; touched {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle join")
}
payload = binary.BigEndian.AppendUint64(payload, t.FrameNumber)
payload = append(payload, t.Filter...)
if err := t.PublicKeySignatureEd448.Verify(
t.PublicKeySignatureEd448.PublicKey.KeyValue,
); err != nil {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle join")
}
address, err := a.getAddressFromSignature(t.PublicKeySignatureEd448)
if err != nil {
return nil, errors.Wrap(err, "handle join")
}
for _, t := range a.Tries {
if t.Contains(address) {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle join")
}
}
lockMap[string(t.PublicKeySignatureEd448.PublicKey.KeyValue)] = struct{}{}
return []*protobufs.TokenOutput{
&protobufs.TokenOutput{
Output: &protobufs.TokenOutput_Join{
Join: t,
},
},
}, nil
}

View File

@ -0,0 +1,70 @@
package application
import (
"encoding/binary"
"github.com/pkg/errors"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
func (a *TokenApplication) handleDataAnnounceProverLeave(
currentFrameNumber uint64,
lockMap map[string]struct{},
t *protobufs.AnnounceProverLeave,
) (
[]*protobufs.TokenOutput,
error,
) {
payload := []byte("leave")
if t == nil || t.PublicKeySignatureEd448 == nil {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle leave")
}
if t.PublicKeySignatureEd448.PublicKey == nil ||
t.PublicKeySignatureEd448.Signature == nil ||
t.PublicKeySignatureEd448.PublicKey.KeyValue == nil ||
t.Filter == nil || len(t.Filter) != 32 ||
t.FrameNumber < currentFrameNumber-1 || t.FrameNumber > currentFrameNumber {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle leave")
}
if _, touched := lockMap[string(
t.PublicKeySignatureEd448.PublicKey.KeyValue,
)]; touched {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle leave")
}
payload = binary.BigEndian.AppendUint64(payload, t.FrameNumber)
payload = append(payload, t.Filter...)
if err := t.PublicKeySignatureEd448.Verify(
t.PublicKeySignatureEd448.PublicKey.KeyValue,
); err != nil {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle leave")
}
address, err := a.getAddressFromSignature(t.PublicKeySignatureEd448)
if err != nil {
return nil, errors.Wrap(err, "handle leave")
}
inTries := false
for _, t := range a.Tries {
inTries = inTries || t.Contains(address)
}
if !inTries {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle leave")
}
lockMap[string(t.PublicKeySignatureEd448.PublicKey.KeyValue)] = struct{}{}
return []*protobufs.TokenOutput{
&protobufs.TokenOutput{
Output: &protobufs.TokenOutput_Leave{
Leave: t,
},
},
}, nil
}

View File

@ -0,0 +1,67 @@
package application
import (
"encoding/binary"
"github.com/pkg/errors"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
func (a *TokenApplication) handleDataAnnounceProverPause(
currentFrameNumber uint64,
lockMap map[string]struct{},
t *protobufs.AnnounceProverPause,
) (
[]*protobufs.TokenOutput,
error,
) {
payload := []byte("pause")
if t == nil || t.PublicKeySignatureEd448 == nil {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle pause")
}
if t.PublicKeySignatureEd448.PublicKey == nil ||
t.PublicKeySignatureEd448.Signature == nil ||
t.PublicKeySignatureEd448.PublicKey.KeyValue == nil ||
t.Filter == nil || len(t.Filter) != 32 ||
t.FrameNumber < currentFrameNumber-1 || t.FrameNumber > currentFrameNumber {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle pause")
}
if _, touched := lockMap[string(
t.PublicKeySignatureEd448.PublicKey.KeyValue,
)]; touched {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle pause")
}
payload = binary.BigEndian.AppendUint64(payload, t.FrameNumber)
payload = append(payload, t.Filter...)
if err := t.PublicKeySignatureEd448.Verify(
t.PublicKeySignatureEd448.PublicKey.KeyValue,
); err != nil {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle pause")
}
address, err := a.getAddressFromSignature(t.PublicKeySignatureEd448)
if err != nil {
return nil, errors.Wrap(err, "handle pause")
}
inTries := false
for _, t := range a.Tries {
inTries = inTries || t.Contains(address)
}
if !inTries {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle pause")
}
lockMap[string(t.PublicKeySignatureEd448.PublicKey.KeyValue)] = struct{}{}
return []*protobufs.TokenOutput{
&protobufs.TokenOutput{
Output: &protobufs.TokenOutput_Pause{
Pause: t,
},
},
}, nil
}

View File

@ -0,0 +1,70 @@
package application
import (
"encoding/binary"
"github.com/pkg/errors"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
func (a *TokenApplication) handleDataAnnounceProverResume(
currentFrameNumber uint64,
lockMap map[string]struct{},
t *protobufs.AnnounceProverResume,
) (
[]*protobufs.TokenOutput,
error,
) {
payload := []byte("resume")
if t == nil || t.PublicKeySignatureEd448 == nil {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle resume")
}
if t.PublicKeySignatureEd448.PublicKey == nil ||
t.PublicKeySignatureEd448.Signature == nil ||
t.PublicKeySignatureEd448.PublicKey.KeyValue == nil ||
t.Filter == nil || len(t.Filter) != 32 ||
t.FrameNumber < currentFrameNumber-1 || t.FrameNumber > currentFrameNumber {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle resume")
}
if _, touched := lockMap[string(
t.PublicKeySignatureEd448.PublicKey.KeyValue,
)]; touched {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle resume")
}
payload = binary.BigEndian.AppendUint64(payload, t.FrameNumber)
payload = append(payload, t.Filter...)
if err := t.PublicKeySignatureEd448.Verify(
t.PublicKeySignatureEd448.PublicKey.KeyValue,
); err != nil {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle resume")
}
address, err := a.getAddressFromSignature(t.PublicKeySignatureEd448)
if err != nil {
return nil, errors.Wrap(err, "handle resume")
}
inTries := false
for _, t := range a.Tries {
inTries = inTries || t.Contains(address)
}
if !inTries {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle resume")
}
lockMap[string(t.PublicKeySignatureEd448.PublicKey.KeyValue)] = struct{}{}
return []*protobufs.TokenOutput{
&protobufs.TokenOutput{
Output: &protobufs.TokenOutput_Resume{
Resume: t,
},
},
}, nil
}

View File

@ -19,7 +19,8 @@ func (a *TokenApplication) handleSplit(
newCoins := []*protobufs.Coin{}
newAmounts := []*big.Int{}
payload := []byte{}
if t.Signature == nil || t.OfCoin == nil || t.OfCoin.Address == nil {
if t.Signature == nil || t.OfCoin == nil || t.OfCoin.Address == nil ||
len(t.OfCoin.Address) != 32 {
return nil, errors.Wrap(ErrInvalidStateTransition, "handle split")
}
coin, err := a.CoinStore.GetCoinByAddress(nil, t.OfCoin.Address)

View File

@ -3,7 +3,10 @@ package token
import (
"bytes"
"crypto"
"encoding/binary"
"encoding/hex"
"math/big"
"slices"
"strings"
"sync"
gotime "time"
@ -25,8 +28,31 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
"source.quilibrium.com/quilibrium/monorepo/node/store"
"source.quilibrium.com/quilibrium/monorepo/node/tries"
)
type peerSeniorityItem struct {
seniority uint64
addr string
}
type peerSeniority map[string]peerSeniorityItem
func newFromMap(m map[string]uint64) *peerSeniority {
s := &peerSeniority{}
for k, v := range m {
(*s)[k] = peerSeniorityItem{
seniority: v,
addr: k,
}
}
return s
}
func (p peerSeniorityItem) Priority() *big.Int {
return big.NewInt(int64(p.seniority))
}
type TokenExecutionEngine struct {
logger *zap.Logger
clock *data.DataClockConsensusEngine
@ -47,7 +73,7 @@ type TokenExecutionEngine struct {
alreadyPublishedShare bool
intrinsicFilter []byte
frameProver qcrypto.FrameProver
peerSeniority map[string]uint64
peerSeniority *peerSeniority
}
func NewTokenExecutionEngine(
@ -136,7 +162,7 @@ func NewTokenExecutionEngine(
peerChannels: map[string]*p2p.PublicP2PChannel{},
alreadyPublishedShare: false,
intrinsicFilter: intrinsicFilter,
peerSeniority: peerSeniority,
peerSeniority: newFromMap(peerSeniority),
}
dataTimeReel := time.NewDataTimeReel(
@ -145,15 +171,19 @@ func NewTokenExecutionEngine(
clockStore,
cfg.Engine,
frameProver,
func(txn store.Transaction, frame *protobufs.ClockFrame) error {
func(txn store.Transaction, frame *protobufs.ClockFrame) (
[]*tries.RollingFrecencyCritbitTrie,
error,
) {
if err := e.VerifyExecution(frame); err != nil {
return err
return nil, err
}
if err := e.ProcessFrame(txn, frame); err != nil {
return err
var tries []*tries.RollingFrecencyCritbitTrie
if tries, err = e.ProcessFrame(txn, frame); err != nil {
return nil, err
}
return nil
return tries, nil
},
origin,
inclusionProof,
@ -275,7 +305,7 @@ func NewTokenExecutionEngine(
// need to wait for peering
gotime.Sleep(30 * gotime.Second)
e.publishMessage(intrinsicFilter, req)
e.publishMessage(append([]byte{0x00}, intrinsicFilter...), req)
}()
} else {
f, _, err := e.clockStore.GetLatestDataClockFrame(e.intrinsicFilter)
@ -315,26 +345,33 @@ func NewTokenExecutionEngine(
}
if err == nil {
// msg := []byte("resume")
// msg = binary.BigEndian.AppendUint64(msg, f.FrameNumber)
// msg = append(msg, e.intrinsicFilter...)
// sig, err := e.pubSub.SignMessage(msg)
// if err != nil {
// panic(err)
// }
msg := []byte("resume")
msg = binary.BigEndian.AppendUint64(msg, f.FrameNumber)
msg = append(msg, e.intrinsicFilter...)
sig, err := e.pubSub.SignMessage(msg)
if err != nil {
panic(err)
}
// // need to wait for peering
// gotime.Sleep(30 * gotime.Second)
// e.publishMessage(e.intrinsicFilter, &protobufs.AnnounceProverResume{
// Filter: e.intrinsicFilter,
// FrameNumber: f.FrameNumber,
// PublicKeySignatureEd448: &protobufs.Ed448Signature{
// PublicKey: &protobufs.Ed448PublicKey{
// KeyValue: e.pubSub.GetPublicKey(),
// },
// Signature: sig,
// },
// })
// need to wait for peering
gotime.Sleep(30 * gotime.Second)
e.publishMessage(
append([]byte{0x00}, e.intrinsicFilter...),
&protobufs.TokenRequest{
Request: &protobufs.TokenRequest_Resume{
Resume: &protobufs.AnnounceProverResume{
Filter: e.intrinsicFilter,
FrameNumber: f.FrameNumber,
PublicKeySignatureEd448: &protobufs.Ed448Signature{
PublicKey: &protobufs.Ed448PublicKey{
KeyValue: e.pubSub.GetPublicKey(),
},
Signature: sig,
},
},
},
},
)
}
}
@ -439,10 +476,10 @@ func (e *TokenExecutionEngine) ProcessMessage(
func (e *TokenExecutionEngine) ProcessFrame(
txn store.Transaction,
frame *protobufs.ClockFrame,
) error {
) ([]*tries.RollingFrecencyCritbitTrie, error) {
f, err := e.coinStore.GetLatestFrameProcessed()
if err != nil || f == frame.FrameNumber {
return errors.Wrap(err, "process frame")
return nil, errors.Wrap(err, "process frame")
}
e.activeClockFrame = frame
@ -465,7 +502,7 @@ func (e *TokenExecutionEngine) ProcessFrame(
"error while materializing application from frame",
zap.Error(err),
)
return errors.Wrap(err, "process frame")
return nil, errors.Wrap(err, "process frame")
}
e.logger.Debug(
@ -473,13 +510,16 @@ func (e *TokenExecutionEngine) ProcessFrame(
zap.Int("outputs", len(app.TokenOutputs.Outputs)),
)
proverTrieJoinRequests := make(map[string]string)
proverTrieLeaveRequests := make(map[string]string)
for i, output := range app.TokenOutputs.Outputs {
switch o := output.Output.(type) {
case *protobufs.TokenOutput_Coin:
address, err := GetAddressOfCoin(o.Coin, frame.FrameNumber, uint64(i))
if err != nil {
txn.Abort()
return errors.Wrap(err, "process frame")
return nil, errors.Wrap(err, "process frame")
}
err = e.coinStore.PutCoin(
txn,
@ -489,13 +529,13 @@ func (e *TokenExecutionEngine) ProcessFrame(
)
if err != nil {
txn.Abort()
return errors.Wrap(err, "process frame")
return nil, errors.Wrap(err, "process frame")
}
case *protobufs.TokenOutput_DeletedCoin:
coin, err := e.coinStore.GetCoinByAddress(txn, o.DeletedCoin.Address)
if err != nil {
txn.Abort()
return errors.Wrap(err, "process frame")
return nil, errors.Wrap(err, "process frame")
}
err = e.coinStore.DeleteCoin(
txn,
@ -504,13 +544,13 @@ func (e *TokenExecutionEngine) ProcessFrame(
)
if err != nil {
txn.Abort()
return errors.Wrap(err, "process frame")
return nil, errors.Wrap(err, "process frame")
}
case *protobufs.TokenOutput_Proof:
address, err := GetAddressOfPreCoinProof(o.Proof)
if err != nil {
txn.Abort()
return errors.Wrap(err, "process frame")
return nil, errors.Wrap(err, "process frame")
}
err = e.coinStore.PutPreCoinProof(
txn,
@ -520,13 +560,13 @@ func (e *TokenExecutionEngine) ProcessFrame(
)
if err != nil {
txn.Abort()
return errors.Wrap(err, "process frame")
return nil, errors.Wrap(err, "process frame")
}
case *protobufs.TokenOutput_DeletedProof:
address, err := GetAddressOfPreCoinProof(o.DeletedProof)
if err != nil {
txn.Abort()
return errors.Wrap(err, "process frame")
return nil, errors.Wrap(err, "process frame")
}
err = e.coinStore.DeletePreCoinProof(
txn,
@ -535,7 +575,87 @@ func (e *TokenExecutionEngine) ProcessFrame(
)
if err != nil {
txn.Abort()
return errors.Wrap(err, "process frame")
return nil, errors.Wrap(err, "process frame")
}
case *protobufs.TokenOutput_Join:
addr, err := e.getAddressFromSignature(o.Join.PublicKeySignatureEd448)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "process frame")
}
proverTrieJoinRequests[string(addr)] = string(addr)
case *protobufs.TokenOutput_Leave:
addr, err := e.getAddressFromSignature(o.Leave.PublicKeySignatureEd448)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "process frame")
}
proverTrieJoinRequests[string(addr)] = string(addr)
case *protobufs.TokenOutput_Pause:
addr, err := e.getAddressFromSignature(o.Pause.PublicKeySignatureEd448)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "process frame")
}
proverTrieJoinRequests[string(addr)] = string(addr)
case *protobufs.TokenOutput_Resume:
addr, err := e.getAddressFromSignature(o.Resume.PublicKeySignatureEd448)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "process frame")
}
proverTrieJoinRequests[string(addr)] = string(addr)
}
}
joinAddrs := tries.NewMinHeap[peerSeniorityItem]()
leaveAddrs := tries.NewMinHeap[peerSeniorityItem]()
for _, addr := range proverTrieJoinRequests {
if _, ok := (*e.peerSeniority)[addr]; !ok {
joinAddrs.Push(peerSeniorityItem{
addr: addr,
seniority: 0,
})
} else {
joinAddrs.Push((*e.peerSeniority)[addr])
}
}
for _, addr := range proverTrieLeaveRequests {
if _, ok := (*e.peerSeniority)[addr]; !ok {
leaveAddrs.Push(peerSeniorityItem{
addr: addr,
seniority: 0,
})
} else {
leaveAddrs.Push((*e.peerSeniority)[addr])
}
}
joinReqs := make([]peerSeniorityItem, len(joinAddrs.All()))
copy(joinReqs, joinAddrs.All())
slices.Reverse(joinReqs)
leaveReqs := make([]peerSeniorityItem, len(leaveAddrs.All()))
copy(leaveReqs, leaveAddrs.All())
slices.Reverse(leaveReqs)
for _, addr := range joinReqs {
rings := len(app.Tries)
last := app.Tries[rings-1]
set := last.FindNearestAndApproximateNeighbors(make([]byte, 32))
if len(set) == 1024 || rings == 1 {
app.Tries = append(
app.Tries,
&tries.RollingFrecencyCritbitTrie{},
)
last = app.Tries[rings]
}
last.Add([]byte(addr.addr), frame.FrameNumber)
}
for _, addr := range leaveReqs {
for _, t := range app.Tries {
if t.Contains([]byte(addr.addr)) {
t.Remove([]byte(addr.addr))
break
}
}
}
@ -543,10 +663,10 @@ func (e *TokenExecutionEngine) ProcessFrame(
err = e.coinStore.SetLatestFrameProcessed(txn, frame.FrameNumber)
if err != nil {
txn.Abort()
return errors.Wrap(err, "process frame")
return nil, errors.Wrap(err, "process frame")
}
return nil
return app.Tries, nil
}
func (e *TokenExecutionEngine) publishMessage(
@ -682,3 +802,17 @@ func (e *TokenExecutionEngine) GetPeerInfo() *protobufs.PeerInfoResponse {
func (e *TokenExecutionEngine) GetFrame() *protobufs.ClockFrame {
return e.clock.GetFrame()
}
func (e *TokenExecutionEngine) getAddressFromSignature(
sig *protobufs.Ed448Signature,
) ([]byte, error) {
if sig.PublicKey == nil || sig.PublicKey.KeyValue == nil {
return nil, errors.New("invalid data")
}
addrBI, err := poseidon.HashBytes(sig.PublicKey.KeyValue)
if err != nil {
return nil, errors.Wrap(err, "get address from signature")
}
return addrBI.FillBytes(make([]byte, 32)), nil
}

File diff suppressed because it is too large Load Diff

View File

@ -173,7 +173,6 @@ message PeerManifest {
message AnnounceProverRequest {
repeated quilibrium.node.keys.pb.Ed448Signature public_key_signatures_ed448 = 1;
MintCoinRequest initial_proof = 2;
}
message AnnounceProverJoin {
@ -238,6 +237,10 @@ message TokenRequest {
MergeCoinRequest merge = 3;
MintCoinRequest mint = 4;
AnnounceProverRequest announce = 5;
AnnounceProverJoin join = 6;
AnnounceProverLeave leave = 7;
AnnounceProverPause pause = 8;
AnnounceProverResume resume = 9;
}
}
@ -262,6 +265,11 @@ message TokenOutput {
PreCoinProof proof = 2;
CoinRef deleted_coin = 3;
PreCoinProof deleted_proof = 4;
AnnounceProverRequest announce = 5;
AnnounceProverJoin join = 6;
AnnounceProverLeave leave = 7;
AnnounceProverPause pause = 8;
AnnounceProverResume resume = 9;
}
}