diff --git a/client/cmd/all.go b/client/cmd/all.go index ba9bff4..4f623c8 100644 --- a/client/cmd/all.go +++ b/client/cmd/all.go @@ -89,8 +89,9 @@ var allCmd = &cobra.Command{ panic(errors.Wrap(err, "error getting peer id")) } + ctx := context.Background() resume := make([]byte, 32) - cc, err := pubSub.GetDirectChannel([]byte(bpeerId), "worker") + cc, err := pubSub.GetDirectChannel(ctx, []byte(bpeerId), "worker") if err != nil { logger.Info( "could not establish direct channel, waiting...", @@ -100,7 +101,7 @@ var allCmd = &cobra.Command{ } for { if cc == nil { - cc, err = pubSub.GetDirectChannel([]byte(bpeerId), "worker") + cc, err = pubSub.GetDirectChannel(ctx, []byte(bpeerId), "worker") if err != nil { logger.Info( "could not establish direct channel, waiting...", diff --git a/node/consensus/data/consensus_frames.go b/node/consensus/data/consensus_frames.go index 85453de..e9c9f27 100644 --- a/node/consensus/data/consensus_frames.go +++ b/node/consensus/data/consensus_frames.go @@ -291,6 +291,7 @@ func (e *DataClockConsensusEngine) syncWithPeer( zap.Uint64("current_frame", latest.FrameNumber), zap.Uint64("max_frame", maxFrame), ) + var cooperative bool = true defer func() { if cooperative { @@ -304,7 +305,15 @@ func (e *DataClockConsensusEngine) syncWithPeer( delete(e.peerMap, string(peerId)) } }() - cc, err := e.pubSub.GetDirectChannel(peerId, "sync") + + syncTimeout := e.config.Engine.SyncTimeout + if syncTimeout == 0 { + syncTimeout = defaultSyncTimeout + } + + dialCtx, cancelDial := context.WithTimeout(e.ctx, syncTimeout) + defer cancelDial() + cc, err := e.pubSub.GetDirectChannel(dialCtx, peerId, "sync") if err != nil { e.logger.Debug( "could not establish direct channel", @@ -320,22 +329,16 @@ func (e *DataClockConsensusEngine) syncWithPeer( }() client := protobufs.NewDataServiceClient(cc) - - syncTimeout := e.config.Engine.SyncTimeout - if syncTimeout == 0 { - syncTimeout = defaultSyncTimeout - } - for { - ctx, cancel := context.WithTimeout(e.ctx, syncTimeout) + getCtx, cancelGet := context.WithTimeout(e.ctx, syncTimeout) response, err := client.GetDataFrame( - ctx, + getCtx, &protobufs.GetDataFrameRequest{ FrameNumber: latest.FrameNumber + 1, }, grpc.MaxCallRecvMsgSize(600*1024*1024), ) - cancel() + cancelGet() if err != nil { e.logger.Debug( "could not get frame", diff --git a/node/consensus/data/peer_messaging.go b/node/consensus/data/peer_messaging.go index 89c9478..d51deec 100644 --- a/node/consensus/data/peer_messaging.go +++ b/node/consensus/data/peer_messaging.go @@ -636,7 +636,7 @@ func (e *DataClockConsensusEngine) GetPublicChannelForProvingKey( ) } } else { - cc, err := e.pubSub.GetDirectChannel(peerID, base58.Encode(provingKey)) + cc, err := e.pubSub.GetDirectChannel(e.ctx, peerID, base58.Encode(provingKey)) if err != nil { e.logger.Error( "could not get public channel for proving key", diff --git a/node/consensus/data/pre_midnight_proof_worker.go b/node/consensus/data/pre_midnight_proof_worker.go index 24ef1fb..5eab95e 100644 --- a/node/consensus/data/pre_midnight_proof_worker.go +++ b/node/consensus/data/pre_midnight_proof_worker.go @@ -85,7 +85,7 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() { } resume := make([]byte, 32) - cc, err := e.pubSub.GetDirectChannel([]byte(peerId), "worker") + cc, err := e.pubSub.GetDirectChannel(e.ctx, []byte(peerId), "worker") if err != nil { e.logger.Info( "could not establish direct channel, waiting...", @@ -110,7 +110,7 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() { } if cc == nil { - cc, err = e.pubSub.GetDirectChannel([]byte(peerId), "worker") + cc, err = e.pubSub.GetDirectChannel(e.ctx, []byte(peerId), "worker") if err != nil { e.logger.Info( "could not establish direct channel, waiting...", diff --git a/node/consensus/data/token_handle_mint_test.go b/node/consensus/data/token_handle_mint_test.go index caa3e27..d01e2d9 100644 --- a/node/consensus/data/token_handle_mint_test.go +++ b/node/consensus/data/token_handle_mint_test.go @@ -64,7 +64,7 @@ func (pubsub) StartDirectChannelListener( ) error { return nil } -func (pubsub) GetDirectChannel(peerId []byte, purpose string) (*grpc.ClientConn, error) { +func (pubsub) GetDirectChannel(ctx context.Context, peerId []byte, purpose string) (*grpc.ClientConn, error) { return nil, nil } func (pubsub) GetNetworkInfo() *protobufs.NetworkInfoResponse { diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index b49c245..81622ad 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -937,7 +937,7 @@ func (c *extraCloseConn) Close() error { return err } -func (b *BlossomSub) GetDirectChannel(peerID []byte, purpose string) ( +func (b *BlossomSub) GetDirectChannel(ctx context.Context, peerID []byte, purpose string) ( cc *grpc.ClientConn, err error, ) { // Kind of a weird hack, but gostream can induce panics if the peer drops at @@ -954,7 +954,7 @@ func (b *BlossomSub) GetDirectChannel(peerID []byte, purpose string) ( // Open question: should we prefix this so a node can run both in mainnet and // testnet? Feels like a bad idea and would be preferable to discourage. cc, err = qgrpc.DialContext( - b.ctx, + ctx, "passthrough:///", grpc.WithContextDialer( func(ctx context.Context, _ string) (net.Conn, error) { @@ -991,6 +991,7 @@ func (b *BlossomSub) GetDirectChannel(peerID []byte, purpose string) ( }, ), grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), ) if err != nil { return nil, errors.Wrap(err, "dial context") diff --git a/node/p2p/pubsub.go b/node/p2p/pubsub.go index d02acc4..3b7227a 100644 --- a/node/p2p/pubsub.go +++ b/node/p2p/pubsub.go @@ -44,7 +44,7 @@ type PubSub interface { purpose string, server *grpc.Server, ) error - GetDirectChannel(peerId []byte, purpose string) (*grpc.ClientConn, error) + GetDirectChannel(ctx context.Context, peerId []byte, purpose string) (*grpc.ClientConn, error) GetNetworkInfo() *protobufs.NetworkInfoResponse SignMessage(msg []byte) ([]byte, error) GetPublicKey() []byte