diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index b84b480..bdaec46 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -337,7 +337,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error { if frame.FrameNumber >= nextFrame.FrameNumber || nextFrame.FrameNumber == 0 { - time.Sleep(30 * time.Second) + time.Sleep(60 * time.Second) continue } @@ -592,24 +592,24 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error { e.state = consensus.EngineStateStopping errChan := make(chan error) - msg := []byte("pause") - msg = binary.BigEndian.AppendUint64(msg, e.GetFrame().FrameNumber) - msg = append(msg, e.filter...) - sig, err := e.pubSub.SignMessage(msg) - if err != nil { - panic(err) - } + // msg := []byte("pause") + // msg = binary.BigEndian.AppendUint64(msg, e.GetFrame().FrameNumber) + // msg = append(msg, e.filter...) + // sig, err := e.pubSub.SignMessage(msg) + // if err != nil { + // panic(err) + // } - e.publishMessage(e.filter, &protobufs.AnnounceProverPause{ - Filter: e.filter, - FrameNumber: e.GetFrame().FrameNumber, - PublicKeySignatureEd448: &protobufs.Ed448Signature{ - PublicKey: &protobufs.Ed448PublicKey{ - KeyValue: e.pubSub.GetPublicKey(), - }, - Signature: sig, - }, - }) + // e.publishMessage(e.filter, &protobufs.AnnounceProverPause{ + // Filter: e.filter, + // FrameNumber: e.GetFrame().FrameNumber, + // PublicKeySignatureEd448: &protobufs.Ed448Signature{ + // PublicKey: &protobufs.Ed448PublicKey{ + // KeyValue: e.pubSub.GetPublicKey(), + // }, + // Signature: sig, + // }, + // }) wg := sync.WaitGroup{} wg.Add(len(e.executionEngines)) diff --git a/node/consensus/data/message_handler.go b/node/consensus/data/message_handler.go index 38e1ca7..21bd016 100644 --- a/node/consensus/data/message_handler.go +++ b/node/consensus/data/message_handler.go @@ -25,6 +25,7 @@ func (e *DataClockConsensusEngine) runMessageHandler() { msg := &protobufs.Message{} if err := proto.Unmarshal(message.Data, msg); err != nil { + e.logger.Debug("bad message") continue } @@ -84,6 +85,7 @@ func (e *DataClockConsensusEngine) runMessageHandler() { e.logger.Error("error while unmarshaling", zap.Error(err)) continue } + e.logger.Debug("message type", zap.String("type", any.TypeUrl)) go func() { switch any.TypeUrl { @@ -112,34 +114,38 @@ func (e *DataClockConsensusEngine) runMessageHandler() { ); err != nil { return } - // case protobufs.AnnounceProverJoinType: - // if err := e.handleDataAnnounceProverJoin( - // message.From, - // msg.Address, - // any, - // ); err != nil { - // return - // } - // case protobufs.AnnounceProverLeaveType: - // if !e.IsInProverTrie(peer.peerId) { - // return - // } - // if err := e.handleDataAnnounceProverLeave( - // message.From, - // msg.Address, - // any, - // ); err != nil { - // return - // } - // case protobufs.AnnounceProverPauseType: - // if err := e.handleDataAnnounceProverPause( - // message.From, - // msg.Address, - // any, - // ); err != nil { - // return - // } - // case protobufs.AnnounceProverResumeType: + // case protobufs.AnnounceProverJoinType: + // if err := e.handleDataAnnounceProverJoin( + // message.From, + // msg.Address, + // any, + // ); err != nil { + // return + // } + // case protobufs.AnnounceProverLeaveType: + // if !e.IsInProverTrie(peer.peerId) { + // return + // } + // if err := e.handleDataAnnounceProverLeave( + // message.From, + // msg.Address, + // any, + // ); err != nil { + // return + // } + case protobufs.AnnounceProverPauseType: + // stop spamming + e.pubSub.AddPeerScore(message.From, -1000) + // if err := e.handleDataAnnounceProverPause( + // message.From, + // msg.Address, + // any, + // ); err != nil { + // return + // } + case protobufs.AnnounceProverResumeType: + // stop spamming + e.pubSub.AddPeerScore(message.From, -1000) // if err := e.handleDataAnnounceProverResume( // message.From, // msg.Address, @@ -172,6 +178,15 @@ func (e *DataClockConsensusEngine) handleRebroadcast( return nil } + e.logger.Debug( + "received rebroadcast", + zap.Uint64("from", frames.From), + zap.Uint64("to", frames.To), + ) + if head.FrameNumber+1 < frames.From { + return nil + } + if head.FrameNumber > frames.To { return nil } diff --git a/node/consensus/data/pre_midnight_proof_worker.go b/node/consensus/data/pre_midnight_proof_worker.go index ff40487..960707e 100644 --- a/node/consensus/data/pre_midnight_proof_worker.go +++ b/node/consensus/data/pre_midnight_proof_worker.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/binary" + "strings" "time" "github.com/iden3/go-iden3-crypto/poseidon" @@ -85,7 +86,18 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() { } resume := make([]byte, 32) + cc, err := e.pubSub.GetDirectChannel([]byte(peerId), "worker") + if err != nil { + e.logger.Info( + "could not establish direct channel, waiting...", + zap.Error(err), + ) + time.Sleep(10 * time.Second) + } for { + if e.state >= consensus.EngineStateStopping || e.state == consensus.EngineStateStopped { + break + } _, prfs, err := e.coinStore.GetPreCoinProofsForOwner(addr) if err != nil && !errors.Is(err, store.ErrNotFound) { e.logger.Error("error while fetching pre-coin proofs", zap.Error(err)) @@ -97,14 +109,17 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() { return } - cc, err := e.pubSub.GetDirectChannel([]byte(peerId), "worker") - if err != nil { - e.logger.Info( - "could not establish direct channel, waiting...", - zap.Error(err), - ) - time.Sleep(10 * time.Second) - continue + if cc == nil { + cc, err = e.pubSub.GetDirectChannel([]byte(peerId), "worker") + if err != nil { + e.logger.Info( + "could not establish direct channel, waiting...", + zap.Error(err), + ) + cc = nil + time.Sleep(10 * time.Second) + continue + } } client := protobufs.NewDataServiceClient(cc) @@ -125,6 +140,7 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() { ) time.Sleep(10 * time.Second) cc.Close() + cc = nil continue } @@ -166,6 +182,7 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() { zap.Int("increment", i), ) cc.Close() + cc = nil return } @@ -201,8 +218,20 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() { "got error response, waiting...", zap.Error(err), ) + if strings.Contains( + err.Error(), + application.ErrInvalidStateTransition.Error(), + ) && i == 0 { + resume = make([]byte, 32) + e.logger.Info("pre-midnight proofs submitted, returning") + cc.Close() + cc = nil + return + } + resume = make([]byte, 32) cc.Close() + cc = nil time.Sleep(10 * time.Second) break } @@ -217,6 +246,7 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() { if i == 0 { e.logger.Info("pre-midnight proofs submitted, returning") cc.Close() + cc = nil return } else { increment = uint32(i) - 1 @@ -225,7 +255,6 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() { break } } - cc.Close() } } diff --git a/node/consensus/data/token_handle_mint_test.go b/node/consensus/data/token_handle_mint_test.go index 8ba14cb..76d66e3 100644 --- a/node/consensus/data/token_handle_mint_test.go +++ b/node/consensus/data/token_handle_mint_test.go @@ -68,9 +68,10 @@ func (pubsub) GetNetworkInfo() *protobufs.NetworkInfoResponse { func (p pubsub) SignMessage(msg []byte) ([]byte, error) { return p.privkey.Sign(rand.Reader, msg, gocrypto.Hash(0)) } -func (p pubsub) GetPublicKey() []byte { return p.pubkey } -func (pubsub) GetPeerScore(peerId []byte) int64 { return 0 } -func (pubsub) SetPeerScore(peerId []byte, score int64) {} +func (p pubsub) GetPublicKey() []byte { return p.pubkey } +func (pubsub) GetPeerScore(peerId []byte) int64 { return 0 } +func (pubsub) SetPeerScore(peerId []byte, score int64) {} +func (pubsub) AddPeerScore(peerId []byte, scoreDelta int64) {} type outputs struct { difficulty uint32 diff --git a/node/consensus/time/data_time_reel.go b/node/consensus/time/data_time_reel.go index 228e6c2..19b57cf 100644 --- a/node/consensus/time/data_time_reel.go +++ b/node/consensus/time/data_time_reel.go @@ -169,11 +169,11 @@ func (d *DataTimeReel) Insert(frame *protobufs.ClockFrame, isSync bool) error { zap.String("output_tag", hex.EncodeToString(frame.Output[:64])), ) - if d.lruFrames.Contains(string(frame.Output[:64])) { - return nil - } + // if d.lruFrames.Contains(string(frame.Output[:64])) { + // return nil + // } - d.lruFrames.Add(string(frame.Output[:64]), string(frame.ParentSelector)) + // d.lruFrames.Add(string(frame.Output[:64]), string(frame.ParentSelector)) parent := new(big.Int).SetBytes(frame.ParentSelector) selector, err := frame.GetSelector() @@ -183,16 +183,18 @@ func (d *DataTimeReel) Insert(frame *protobufs.ClockFrame, isSync bool) error { distance, _ := d.GetDistance(frame) - d.storePending(selector, parent, distance, frame) - if d.head.FrameNumber < frame.FrameNumber { - go func() { - d.frames <- &pendingFrame{ - selector: selector, - parentSelector: parent, - frameNumber: frame.FrameNumber, - } - }() + d.storePending(selector, parent, distance, frame) + + if d.head.FrameNumber+1 == frame.FrameNumber { + go func() { + d.frames <- &pendingFrame{ + selector: selector, + parentSelector: parent, + frameNumber: frame.FrameNumber, + } + }() + } } return nil diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go index 61104cd..0e9f81b 100644 --- a/node/execution/intrinsics/token/token_execution_engine.go +++ b/node/execution/intrinsics/token/token_execution_engine.go @@ -3,7 +3,6 @@ package token import ( "bytes" "crypto" - "encoding/binary" "encoding/hex" "strings" "sync" @@ -316,26 +315,26 @@ func NewTokenExecutionEngine( } if err == nil { - msg := []byte("resume") - msg = binary.BigEndian.AppendUint64(msg, f.FrameNumber) - msg = append(msg, e.intrinsicFilter...) - sig, err := e.pubSub.SignMessage(msg) - if err != nil { - panic(err) - } + // msg := []byte("resume") + // msg = binary.BigEndian.AppendUint64(msg, f.FrameNumber) + // msg = append(msg, e.intrinsicFilter...) + // sig, err := e.pubSub.SignMessage(msg) + // if err != nil { + // panic(err) + // } - // need to wait for peering - gotime.Sleep(30 * gotime.Second) - e.publishMessage(e.intrinsicFilter, &protobufs.AnnounceProverResume{ - Filter: e.intrinsicFilter, - FrameNumber: f.FrameNumber, - PublicKeySignatureEd448: &protobufs.Ed448Signature{ - PublicKey: &protobufs.Ed448PublicKey{ - KeyValue: e.pubSub.GetPublicKey(), - }, - Signature: sig, - }, - }) + // // need to wait for peering + // gotime.Sleep(30 * gotime.Second) + // e.publishMessage(e.intrinsicFilter, &protobufs.AnnounceProverResume{ + // Filter: e.intrinsicFilter, + // FrameNumber: f.FrameNumber, + // PublicKeySignatureEd448: &protobufs.Ed448Signature{ + // PublicKey: &protobufs.Ed448PublicKey{ + // KeyValue: e.pubSub.GetPublicKey(), + // }, + // Signature: sig, + // }, + // }) } } diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index f50afe5..b359ff4 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -584,6 +584,16 @@ func (b *BlossomSub) SetPeerScore(peerId []byte, score int64) { b.peerScoreMx.Unlock() } +func (b *BlossomSub) AddPeerScore(peerId []byte, scoreDelta int64) { + b.peerScoreMx.Lock() + if _, ok := b.peerScore[string(peerId)]; !ok { + b.peerScore[string(peerId)] = scoreDelta + } else { + b.peerScore[string(peerId)] = b.peerScore[string(peerId)] + scoreDelta + } + b.peerScoreMx.Unlock() +} + func (b *BlossomSub) GetBitmaskPeers() map[string][]string { peers := map[string][]string{} diff --git a/node/p2p/pubsub.go b/node/p2p/pubsub.go index aff95cf..620f6fd 100644 --- a/node/p2p/pubsub.go +++ b/node/p2p/pubsub.go @@ -35,4 +35,5 @@ type PubSub interface { GetPublicKey() []byte GetPeerScore(peerId []byte) int64 SetPeerScore(peerId []byte, score int64) + AddPeerScore(peerId []byte, scoreDelta int64) }