diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index d079ab4..67643a8 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -838,53 +838,77 @@ func (b *BlossomSub) StartDirectChannelListener( return errors.Wrap(server.Serve(bind), "start direct channel listener") } -func (b *BlossomSub) GetDirectChannel(key []byte, purpose string) ( - dialCtx *grpc.ClientConn, - err error, +type extraCloseConn struct { + net.Conn + extraClose func() +} + +func (c *extraCloseConn) Close() error { + err := c.Conn.Close() + c.extraClose() + return err +} + +func (b *BlossomSub) GetDirectChannel(peerID []byte, purpose string) ( + cc *grpc.ClientConn, err error, ) { // Kind of a weird hack, but gostream can induce panics if the peer drops at // the time of connection, this avoids the problem. defer func() { if r := recover(); r != nil { - dialCtx = nil + cc = nil err = errors.New("connection failed") } }() + id := peer.ID(peerID) + // Open question: should we prefix this so a node can run both in mainnet and // testnet? Feels like a bad idea and would be preferable to discourage. - dialCtx, err = qgrpc.DialContext( + cc, err = qgrpc.DialContext( b.ctx, - base58.Encode(key), - grpc.WithDialer( - func(peerIdStr string, timeout time.Duration) (net.Conn, error) { - subCtx, subCtxCancel := context.WithTimeout(b.ctx, timeout) - defer subCtxCancel() - - id, err := peer.Decode(peerIdStr) - if err != nil { - return nil, errors.Wrap(err, "dial context") + "passthrough:///", + grpc.WithContextDialer( + func(ctx context.Context, _ string) (net.Conn, error) { + // If we are not already connected to the peer, we will manually dial it + // before opening the direct channel. We will close the peer connection + // when the direct channel is closed. + alreadyConnected := false + switch connectedness := b.h.Network().Connectedness(id); connectedness { + case network.Connected, network.Limited: + alreadyConnected = true + default: + if err := b.h.Connect(ctx, peer.AddrInfo{ID: id}); err != nil { + return nil, errors.Wrap(err, "connect") + } } - c, err := gostream.Dial( - subCtx, + network.WithNoDial(ctx, "direct-channel"), b.h, - peer.ID(key), + id, protocol.ID( - "/p2p/direct-channel/"+peer.ID(id).String()+purpose, + "/p2p/direct-channel/"+id.String()+purpose, ), ) - - return c, errors.Wrap(err, "dial context") + if err != nil { + return nil, errors.Wrap(err, "dial direct channel") + } + if alreadyConnected { + return c, nil + } + return &extraCloseConn{ + Conn: c, + extraClose: func() { _ = b.h.Network().ClosePeer(id) }, + }, nil }, ), grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { - return nil, errors.Wrap(err, "get direct channel") + return nil, errors.Wrap(err, "dial context") } - return dialCtx, nil + return cc, nil } func (b *BlossomSub) GetPublicKey() []byte {