Separate dialing from retrieval (#398)

petricadaipegsp 2024-12-01 22:02:07 +01:00 committed by GitHub
parent dd030560bc
commit 4be1888496
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 24 additions and 19 deletions
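In short, #398 threads a caller-supplied context.Context through PubSub.GetDirectChannel, so that dialing a direct channel can be bounded and cancelled independently of the retrieval RPCs later issued over it. A rough before/after of the method, restated from the PubSub interface diff at the bottom of this page (the pubSubBefore/pubSubAfter names are illustrative only, not from the repository):

package sketch

import (
	"context"

	"google.golang.org/grpc"
)

type pubSubBefore interface {
	// Before: the dial ran under the implementation's own internal context.
	GetDirectChannel(peerId []byte, purpose string) (*grpc.ClientConn, error)
}

type pubSubAfter interface {
	// After: the caller scopes the dial, e.g. with context.WithTimeout.
	GetDirectChannel(ctx context.Context, peerId []byte, purpose string) (*grpc.ClientConn, error)
}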


@@ -89,8 +89,9 @@ var allCmd = &cobra.Command{
panic(errors.Wrap(err, "error getting peer id"))
}
ctx := context.Background()
resume := make([]byte, 32)
cc, err := pubSub.GetDirectChannel([]byte(bpeerId), "worker")
cc, err := pubSub.GetDirectChannel(ctx, []byte(bpeerId), "worker")
if err != nil {
logger.Info(
"could not establish direct channel, waiting...",
@@ -100,7 +101,7 @@ var allCmd = &cobra.Command{
}
for {
if cc == nil {
cc, err = pubSub.GetDirectChannel([]byte(bpeerId), "worker")
cc, err = pubSub.GetDirectChannel(ctx, []byte(bpeerId), "worker")
if err != nil {
logger.Info(
"could not establish direct channel, waiting...",


@@ -291,6 +291,7 @@ func (e *DataClockConsensusEngine) syncWithPeer(
zap.Uint64("current_frame", latest.FrameNumber),
zap.Uint64("max_frame", maxFrame),
)
var cooperative bool = true
defer func() {
if cooperative {
@@ -304,7 +305,15 @@ func (e *DataClockConsensusEngine) syncWithPeer(
delete(e.peerMap, string(peerId))
}
}()
cc, err := e.pubSub.GetDirectChannel(peerId, "sync")
syncTimeout := e.config.Engine.SyncTimeout
if syncTimeout == 0 {
syncTimeout = defaultSyncTimeout
}
dialCtx, cancelDial := context.WithTimeout(e.ctx, syncTimeout)
defer cancelDial()
cc, err := e.pubSub.GetDirectChannel(dialCtx, peerId, "sync")
if err != nil {
e.logger.Debug(
"could not establish direct channel",
@@ -320,22 +329,16 @@ func (e *DataClockConsensusEngine) syncWithPeer(
}()
client := protobufs.NewDataServiceClient(cc)
syncTimeout := e.config.Engine.SyncTimeout
if syncTimeout == 0 {
syncTimeout = defaultSyncTimeout
}
for {
ctx, cancel := context.WithTimeout(e.ctx, syncTimeout)
getCtx, cancelGet := context.WithTimeout(e.ctx, syncTimeout)
response, err := client.GetDataFrame(
ctx,
getCtx,
&protobufs.GetDataFrameRequest{
FrameNumber: latest.FrameNumber + 1,
},
grpc.MaxCallRecvMsgSize(600*1024*1024),
)
cancel()
cancelGet()
if err != nil {
e.logger.Debug(
"could not get frame",


@@ -636,7 +636,7 @@ func (e *DataClockConsensusEngine) GetPublicChannelForProvingKey(
)
}
} else {
cc, err := e.pubSub.GetDirectChannel(peerID, base58.Encode(provingKey))
cc, err := e.pubSub.GetDirectChannel(e.ctx, peerID, base58.Encode(provingKey))
if err != nil {
e.logger.Error(
"could not get public channel for proving key",


@@ -85,7 +85,7 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() {
}
resume := make([]byte, 32)
cc, err := e.pubSub.GetDirectChannel([]byte(peerId), "worker")
cc, err := e.pubSub.GetDirectChannel(e.ctx, []byte(peerId), "worker")
if err != nil {
e.logger.Info(
"could not establish direct channel, waiting...",
@@ -110,7 +110,7 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() {
}
if cc == nil {
cc, err = e.pubSub.GetDirectChannel([]byte(peerId), "worker")
cc, err = e.pubSub.GetDirectChannel(e.ctx, []byte(peerId), "worker")
if err != nil {
e.logger.Info(
"could not establish direct channel, waiting...",


@@ -64,7 +64,7 @@ func (pubsub) StartDirectChannelListener(
) error {
return nil
}
func (pubsub) GetDirectChannel(peerId []byte, purpose string) (*grpc.ClientConn, error) {
func (pubsub) GetDirectChannel(ctx context.Context, peerId []byte, purpose string) (*grpc.ClientConn, error) {
return nil, nil
}
func (pubsub) GetNetworkInfo() *protobufs.NetworkInfoResponse {


@@ -937,7 +937,7 @@ func (c *extraCloseConn) Close() error {
return err
}
func (b *BlossomSub) GetDirectChannel(peerID []byte, purpose string) (
func (b *BlossomSub) GetDirectChannel(ctx context.Context, peerID []byte, purpose string) (
cc *grpc.ClientConn, err error,
) {
// Kind of a weird hack, but gostream can induce panics if the peer drops at
@@ -954,7 +954,7 @@ func (b *BlossomSub) GetDirectChannel(peerID []byte, purpose string) (
// Open question: should we prefix this so a node can run both in mainnet and
// testnet? Feels like a bad idea and would be preferable to discourage.
cc, err = qgrpc.DialContext(
b.ctx,
ctx,
"passthrough:///",
grpc.WithContextDialer(
func(ctx context.Context, _ string) (net.Conn, error) {
@@ -991,6 +991,7 @@ func (b *BlossomSub) GetDirectChannel(peerID []byte, purpose string) (
},
),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {
return nil, errors.Wrap(err, "dial context")
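Two things change in the dial itself: qgrpc.DialContext now receives the caller's ctx instead of b.ctx, and grpc.WithBlock() is added. In standard grpc-go, WithBlock makes DialContext wait until the connection is actually established (or the context expires) instead of returning immediately, so a deadline on the caller's context now bounds the dial rather than only surfacing on the first RPC. A minimal illustration against plain grpc-go (the qgrpc wrapper, the libp2p gostream dialer, and the peer-drop workaround from this file are omitted):

package sketch

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// dialBlocking illustrates the effect of grpc.WithBlock: DialContext does not
// return until the connection is ready, so the timeout on ctx bounds the dial
// itself rather than the first RPC made over the connection.
func dialBlocking(parent context.Context, target string, timeout time.Duration) (*grpc.ClientConn, error) {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	return grpc.DialContext(
		ctx,
		target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock(),
	)
}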


@@ -44,7 +44,7 @@ type PubSub interface {
purpose string,
server *grpc.Server,
) error
GetDirectChannel(peerId []byte, purpose string) (*grpc.ClientConn, error)
GetDirectChannel(ctx context.Context, peerId []byte, purpose string) (*grpc.ClientConn, error)
GetNetworkInfo() *protobufs.NetworkInfoResponse
SignMessage(msg []byte) ([]byte, error)
GetPublicKey() []byte
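One plausible reading of the new signature is that implementations (including test doubles like the no-op pubsub above) should return promptly once ctx is cancelled or its deadline passes. A hypothetical stub illustrating that contract; fakePubSub is not from the repository:

package sketch

import (
	"context"
	"time"

	"google.golang.org/grpc"
)

// fakePubSub is a hypothetical test double: its dial respects ctx, returning
// ctx.Err() as soon as the caller's deadline passes or the context is
// cancelled.
type fakePubSub struct {
	dialDelay time.Duration
}

func (f fakePubSub) GetDirectChannel(
	ctx context.Context,
	peerId []byte,
	purpose string,
) (*grpc.ClientConn, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-time.After(f.dialDelay): // simulate the time a real dial would take
		return nil, nil
	}
}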