// ceremonyclient/node/consensus/sync/sync_provider.go

package sync
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math/rand"
"slices"
"time"
pcrypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
mn "github.com/multiformats/go-multiaddr/net"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
"source.quilibrium.com/quilibrium/monorepo/config"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/channel"
tconsensus "source.quilibrium.com/quilibrium/monorepo/types/consensus"
"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
tp2p "source.quilibrium.com/quilibrium/monorepo/types/p2p"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
)
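
// defaultStateQueueCapacity bounds the queue of pending sync requests; with a
// capacity of one, at most a single request waits while another is being
// processed, and any further requests are dropped by AddState.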
const defaultStateQueueCapacity = 1
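
// syncRequest describes one synchronization attempt: the frame number to
// request, the peer to request it from and, when known, the identity the
// resulting frame is expected to carry.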
type syncRequest struct {
frameNumber uint64
peerId []byte
identity []byte
}
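
// UniqueFrame is the minimal state interface the provider needs: a
// models.Unique value that also exposes its frame number.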
type UniqueFrame interface {
models.Unique
GetFrameNumber() uint64
}
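
// ProposalProcessor is implemented by consumers that accept proposals for
// further processing.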
type ProposalProcessor[ProposalT any] interface {
AddProposal(proposal ProposalT)
}
// SyncProvider implements consensus.SyncProvider
type SyncProvider[StateT UniqueFrame, ProposalT any] struct {
logger *zap.Logger
queuedStates chan syncRequest
forks consensus.Forks[StateT]
proverRegistry tconsensus.ProverRegistry
signerRegistry tconsensus.SignerRegistry
peerInfoManager tp2p.PeerInfoManager
proposalSynchronizer SyncClient[StateT, ProposalT]
hypergraph hypergraph.Hypergraph
config *config.Config
filter []byte
proverAddress []byte
}
var _ consensus.SyncProvider[*protobufs.GlobalFrame] = (*SyncProvider[*protobufs.GlobalFrame, *protobufs.GlobalProposal])(nil)
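
// NewSyncProvider wires a SyncProvider from its collaborators. The filter
// scopes which provers and reachability entries are considered, and
// proverAddress identifies the local prover so it can be excluded when
// choosing sync peers.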
func NewSyncProvider[StateT UniqueFrame, ProposalT any](
logger *zap.Logger,
forks consensus.Forks[StateT],
proverRegistry tconsensus.ProverRegistry,
signerRegistry tconsensus.SignerRegistry,
peerInfoManager tp2p.PeerInfoManager,
proposalSynchronizer SyncClient[StateT, ProposalT],
hypergraph hypergraph.Hypergraph,
config *config.Config,
filter []byte,
proverAddress []byte,
) *SyncProvider[StateT, ProposalT] {
return &SyncProvider[StateT, ProposalT]{
logger: logger,
filter: filter,
forks: forks,
proverRegistry: proverRegistry,
signerRegistry: signerRegistry,
peerInfoManager: peerInfoManager,
proposalSynchronizer: proposalSynchronizer,
hypergraph: hypergraph,
proverAddress: proverAddress,
config: config,
queuedStates: make(chan syncRequest, defaultStateQueueCapacity),
}
}
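
// Start signals readiness and then serves queued sync requests until the
// context is cancelled. Requests at or below the finalized frame number are
// skipped. If nothing is queued for ten minutes, a random prover is chosen and
// a sync for the frame after the finalized one is enqueued as a fallback.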
func (p *SyncProvider[StateT, ProposalT]) Start(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
ready()
for {
select {
case <-ctx.Done():
return
case request := <-p.queuedStates:
finalized := p.forks.FinalizedState()
if request.frameNumber <= (*finalized.State).GetFrameNumber() {
continue
}
p.logger.Info(
"synchronizing with peer",
zap.String("peer", peer.ID(request.peerId).String()),
zap.Uint64("finalized_rank", finalized.Rank),
zap.Uint64("peer_frame", request.frameNumber),
)
p.processState(
ctx,
request.frameNumber,
request.peerId,
request.identity,
)
case <-time.After(10 * time.Minute):
peerId, err := p.getRandomProverPeerId()
if err != nil {
p.logger.Debug("could not get random prover peer id", zap.Error(err))
continue
}
select {
case p.queuedStates <- syncRequest{
frameNumber: (*p.forks.FinalizedState().State).GetFrameNumber() + 1,
peerId: []byte(peerId),
}:
default:
}
}
}
}
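
// processState performs a single sync attempt against the given peer, logging
// any failure rather than propagating it.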
func (p *SyncProvider[StateT, ProposalT]) processState(
ctx context.Context,
frameNumber uint64,
peerID []byte,
identity []byte,
) {
err := p.syncWithPeer(
ctx,
frameNumber,
peerID,
identity,
)
if err != nil {
p.logger.Error("could not sync with peer", zap.Error(err))
}
}
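
// Synchronize attempts to catch up with the mesh and reports the resulting
// finalized state. Both returned channels are buffered and closed when the
// attempt finishes; if the mesh sync fails, the provided existing state is
// echoed back on the data channel alongside the error. A rough consumption
// sketch (caller-side names are illustrative):
//
//	dataCh, errCh := provider.Synchronize(ctx, &current)
//	state := <-dataCh
//	if err := <-errCh; err != nil {
//		// keep using the existing state
//	}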
func (p *SyncProvider[StateT, ProposalT]) Synchronize(
ctx context.Context,
existing *StateT,
) (<-chan *StateT, <-chan error) {
dataCh := make(chan *StateT, 1)
errCh := make(chan error, 1)
go func() {
// Register the channel closes before the recover handler; defers run LIFO,
// so the handler can still report a panic on errCh before it is closed.
defer close(errCh)
defer close(dataCh)
defer func() {
if r := recover(); r != nil {
errCh <- errors.Errorf("fatal error encountered: %+v", r)
}
}()
head := p.forks.FinalizedState()
if head == nil {
errCh <- errors.New("no frame")
return
}
err := p.syncWithMesh(ctx)
if err != nil {
dataCh <- existing
errCh <- err
return
}
dataCh <- head.State
syncStatusCheck.WithLabelValues("synced").Inc()
errCh <- nil
}()
return dataCh, errCh
}
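
// syncWithMesh walks the active provers for the configured filter and attempts
// a frame sync against every peer other than the local prover, then logs the
// finalized frame it ends up with. Individual peer failures are logged and
// skipped rather than aborting the pass.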
func (p *SyncProvider[StateT, ProposalT]) syncWithMesh(
ctx context.Context,
) error {
p.logger.Info("synchronizing with peers")
head := p.forks.FinalizedState()
peers, err := p.proverRegistry.GetActiveProvers(p.filter)
if len(peers) <= 1 || err != nil {
return nil
}
for _, candidate := range peers {
if bytes.Equal(candidate.Address, p.proverAddress) {
continue
}
registry, err := p.signerRegistry.GetKeyRegistryByProver(
candidate.Address,
)
if err != nil {
continue
}
if registry.IdentityKey == nil || registry.IdentityKey.KeyValue == nil {
continue
}
pub, err := pcrypto.UnmarshalEd448PublicKey(registry.IdentityKey.KeyValue)
if err != nil {
p.logger.Warn("error unmarshaling identity key", zap.Error(err))
continue
}
peerID, err := peer.IDFromPublicKey(pub)
if err != nil {
p.logger.Warn("error deriving peer id", zap.Error(err))
continue
}
err = p.syncWithPeer(
ctx,
(*head.State).GetFrameNumber(),
[]byte(peerID),
nil,
)
if err != nil {
p.logger.Debug("error syncing frame", zap.Error(err))
}
}
head = p.forks.FinalizedState()
p.logger.Info(
"returning leader frame",
zap.Uint64("frame_number", (*head.State).GetFrameNumber()),
zap.Duration(
"frame_age",
time.Since(time.UnixMilli(int64((*head.State).GetTimestamp()))),
),
)
return nil
}
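
// syncWithPeer dials the peer over each stream multiaddr it advertises for the
// configured filter and runs the proposal synchronizer against it, starting
// from frameNumber. Peers without known reachability are skipped, connectivity
// failures fall through to the next multiaddr, and only non-connectivity sync
// errors are returned.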
func (p *SyncProvider[StateT, ProposalT]) syncWithPeer(
ctx context.Context,
frameNumber uint64,
peerId []byte,
expectedIdentity []byte,
) error {
p.logger.Info(
"polling peer for new frames",
zap.String("peer_id", peer.ID(peerId).String()),
zap.Uint64("current_frame", frameNumber),
)
info := p.peerInfoManager.GetPeerInfo(peerId)
if info == nil {
p.logger.Info(
"no peer info known yet, skipping sync",
zap.String("peer", peer.ID(peerId).String()),
)
return nil
}
if len(info.Reachability) == 0 {
p.logger.Info(
"no reachability info known yet, skipping sync",
zap.String("peer", peer.ID(peerId).String()),
)
return nil
}
for _, reachability := range info.Reachability {
if !bytes.Equal(reachability.Filter, p.filter) {
continue
}
for _, s := range reachability.StreamMultiaddrs {
cc, err := p.getDirectChannel(peerId, s)
if err != nil {
p.logger.Debug(
"could not establish direct channel, trying next multiaddr",
zap.String("peer", peer.ID(peerId).String()),
zap.String("multiaddr", s),
zap.Error(err),
)
continue
}
defer func() {
if err := cc.Close(); err != nil {
p.logger.Error("error while closing connection", zap.Error(err))
}
}()
err = p.proposalSynchronizer.Sync(
ctx,
p.logger.With(
zap.String("peer", peer.ID(peerId).String()),
zap.String("multiaddr", s),
),
cc,
frameNumber,
expectedIdentity,
)
if err != nil {
if errors.Is(err, ErrConnectivityFailed) {
continue
}
return errors.Wrap(err, "sync")
}
}
break
}
p.logger.Debug(
"failed to complete sync for all known multiaddrs",
zap.String("peer", peer.ID(peerId).String()),
)
return nil
}
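
// HyperSync resolves the prover's peer identity through the signer registry
// and streams the four hypergraph phase sets (vertex adds/removes, hyperedge
// adds/removes) for the given shard from that peer, opening one HyperStream
// per phase.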
func (p *SyncProvider[StateT, ProposalT]) HyperSync(
ctx context.Context,
prover []byte,
shardKey tries.ShardKey,
) {
registry, err := p.signerRegistry.GetKeyRegistryByProver(prover)
if err != nil || registry == nil || registry.IdentityKey == nil {
p.logger.Debug(
"failed to find key registry info for prover",
zap.String("prover", hex.EncodeToString(prover)),
)
return
}
peerKey := registry.IdentityKey
pubKey, err := pcrypto.UnmarshalEd448PublicKey(peerKey.KeyValue)
if err != nil {
p.logger.Error(
"could not unmarshal key info",
zap.String("prover", hex.EncodeToString(prover)),
zap.String("identity_key", hex.EncodeToString(peerKey.KeyValue)),
zap.Error(err),
)
return
}
peerId, err := peer.IDFromPublicKey(pubKey)
if err != nil {
p.logger.Error(
"could not derive peer id",
zap.String("prover", hex.EncodeToString(prover)),
zap.Error(err),
)
return
}
info := p.peerInfoManager.GetPeerInfo([]byte(peerId))
if info == nil {
p.logger.Info(
"no peer info known yet, skipping hypersync",
zap.String("peer", peer.ID(peerId).String()),
)
return
}
if len(info.Reachability) == 0 {
p.logger.Info(
"no reachability info known yet, skipping sync",
zap.String("peer", peer.ID(peerId).String()),
)
return
}
phaseSyncs := []func(
protobufs.HypergraphComparisonService_HyperStreamClient,
tries.ShardKey,
){
p.hyperSyncVertexAdds,
p.hyperSyncVertexRemoves,
p.hyperSyncHyperedgeAdds,
p.hyperSyncHyperedgeRemoves,
}
for _, reachability := range info.Reachability {
if !bytes.Equal(reachability.Filter, p.filter) {
continue
}
for _, s := range reachability.StreamMultiaddrs {
for _, syncPhase := range phaseSyncs {
ch, err := p.getDirectChannel([]byte(peerId), s)
if err != nil {
p.logger.Debug(
"could not establish direct channel, trying next multiaddr",
zap.String("peer", peer.ID(peerId).String()),
zap.String("multiaddr", s),
zap.Error(err),
)
continue
}
client := protobufs.NewHypergraphComparisonServiceClient(ch)
str, err := client.HyperStream(ctx)
if err != nil {
p.logger.Error("error from sync", zap.Error(err))
return
}
syncPhase(str, shardKey)
if cerr := ch.Close(); cerr != nil {
p.logger.Error("error while closing connection", zap.Error(cerr))
}
}
}
break
}
}
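
// The hyperSync* helpers below each run a single phase of hypergraph
// synchronization over an established HyperStream and close the send side of
// the stream when done.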
func (p *SyncProvider[StateT, ProposalT]) hyperSyncVertexAdds(
str protobufs.HypergraphComparisonService_HyperStreamClient,
shardKey tries.ShardKey,
) {
err := p.hypergraph.Sync(
str,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
)
if err != nil {
p.logger.Error("error from sync", zap.Error(err))
}
str.CloseSend()
}
func (p *SyncProvider[StateT, ProposalT]) hyperSyncVertexRemoves(
str protobufs.HypergraphComparisonService_HyperStreamClient,
shardKey tries.ShardKey,
) {
err := p.hypergraph.Sync(
str,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES,
)
if err != nil {
p.logger.Error("error from sync", zap.Error(err))
}
str.CloseSend()
}
func (p *SyncProvider[StateT, ProposalT]) hyperSyncHyperedgeAdds(
str protobufs.HypergraphComparisonService_HyperStreamClient,
shardKey tries.ShardKey,
) {
err := p.hypergraph.Sync(
str,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS,
)
if err != nil {
p.logger.Error("error from sync", zap.Error(err))
}
str.CloseSend()
}
func (p *SyncProvider[StateT, ProposalT]) hyperSyncHyperedgeRemoves(
str protobufs.HypergraphComparisonService_HyperStreamClient,
shardKey tries.ShardKey,
) {
err := p.hypergraph.Sync(
str,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES,
)
if err != nil {
p.logger.Error("error from sync", zap.Error(err))
}
str.CloseSend()
}
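
// AddState enqueues a sync request for the given frame number from the given
// peer. Frame numbers at or below the finalized frame are bumped by one with
// the expected identity cleared, and frame zero is normalized to frame one as
// the genesis special case. If the queue is full the request is dropped; a
// later request or the periodic fallback in Start catches up instead.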
func (p *SyncProvider[StateT, ProposalT]) AddState(
sourcePeerID []byte,
frameNumber uint64,
expectedIdentity []byte,
) {
// Adjust if we're within the threshold
if frameNumber <=
(*p.forks.FinalizedState().State).GetFrameNumber() &&
frameNumber != 0 {
frameNumber = frameNumber + 1
expectedIdentity = nil
}
// Handle special case: we're at genesis frame on time reel
if frameNumber == 0 {
frameNumber = 1
expectedIdentity = []byte{}
}
// Enqueue if we can, otherwise drop it because we'll catch up
select {
case p.queuedStates <- syncRequest{
frameNumber: frameNumber,
peerId: slices.Clone(sourcePeerID),
identity: slices.Clone(expectedIdentity),
}:
p.logger.Debug(
"enqueued sync request",
zap.String("peer", peer.ID(sourcePeerID).String()),
zap.Uint64("enqueued_frame_number", frameNumber),
)
default:
p.logger.Debug("no queue capacity, dropping state for sync")
}
}
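
// getDirectChannel opens a gRPC client connection to the peer at the given
// multiaddr, authenticated with TLS credentials bound to that peer's identity.
// Closing the returned connection is the caller's responsibility.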
func (p *SyncProvider[StateT, ProposalT]) getDirectChannel(
peerId []byte,
multiaddrString string,
) (
*grpc.ClientConn,
error,
) {
creds, err := p2p.NewPeerAuthenticator(
p.logger,
p.config.P2P,
nil,
nil,
nil,
nil,
[][]byte{peerId},
map[string]channel.AllowedPeerPolicyType{},
map[string]channel.AllowedPeerPolicyType{},
).CreateClientTLSCredentials(peerId)
if err != nil {
return nil, err
}
ma, err := multiaddr.StringCast(multiaddrString)
if err != nil {
return nil, err
}
mga, err := mn.ToNetAddr(ma)
if err != nil {
return nil, err
}
cc, err := grpc.NewClient(
mga.String(),
grpc.WithTransportCredentials(creds),
)
return cc, err
}
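
// getPeerIDOfProver looks up the prover's identity key in the signer registry
// and derives the corresponding libp2p peer ID from it.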
func (e *SyncProvider[StateT, ProposalT]) getPeerIDOfProver(
prover []byte,
) (peer.ID, error) {
registry, err := e.signerRegistry.GetKeyRegistryByProver(
prover,
)
if err != nil {
e.logger.Debug(
"could not get registry for prover",
zap.Error(err),
)
return "", err
}
if registry == nil || registry.IdentityKey == nil {
e.logger.Debug("registry for prover not found")
return "", errors.New("registry for prover not found")
}
pk, err := pcrypto.UnmarshalEd448PublicKey(registry.IdentityKey.KeyValue)
if err != nil {
e.logger.Debug(
"could not parse pub key",
zap.Error(err),
)
return "", err
}
id, err := peer.IDFromPublicKey(pk)
if err != nil {
e.logger.Debug(
"could not derive peer id",
zap.Error(err),
)
return "", err
}
return id, nil
}
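
// getRandomProverPeerId picks a random active prover other than the local one
// and resolves its peer ID, used by the periodic fallback sync in Start.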
func (e *SyncProvider[StateT, ProposalT]) getRandomProverPeerId() (
peer.ID,
error,
) {
provers, err := e.proverRegistry.GetActiveProvers(nil)
if err != nil {
e.logger.Error(
"could not get active provers for sync",
zap.Error(err),
)
return "", err
}
if len(provers) == 0 {
return "", errors.New("no active provers")
}
otherProvers := []*tconsensus.ProverInfo{}
for _, p := range provers {
if bytes.Equal(p.Address, e.proverAddress) {
continue
}
otherProvers = append(otherProvers, p)
}
if len(otherProvers) == 0 {
return "", errors.New("no other active provers to sync with")
}
index := rand.Intn(len(otherProvers))
return e.getPeerIDOfProver(otherProvers[index].Address)
}