package data

import (
	"bytes"
	"fmt"
	"sync"
	"time"

	"github.com/iden3/go-iden3-crypto/poseidon"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/pkg/errors"
	"go.uber.org/zap"
	"golang.org/x/crypto/sha3"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/anypb"

	"source.quilibrium.com/quilibrium/monorepo/node/config"
	"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application"
	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)

// runFrameMessageHandler consumes e.frameMessageProcessorCh until e.ctx is
// done. Each message is decoded as a protobufs.Message wrapping an anypb.Any
// payload. Clock frame payloads are forwarded to handleClockFrameData; any
// other payload type costs the sending peer 100000 points of peer score.
func (e *DataClockConsensusEngine) runFrameMessageHandler() {
	for {
		select {
		case <-e.ctx.Done():
			return
		case message := <-e.frameMessageProcessorCh:
			e.logger.Debug("handling frame message")

			msg := &protobufs.Message{}
			if err := proto.Unmarshal(message.Data, msg); err != nil {
				e.logger.Debug("bad message")
				continue
			}

			payload := &anypb.Any{}
			if err := proto.Unmarshal(msg.Payload, payload); err != nil {
				e.logger.Error("error while unmarshaling", zap.Error(err))
				continue
			}

			switch payload.TypeUrl {
			// expand for future message types
			case protobufs.ClockFrameType:
				if err := e.handleClockFrameData(
					message.From,
					msg.Address,
					payload,
					false,
				); err != nil {
					e.logger.Debug(
						"could not handle clock frame data",
						zap.Error(err),
					)
				}
			default:
				// Unsupported payload type on this channel: penalize sender.
				e.pubSub.AddPeerScore(message.From, -100000)
			}
		}
	}
}

// runTxMessageHandler consumes e.txMessageProcessorCh until e.ctx is done.
// Only token request payloads are accepted; any other payload type costs the
// sending peer 100000 points of peer score. When this node's proving key
// address is contained in the first frame prover trie, the message is handed
// to every execution engine concurrently and the handler waits for all of
// them before taking the next message.
func (e *DataClockConsensusEngine) runTxMessageHandler() {
	for {
		select {
		case <-e.ctx.Done():
			return
		case message := <-e.txMessageProcessorCh:
			e.logger.Debug("handling tx message")

			msg := &protobufs.Message{}
			if err := proto.Unmarshal(message.Data, msg); err != nil {
				e.logger.Debug("bad message")
				continue
			}

			payload := &anypb.Any{}
			if err := proto.Unmarshal(msg.Payload, payload); err != nil {
				// Undecodable payloads are dropped without logging.
				continue
			}

			switch payload.TypeUrl {
			// expand for future message types
			case protobufs.TokenRequestType:
			default:
				e.pubSub.AddPeerScore(message.From, -100000)
				continue
			}

			// Read the tries through the accessor, as the other handlers in
			// this file do, instead of touching the field directly.
			// NOTE(review): presumably the accessor synchronizes access to
			// the tries — confirm against its definition.
			if !e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
				continue
			}

			var wg sync.WaitGroup
			for name := range e.executionEngines {
				wg.Add(1)
				go func(name string) {
					defer wg.Done()
					e.stageTokenRequestsFromEngine(name, msg)
				}(name)
			}
			wg.Wait()
		}
	}
}

// stageTokenRequestsFromEngine passes msg to the named execution engine under
// application.TOKEN_ADDRESS and feeds every token request found in the
// engine's output messages to handleTokenRequest. Failures are logged and
// skipped; nothing is returned to the caller.
func (e *DataClockConsensusEngine) stageTokenRequestsFromEngine(
	name string,
	msg *protobufs.Message,
) {
	messages, err := e.executionEngines[name].ProcessMessage(
		application.TOKEN_ADDRESS,
		msg,
	)
	if err != nil {
		e.logger.Debug(
			"could not process message for engine",
			zap.Error(err),
			zap.String("engine_name", name),
		)
		return
	}

	for _, appMessage := range messages {
		appMsg := &anypb.Any{}
		if err := proto.Unmarshal(appMessage.Payload, appMsg); err != nil {
			e.logger.Error(
				"could not unmarshal app message",
				zap.Error(err),
				zap.String("engine_name", name),
			)
			continue
		}

		switch appMsg.TypeUrl {
		case protobufs.TokenRequestType:
			t := &protobufs.TokenRequest{}
			if err := proto.Unmarshal(appMsg.Value, t); err != nil {
				e.logger.Debug(
					"could not unmarshal token request",
					zap.Error(err),
				)
				continue
			}

			if err := e.handleTokenRequest(t); err != nil {
				e.logger.Debug(
					"could not handle token request",
					zap.Error(err),
				)
			}
		}
	}
}

// runInfoMessageHandler consumes e.infoMessageProcessorCh until e.ctx is
// done. Data peer list announcements are forwarded to
// handleDataPeerListAnnounce; any other payload type costs the sending peer
// 100000 points of peer score.
func (e *DataClockConsensusEngine) runInfoMessageHandler() {
	for {
		select {
		case <-e.ctx.Done():
			return
		case message := <-e.infoMessageProcessorCh:
			e.logger.Debug("handling info message")

			msg := &protobufs.Message{}
			if err := proto.Unmarshal(message.Data, msg); err != nil {
				e.logger.Debug("bad message")
				continue
			}

			payload := &anypb.Any{}
			if err := proto.Unmarshal(msg.Payload, payload); err != nil {
				e.logger.Error("error while unmarshaling", zap.Error(err))
				continue
			}

			switch payload.TypeUrl {
			// expand for future message types
			case protobufs.DataPeerListAnnounceType:
				if err := e.handleDataPeerListAnnounce(
					message.From,
					msg.Address,
					payload,
				); err != nil {
					e.logger.Debug(
						"could not handle data peer list announce",
						zap.Error(err),
					)
				}
			default:
				// Unsupported payload type on this channel: penalize sender.
				e.pubSub.AddPeerScore(message.From, -100000)
			}
		}
	}
}

// handleClockFrame validates a received clock frame and, when its frame
// number is greater than the current head's, inserts it into the data time
// reel.
//
// The prover address is the Poseidon hash of the frame's Ed448 public key,
// left-padded to 32 bytes. Frames whose prover is not in the first frame
// prover trie are ignored (nil is returned). An error is returned when the
// frame is nil, carries no Ed448 public key, fails verification, or when the
// current head cannot be read. peerID is not used by this function; address
// is used for logging only.
func (e *DataClockConsensusEngine) handleClockFrame(
	peerID []byte,
	address []byte,
	frame *protobufs.ClockFrame,
) error {
	if frame == nil {
		return errors.Wrap(errors.New("frame is nil"), "handle clock frame")
	}

	// The frame comes from the network, so the signature and its key may be
	// absent; dereferencing them unchecked would crash the handler.
	sig := frame.GetPublicKeySignatureEd448()
	if sig == nil || sig.PublicKey == nil {
		return errors.Wrap(
			errors.New("frame has no ed448 public key"),
			"handle clock frame",
		)
	}

	addr, err := poseidon.HashBytes(sig.PublicKey.KeyValue)
	if err != nil {
		return errors.Wrap(err, "handle clock frame data")
	}

	trie := e.GetFrameProverTries()[0]
	if !trie.Contains(addr.FillBytes(make([]byte, 32))) {
		e.logger.Debug(
			"prover not in trie at frame, address may be in fork",
			zap.Binary("address", address),
			zap.Binary("filter", frame.Filter),
			zap.Uint64("frame_number", frame.FrameNumber),
		)
		return nil
	}

	e.logger.Debug(
		"got clock frame",
		zap.Binary("address", address),
		zap.Binary("filter", frame.Filter),
		zap.Uint64("frame_number", frame.FrameNumber),
		zap.Int("proof_count", len(frame.AggregateProofs)),
	)

	if err := e.frameProver.VerifyDataClockFrame(frame); err != nil {
		e.logger.Debug("could not verify clock frame", zap.Error(err))
		return errors.Wrap(err, "handle clock frame data")
	}

	e.logger.Debug(
		"clock frame was valid",
		zap.Binary("address", address),
		zap.Binary("filter", frame.Filter),
		zap.Uint64("frame_number", frame.FrameNumber),
	)

	// Report a head lookup failure to the caller instead of panicking, the
	// same way handleDataPeerListAnnounce treats this failure.
	head, err := e.dataTimeReel.Head()
	if err != nil {
		return errors.Wrap(err, "handle clock frame")
	}

	if frame.FrameNumber > head.FrameNumber {
		e.dataTimeReel.Insert(e.ctx, frame, false)
	}

	return nil
}

// handleClockFrameData unpacks a clock frame from any and passes it to
// handleClockFrame. Messages whose peerID equals this node's own pubsub peer
// ID are ignored. isSync is not used by this function.
func (e *DataClockConsensusEngine) handleClockFrameData(
	peerID []byte,
	address []byte,
	any *anypb.Any,
	isSync bool,
) error {
	if bytes.Equal(peerID, e.pubSub.GetPeerID()) {
		return nil
	}

	frame := &protobufs.ClockFrame{}
	if err := any.UnmarshalTo(frame); err != nil {
		return errors.Wrap(err, "handle clock frame data")
	}

	return e.handleClockFrame(peerID, address, frame)
}

// handleDataPeerListAnnounce records a peer's announcement in e.peerMap and
// signals e.requestSyncCh without blocking.
//
// The announcement is ignored (nil is returned) when it comes from this node
// itself, carries no peer entry, does not advertise a max frame beyond the
// current head, comes from a peer listed in e.uncooperativePeersMap, or is
// older than the entry already stored for that peer. A peer whose version is
// below config.GetMinimumVersion() and whose timestamp is after
// config.GetMinimumVersionCutoff() has its score set to -1000000 and is not
// recorded. address is not used by this function.
func (e *DataClockConsensusEngine) handleDataPeerListAnnounce(
	peerID []byte,
	address []byte,
	any *anypb.Any,
) error {
	if bytes.Equal(peerID, e.pubSub.GetPeerID()) {
		return nil
	}

	announce := &protobufs.DataPeerListAnnounce{}
	if err := any.UnmarshalTo(announce); err != nil {
		return errors.Wrap(err, "handle data peer list announce")
	}

	p := announce.Peer
	if p == nil {
		return nil
	}

	head, err := e.dataTimeReel.Head()
	if err != nil {
		return errors.Wrap(err, "handle data peer list announce")
	}

	if p.MaxFrame <= head.FrameNumber {
		return nil
	}

	if p.Version != nil &&
		bytes.Compare(p.Version, config.GetMinimumVersion()) < 0 &&
		p.Timestamp > config.GetMinimumVersionCutoff().UnixMilli() {
		e.logger.Debug(
			"peer provided outdated version, penalizing app score",
			zap.String("peer_id", peer.ID(peerID).String()),
		)
		e.pubSub.SetPeerScore(peerID, -1000000)
		return nil
	}

	key := string(peerID)

	e.peerMapMx.RLock()
	_, uncooperative := e.uncooperativePeersMap[key]
	known, seen := e.peerMap[key]
	e.peerMapMx.RUnlock()

	if uncooperative {
		return nil
	}

	e.pubSub.SetPeerScore(peerID, 10)

	// Cheap check under the read lock: skips the multiaddr lookup for
	// announcements that are already stale.
	if seen && known.timestamp > p.Timestamp {
		return nil
	}

	multiaddr := e.pubSub.GetMultiaddrOfPeer(peerID)

	e.peerMapMx.Lock()
	// Check again under the write lock: another goroutine may have stored a
	// newer announcement since the read lock was released, and it must not
	// be overwritten by this older one.
	if current, ok := e.peerMap[key]; ok && current.timestamp > p.Timestamp {
		e.peerMapMx.Unlock()
		return nil
	}
	e.peerMap[key] = &peerInfo{
		peerId:        peerID,
		multiaddr:     multiaddr,
		maxFrame:      p.MaxFrame,
		lastSeen:      time.Now().Unix(),
		timestamp:     p.Timestamp,
		version:       p.Version,
		totalDistance: p.TotalDistance,
	}
	e.peerMapMx.Unlock()

	// Non-blocking signal: if nothing is ready to receive, skip it.
	select {
	case <-e.ctx.Done():
		return nil
	case e.requestSyncCh <- nil:
	default:
	}

	return nil
}

// TokenRequestIdentifiers returns the identifiers handleTokenRequest uses to
// detect duplicate staged requests. Each identifier is a request kind prefix
// followed by the hex encoding of the coin address or public key the request
// refers to; merge and announce requests produce one identifier per coin or
// signature. A mint request carrying exactly one proof is identified by the
// SHA3-512 digest of that proof, otherwise by its signing key.
//
// Missing nested fields are read as empty values, so an incomplete request
// yields an identifier with an empty suffix instead of a nil dereference.
//
// It panics when the request type is not one of the handled kinds, including
// a nil transition or an unset request.
func TokenRequestIdentifiers(transition *protobufs.TokenRequest) []string {
	identifiers, ok := tokenRequestIdentifiers(transition)
	if !ok {
		panic("unhandled transition type")
	}
	return identifiers
}

// tokenRequestIdentifiers is the non-panicking form of
// TokenRequestIdentifiers: ok is false when the request type is not handled.
func tokenRequestIdentifiers(
	transition *protobufs.TokenRequest,
) (identifiers []string, ok bool) {
	// All nested fields are read through the generated getters, which accept
	// nil receivers, because requests arrive from the network and may be only
	// partially populated.
	switch t := transition.GetRequest().(type) {
	case *protobufs.TokenRequest_Transfer:
		return []string{
			fmt.Sprintf("transfer-%x", t.Transfer.GetOfCoin().GetAddress()),
		}, true
	case *protobufs.TokenRequest_Split:
		return []string{
			fmt.Sprintf("split-%x", t.Split.GetOfCoin().GetAddress()),
		}, true
	case *protobufs.TokenRequest_Merge:
		coins := t.Merge.GetCoins()
		ids := make([]string, len(coins))
		for i, coin := range coins {
			ids[i] = fmt.Sprintf("merge-%x", coin.GetAddress())
		}
		return ids, true
	case *protobufs.TokenRequest_Mint:
		if proofs := t.Mint.GetProofs(); len(proofs) == 1 {
			return []string{
				fmt.Sprintf("mint-proof-%x", sha3.Sum512(proofs[0])),
			}, true
		}
		return []string{
			fmt.Sprintf(
				"mint-sign-%x",
				t.Mint.GetSignature().GetPublicKey().GetKeyValue(),
			),
		}, true
	case *protobufs.TokenRequest_Announce:
		sigs := t.Announce.GetPublicKeySignaturesEd448()
		ids := make([]string, len(sigs))
		for i, sig := range sigs {
			ids[i] = fmt.Sprintf(
				"announce-%x",
				sig.GetPublicKey().GetKeyValue(),
			)
		}
		return ids, true
	case *protobufs.TokenRequest_Join:
		return []string{
			fmt.Sprintf(
				"join-%x",
				t.Join.GetPublicKeySignatureEd448().GetPublicKey().GetKeyValue(),
			),
		}, true
	case *protobufs.TokenRequest_Leave:
		return []string{
			fmt.Sprintf(
				"leave-%x",
				t.Leave.GetPublicKeySignatureEd448().GetPublicKey().GetKeyValue(),
			),
		}, true
	case *protobufs.TokenRequest_Pause:
		return []string{
			fmt.Sprintf(
				"pause-%x",
				t.Pause.GetPublicKeySignatureEd448().GetPublicKey().GetKeyValue(),
			),
		}, true
	case *protobufs.TokenRequest_Resume:
		return []string{
			fmt.Sprintf(
				"resume-%x",
				t.Resume.GetPublicKeySignatureEd448().GetPublicKey().GetKeyValue(),
			),
		}, true
	default:
		return nil, false
	}
}

// handleTokenRequest stages transition for inclusion when this node's proving
// key address is contained in the first frame prover trie; otherwise it does
// nothing. A request is staged only if none of its identifiers is already in
// the staged set. An error is returned for a request whose type is not
// handled (including a nil or empty request), so that malformed network input
// cannot panic the node.
func (e *DataClockConsensusEngine) handleTokenRequest(
	transition *protobufs.TokenRequest,
) error {
	if !e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
		return nil
	}

	identifiers, ok := tokenRequestIdentifiers(transition)
	if !ok {
		return errors.Wrap(
			errors.New("unhandled transition type"),
			"handle token request",
		)
	}

	e.stagedTransactionsMx.Lock()
	defer e.stagedTransactionsMx.Unlock()

	if e.stagedTransactions == nil {
		e.stagedTransactions = &protobufs.TokenRequests{}
		e.stagedTransactionsSet = make(map[string]struct{})
	}

	// A single identifier already staged marks the whole request as a
	// duplicate.
	for _, identifier := range identifiers {
		if _, staged := e.stagedTransactionsSet[identifier]; staged {
			return nil
		}
	}

	e.stagedTransactions.Requests = append(
		e.stagedTransactions.Requests,
		transition,
	)
	for _, identifier := range identifiers {
		e.stagedTransactionsSet[identifier] = struct{}{}
	}

	return nil
}

// nearestApplicablePowerOfTwo maps number to one of four fixed sizes: 128 for
// values up to 128, 1024 for values up to 1024, 2048 for values up to 2048,
// and 65536 for anything larger.
func nearestApplicablePowerOfTwo(number uint64) uint64 {
	switch {
	case number > 2048:
		return 65536
	case number > 1024:
		return 2048
	case number > 128:
		return 1024
	default:
		return 128
	}
}