Mirror of https://github.com/QuilibriumNetwork/ceremonyclient.git (synced 2026-02-21 18:37:26 +08:00)
* experiment: reject bad peer info messages
* v2.1.0.18 preview
* add tagged sync
* add missing hypergraph changes
* small tweaks to sync
* allow local sync, use it for provers with workers
* missing file
* resolve build error
* resolve sync issue, remove raw sync
* resolve deletion promotion bug
* resolve sync abstraction leak from tree deletion changes
* rearrange prover sync
* remove pruning from sync
* restore removed sync flag
* fix: sync, event stream deadlock, heuristic scoring of better shards
* resolve hanging shutdown + pubsub proxy issue
* further bugfixes: sync (restore old leaf sync), pubsub shutdown, merge events
* fix: clean up rust ffi, background coverage events, and sync tweaks
* fix: linking issue for channel, connectivity test aggression, sync regression, join tests
* fix: disjoint sync, improper application of filter
* resolve sync/reel/validation deadlock
* adjust sync to handle no leaf edge cases, multi-path segment traversal
* use simpler sync
* faster, simpler sync with some debug extras
* migration to recalculate
* don't use batch
* square up the roots
* fix nil pointer
* fix: seniority calculation, sync race condition, migration
* make sync dumber
* fix: tree deletion issue
* fix: missing seniority merge request canonical serialization
* address issues from previous commit test
* stale workers should be cleared
* remove missing gap check
* rearrange collect, reduce sync logging noise
* fix: the disjoint leaf/branch sync case
* nuclear option on sync failures
* v2.1.0.18, finalized
288 lines · 9.0 KiB · Go
package datarpc

import (
	"context"
	"encoding/hex"
	"time"

	pcrypto "github.com/libp2p/go-libp2p/core/crypto"
	"github.com/multiformats/go-multiaddr"
	mn "github.com/multiformats/go-multiaddr/net"
	"github.com/pkg/errors"
	"go.uber.org/zap"
	"google.golang.org/grpc"

	"source.quilibrium.com/quilibrium/monorepo/config"
	"source.quilibrium.com/quilibrium/monorepo/lifecycle"
	"source.quilibrium.com/quilibrium/monorepo/node/consensus/app"
	qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc"
	"source.quilibrium.com/quilibrium/monorepo/node/keys"
	"source.quilibrium.com/quilibrium/monorepo/node/p2p"
	"source.quilibrium.com/quilibrium/monorepo/protobufs"
	"source.quilibrium.com/quilibrium/monorepo/types/channel"
	"source.quilibrium.com/quilibrium/monorepo/types/consensus"
	"source.quilibrium.com/quilibrium/monorepo/types/crypto"
	tp2p "source.quilibrium.com/quilibrium/monorepo/types/p2p"
)
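
// DataWorkerIPCServer serves the DataIPCService over gRPC for a single worker
// core and, when respawned with a shard filter, also hosts the app consensus
// engine for that filter.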
type DataWorkerIPCServer struct {
	protobufs.UnimplementedDataIPCServiceServer

	ctx                       lifecycle.SignalerContext
	cancel                    func()
	listenAddrGRPC            string
	config                    *config.Config
	logger                    *zap.Logger
	coreId                    uint32
	parentProcessId           int
	signer                    crypto.Signer
	signerRegistry            consensus.SignerRegistry
	proverRegistry            consensus.ProverRegistry
	peerInfoManager           tp2p.PeerInfoManager
	pubsub                    tp2p.PubSub
	authProvider              channel.AuthenticationProvider
	appConsensusEngineFactory *app.AppConsensusEngineFactory
	appConsensusEngine        *app.AppConsensusEngine
	server                    *grpc.Server
	frameProver               crypto.FrameProver
	quit                      chan struct{}
	peerInfoCtx               lifecycle.SignalerContext
	peerInfoCancel            context.CancelFunc
}
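
// NewDataWorkerIPCServer builds a worker IPC server, deriving its Ed448 signer
// from the peer private key configured under config.P2P.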
func NewDataWorkerIPCServer(
	listenAddrGRPC string,
	config *config.Config,
	signerRegistry consensus.SignerRegistry,
	proverRegistry consensus.ProverRegistry,
	peerInfoManager tp2p.PeerInfoManager,
	pubsub tp2p.PubSub,
	frameProver crypto.FrameProver,
	appConsensusEngineFactory *app.AppConsensusEngineFactory,
	logger *zap.Logger,
	coreId uint32,
	parentProcessId int,
) (*DataWorkerIPCServer, error) {
	peerPrivKey, err := hex.DecodeString(config.P2P.PeerPrivKey)
	if err != nil {
		logger.Panic("error decoding peerkey", zap.Error(err))
	}

	privKey, err := pcrypto.UnmarshalEd448PrivateKey(peerPrivKey)
	if err != nil {
		logger.Panic("error unmarshaling peerkey", zap.Error(err))
	}

	rawPriv, err := privKey.Raw()
	if err != nil {
		logger.Panic("error getting private key", zap.Error(err))
	}

	rawPub, err := privKey.GetPublic().Raw()
	if err != nil {
		logger.Panic("error getting public key", zap.Error(err))
	}

	signer, err := keys.Ed448KeyFromBytes(rawPriv, rawPub)
	if err != nil {
		logger.Panic("error creating signer", zap.Error(err))
	}

	return &DataWorkerIPCServer{
		listenAddrGRPC:            listenAddrGRPC,
		config:                    config,
		logger:                    logger,
		coreId:                    coreId,
		parentProcessId:           parentProcessId,
		pubsub:                    pubsub,
		signer:                    signer,
		appConsensusEngineFactory: appConsensusEngineFactory,
		signerRegistry:            signerRegistry,
		proverRegistry:            proverRegistry,
		frameProver:               frameProver,
		peerInfoManager:           peerInfoManager,
		quit:                      make(chan struct{}),
	}, nil
}
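
// Start launches the peer info manager, waits up to five seconds for it to
// signal readiness, brings the gRPC server up via RespawnServer, and then
// blocks until Stop signals the quit channel.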
func (r *DataWorkerIPCServer) Start() error {
	peerInfoCtx, peerInfoCancel, _ := lifecycle.WithSignallerAndCancel(
		context.Background(),
	)
	peerInfoReady := make(chan struct{})
	go r.peerInfoManager.Start(
		peerInfoCtx,
		func() {
			close(peerInfoReady)
		},
	)
	select {
	case <-peerInfoReady:
	case <-time.After(5 * time.Second):
		r.logger.Warn("peer info manager did not start before timeout")
	}
	r.peerInfoCtx = peerInfoCtx
	r.peerInfoCancel = peerInfoCancel

	r.RespawnServer(nil)

	<-r.quit
	return nil
}
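
// Stop closes pubsub, gracefully stops the gRPC server, cancels the peer info
// manager, and signals the quit channel so Start can return.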
func (r *DataWorkerIPCServer) Stop() error {
	r.logger.Info("stopping server gracefully")
	r.pubsub.Close()
	if r.server != nil {
		r.server.GracefulStop()
	}
	if r.peerInfoCancel != nil {
		r.peerInfoCancel()
		r.peerInfoCancel = nil
	}
	go func() {
		r.quit <- struct{}{}
	}()
	return nil
}
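
// Respawn handles a respawn request over IPC by restarting the server in the
// background with the filter supplied in the request.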
func (r *DataWorkerIPCServer) Respawn(
	ctx context.Context,
	req *protobufs.RespawnRequest,
) (*protobufs.RespawnResponse, error) {
	go r.RespawnServer(req.Filter)
	return &protobufs.RespawnResponse{}, nil
}
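
// RespawnServer tears down any running gRPC server and app consensus engine,
// rebuilds the peer authenticator for the given filter, and starts a fresh
// server on the configured listen multiaddr. A non-empty filter also starts an
// app consensus engine bound to that filter.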
func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
	if r.server != nil {
		r.logger.Info("stopping server for respawn")
		r.server.GracefulStop()
		r.server = nil
	}
	if r.appConsensusEngine != nil {
		if r.cancel != nil {
			r.cancel()
		}
		<-r.appConsensusEngine.Stop(false)
		r.appConsensusEngine = nil
	}

	// Establish an auth provider
	r.authProvider = p2p.NewPeerAuthenticator(
		r.logger,
		r.config.P2P,
		r.peerInfoManager,
		r.proverRegistry,
		r.signerRegistry,
		filter,
		nil,
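		// Per-service peer policies, keyed by gRPC service name.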
		map[string]channel.AllowedPeerPolicyType{
			"quilibrium.node.application.pb.HypergraphComparisonService": channel.AnyProverPeer,
			"quilibrium.node.node.pb.DataIPCService": channel.OnlySelfPeer,
			"quilibrium.node.global.pb.GlobalService": channel.OnlyGlobalProverPeer,
			"quilibrium.node.global.pb.AppShardService": channel.OnlyShardProverPeer,
			"quilibrium.node.global.pb.OnionService": channel.AnyPeer,
			"quilibrium.node.global.pb.KeyRegistryService": channel.OnlySelfPeer,
		},
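		// Per-method peer policies, keyed by fully qualified method path.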
		map[string]channel.AllowedPeerPolicyType{
			"/quilibrium.node.application.pb.HypergraphComparisonService/HyperStream": channel.OnlyShardProverPeer,
			"/quilibrium.node.application.pb.HypergraphComparisonService/PerformSync": channel.OnlyShardProverPeer,
			"/quilibrium.node.global.pb.MixnetService/GetTag": channel.AnyPeer,
			"/quilibrium.node.global.pb.MixnetService/PutTag": channel.AnyPeer,
			"/quilibrium.node.global.pb.MixnetService/PutMessage": channel.AnyPeer,
			"/quilibrium.node.global.pb.MixnetService/RoundStream": channel.OnlyGlobalProverPeer,
			"/quilibrium.node.global.pb.DispatchService/PutInboxMessage": channel.OnlySelfPeer,
			"/quilibrium.node.global.pb.DispatchService/GetInboxMessages": channel.OnlySelfPeer,
			"/quilibrium.node.global.pb.DispatchService/PutHub": channel.OnlySelfPeer,
			"/quilibrium.node.global.pb.DispatchService/GetHub": channel.OnlySelfPeer,
			"/quilibrium.node.global.pb.DispatchService/Sync": channel.AnyProverPeer,
			"/quilibrium.node.ferretproxy.pb.FerretProxy/AliceProxy": channel.OnlySelfPeer,
			"/quilibrium.node.ferretproxy.pb.FerretProxy/BobProxy": channel.AnyPeer,
		},
	)

	tlsCreds, err := r.authProvider.CreateServerTLSCredentials()
	if err != nil {
		return errors.Wrap(err, "respawn server")
	}
	r.server = qgrpc.NewServer(
		grpc.Creds(tlsCreds),
		grpc.ChainUnaryInterceptor(r.authProvider.UnaryInterceptor),
		grpc.ChainStreamInterceptor(r.authProvider.StreamInterceptor),
		grpc.MaxRecvMsgSize(10*1024*1024),
		grpc.MaxSendMsgSize(10*1024*1024),
	)

	mg, err := multiaddr.NewMultiaddr(r.listenAddrGRPC)
	if err != nil {
		return errors.Wrap(err, "respawn server")
	}

	lis, err := mn.Listen(mg)
	if err != nil {
		return errors.Wrap(err, "respawn server")
	}

	r.logger.Info(
		"data worker listening",
		zap.String("address", r.listenAddrGRPC),
		zap.String("resolved", lis.Addr().String()),
	)
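	// When a filter is provided, construct the global time reel and an app
	// consensus engine scoped to that filter, and run the engine in the
	// background.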
	if len(filter) != 0 {
		globalTimeReel, err := r.appConsensusEngineFactory.CreateGlobalTimeReel()
		if err != nil {
			return errors.Wrap(err, "respawn server")
		}

		r.appConsensusEngine, err = r.appConsensusEngineFactory.CreateAppConsensusEngine(
			filter,
			uint(r.coreId),
			globalTimeReel,
			r.server,
		)
		if err != nil {
			return errors.Wrap(err, "respawn server")
		}

		r.ctx, r.cancel, _ = lifecycle.WithSignallerAndCancel(context.Background())
		// Capture engine and ctx in local variables to avoid race with
		// subsequent RespawnServer calls
		engine := r.appConsensusEngine
		ctx := r.ctx
		go func() {
			if engine == nil {
				return
			}
			if err := engine.Start(ctx); err != nil {
				r.logger.Error("error while running", zap.Error(err))
			}
		}()
	}
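	// Register the IPC service and serve connections in the background so this
	// call returns immediately.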
	go func() {
		protobufs.RegisterDataIPCServiceServer(r.server, r)
		if err := r.server.Serve(mn.NetListener(lis)); err != nil {
			r.logger.Info("terminating server", zap.Error(err))
		}
	}()

	return nil
}

// CreateJoinProof implements protobufs.DataIPCServiceServer.
func (r *DataWorkerIPCServer) CreateJoinProof(
	ctx context.Context,
	req *protobufs.CreateJoinProofRequest,
) (*protobufs.CreateJoinProofResponse, error) {
	r.logger.Debug("received request to create join proof")
	proof := r.frameProver.CalculateMultiProof(
		[32]byte(req.Challenge),
		req.Difficulty,
		req.Ids,
		req.ProverIndex,
	)

	return &protobufs.CreateJoinProofResponse{
		Response: proof[:],
	}, nil
}