package app import ( "bytes" "context" "time" "github.com/iden3/go-iden3-crypto/poseidon" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc" "source.quilibrium.com/quilibrium/monorepo/protobufs" "source.quilibrium.com/quilibrium/monorepo/types/store" ) func (e *AppConsensusEngine) GetAppShardFrame( ctx context.Context, request *protobufs.GetAppShardFrameRequest, ) (*protobufs.AppShardFrameResponse, error) { peerID, err := e.authenticateProverFromContext(ctx) if err != nil { return nil, err } e.logger.Debug( "received frame request", zap.Uint64("frame_number", request.FrameNumber), zap.String("peer_id", peerID.String()), ) var frame *protobufs.AppShardFrame if request.FrameNumber == 0 { frame = *e.forks.FinalizedState().State if frame.Header.FrameNumber == 0 { return nil, errors.Wrap( errors.New("not currently syncable"), "get app frame", ) } } else { frame, _, err = e.clockStore.GetShardClockFrame( request.Filter, request.FrameNumber, false, ) } if err != nil { e.logger.Debug( "received error while fetching time reel head", zap.String("peer_id", peerID.String()), zap.Uint64("frame_number", request.FrameNumber), zap.Error(err), ) return nil, errors.Wrap(err, "get data frame") } return &protobufs.AppShardFrameResponse{ Frame: frame, }, nil } func (e *AppConsensusEngine) GetAppShardProposal( ctx context.Context, request *protobufs.GetAppShardProposalRequest, ) (*protobufs.AppShardProposalResponse, error) { peerID, err := e.authenticateProverFromContext(ctx) if err != nil { return nil, err } // Genesis does not have a parent cert, treat special: if request.FrameNumber == 0 { frame, _, err := e.clockStore.GetShardClockFrame( request.Filter, request.FrameNumber, false, ) if err != nil { e.logger.Debug( "received error while fetching shard frame", zap.String("peer_id", peerID.String()), zap.Uint64("frame_number", request.FrameNumber), zap.Error(err), ) return nil, errors.Wrap(err, "get shard proposal") } return &protobufs.AppShardProposalResponse{ Proposal: &protobufs.AppShardProposal{ State: frame, }, }, nil } e.logger.Debug( "received proposal request", zap.Uint64("frame_number", request.FrameNumber), zap.String("peer_id", peerID.String()), ) frame, err := e.loadAppFrameMatchingSelector( request.Filter, request.FrameNumber, nil, ) if err != nil { return &protobufs.AppShardProposalResponse{}, nil } vote, err := e.clockStore.GetProposalVote( request.Filter, frame.GetRank(), []byte(frame.Source()), ) if err != nil { return &protobufs.AppShardProposalResponse{}, nil } parent, err := e.loadAppFrameMatchingSelector( request.Filter, request.FrameNumber-1, frame.Header.ParentSelector, ) if err != nil { e.logger.Debug( "received error while fetching shard frame parent", zap.String("peer_id", peerID.String()), zap.Uint64("frame_number", request.FrameNumber), zap.Error(err), ) return nil, errors.Wrap(err, "get shard proposal") } parentQC, err := e.clockStore.GetQuorumCertificate( request.Filter, parent.GetRank(), ) if err != nil { e.logger.Debug( "received error while fetching QC parent", zap.String("peer_id", peerID.String()), zap.Uint64("frame_number", request.FrameNumber), zap.Error(err), ) return nil, errors.Wrap(err, "get shard proposal") } // no tc is fine, pass the nil along priorRankTC, _ := e.clockStore.GetTimeoutCertificate( request.Filter, frame.GetRank()-1, ) proposal := &protobufs.AppShardProposal{ State: frame, ParentQuorumCertificate: parentQC, PriorRankTimeoutCertificate: priorRankTC, Vote: vote, } return &protobufs.AppShardProposalResponse{ Proposal: proposal, }, nil } func (e *AppConsensusEngine) RegisterServices(server *grpc.Server) { protobufs.RegisterAppShardServiceServer(server, e) protobufs.RegisterDispatchServiceServer(server, e.dispatchService) protobufs.RegisterHypergraphComparisonServiceServer(server, e.hyperSync) protobufs.RegisterOnionServiceServer(server, e.onionService) } func (e *AppConsensusEngine) loadAppFrameMatchingSelector( filter []byte, frameNumber uint64, expectedSelector []byte, ) (*protobufs.AppShardFrame, error) { matchesSelector := func(frame *protobufs.AppShardFrame) bool { if frame == nil || frame.Header == nil || len(expectedSelector) == 0 { return true } return bytes.Equal(frame.Header.ParentSelector, expectedSelector) } frame, _, err := e.clockStore.GetShardClockFrame(filter, frameNumber, false) if err == nil && matchesSelector(frame) { return frame, nil } iter, iterErr := e.clockStore.RangeStagedShardClockFrames( filter, frameNumber, frameNumber, ) if iterErr != nil { if err != nil { return nil, err } return nil, iterErr } defer iter.Close() for ok := iter.First(); ok && iter.Valid(); ok = iter.Next() { candidate, valErr := iter.Value() if valErr != nil { return nil, valErr } if matchesSelector(candidate) { return candidate, nil } } if err == nil && matchesSelector(frame) { return frame, nil } return nil, store.ErrNotFound } func (e *AppConsensusEngine) authenticateProverFromContext( ctx context.Context, ) (peer.ID, error) { peerID, ok := qgrpc.PeerIDFromContext(ctx) if !ok { return peerID, status.Error(codes.Internal, "remote peer ID not found") } if !bytes.Equal(e.pubsub.GetPeerID(), []byte(peerID)) { if e.peerAuthCacheAllows(peerID) { return peerID, nil } registry, err := e.keyStore.GetKeyRegistry( []byte(peerID), ) if err != nil { return peerID, status.Error( codes.PermissionDenied, "could not identify peer", ) } if registry.ProverKey == nil || registry.ProverKey.KeyValue == nil { return peerID, status.Error( codes.PermissionDenied, "could not identify peer (no prover)", ) } addrBI, err := poseidon.HashBytes(registry.ProverKey.KeyValue) if err != nil { return peerID, status.Error( codes.PermissionDenied, "could not identify peer (invalid address)", ) } addr := addrBI.FillBytes(make([]byte, 32)) info, err := e.proverRegistry.GetActiveProvers(e.appAddress) if err != nil { return peerID, status.Error( codes.PermissionDenied, "could not identify peer (no prover registry)", ) } found := false for _, prover := range info { if bytes.Equal(prover.Address, addr) { found = true break } } if !found { return peerID, status.Error( codes.PermissionDenied, "invalid peer", ) } e.markPeerAuthCache(peerID) } return peerID, nil } const appPeerAuthCacheTTL = 10 * time.Second func (e *AppConsensusEngine) peerAuthCacheAllows(id peer.ID) bool { e.peerAuthCacheMu.RLock() expiry, ok := e.peerAuthCache[string(id)] e.peerAuthCacheMu.RUnlock() if !ok { return false } if time.Now().After(expiry) { e.peerAuthCacheMu.Lock() if current, exists := e.peerAuthCache[string(id)]; exists && current.Equal(expiry) { delete(e.peerAuthCache, string(id)) } e.peerAuthCacheMu.Unlock() return false } return true } func (e *AppConsensusEngine) markPeerAuthCache(id peer.ID) { e.peerAuthCacheMu.Lock() e.peerAuthCache[string(id)] = time.Now().Add(appPeerAuthCacheTTL) e.peerAuthCacheMu.Unlock() }