Close direct channels if the connection is fresh (#371)

This commit is contained in:
petricadaipegsp 2024-11-21 00:07:28 +01:00 committed by GitHub
parent cbc405a3a0
commit 803cf4b7b3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -838,53 +838,77 @@ func (b *BlossomSub) StartDirectChannelListener(
return errors.Wrap(server.Serve(bind), "start direct channel listener")
}
func (b *BlossomSub) GetDirectChannel(key []byte, purpose string) (
dialCtx *grpc.ClientConn,
err error,
type extraCloseConn struct {
net.Conn
extraClose func()
}
func (c *extraCloseConn) Close() error {
err := c.Conn.Close()
c.extraClose()
return err
}
func (b *BlossomSub) GetDirectChannel(peerID []byte, purpose string) (
cc *grpc.ClientConn, err error,
) {
// Kind of a weird hack, but gostream can induce panics if the peer drops at
// the time of connection, this avoids the problem.
defer func() {
if r := recover(); r != nil {
dialCtx = nil
cc = nil
err = errors.New("connection failed")
}
}()
id := peer.ID(peerID)
// Open question: should we prefix this so a node can run both in mainnet and
// testnet? Feels like a bad idea and would be preferable to discourage.
dialCtx, err = qgrpc.DialContext(
cc, err = qgrpc.DialContext(
b.ctx,
base58.Encode(key),
grpc.WithDialer(
func(peerIdStr string, timeout time.Duration) (net.Conn, error) {
subCtx, subCtxCancel := context.WithTimeout(b.ctx, timeout)
defer subCtxCancel()
id, err := peer.Decode(peerIdStr)
if err != nil {
return nil, errors.Wrap(err, "dial context")
"passthrough:///",
grpc.WithContextDialer(
func(ctx context.Context, _ string) (net.Conn, error) {
// If we are not already connected to the peer, we will manually dial it
// before opening the direct channel. We will close the peer connection
// when the direct channel is closed.
alreadyConnected := false
switch connectedness := b.h.Network().Connectedness(id); connectedness {
case network.Connected, network.Limited:
alreadyConnected = true
default:
if err := b.h.Connect(ctx, peer.AddrInfo{ID: id}); err != nil {
return nil, errors.Wrap(err, "connect")
}
}
c, err := gostream.Dial(
subCtx,
network.WithNoDial(ctx, "direct-channel"),
b.h,
peer.ID(key),
id,
protocol.ID(
"/p2p/direct-channel/"+peer.ID(id).String()+purpose,
"/p2p/direct-channel/"+id.String()+purpose,
),
)
return c, errors.Wrap(err, "dial context")
if err != nil {
return nil, errors.Wrap(err, "dial direct channel")
}
if alreadyConnected {
return c, nil
}
return &extraCloseConn{
Conn: c,
extraClose: func() { _ = b.h.Network().ClosePeer(id) },
}, nil
},
),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, errors.Wrap(err, "get direct channel")
return nil, errors.Wrap(err, "dial context")
}
return dialCtx, nil
return cc, nil
}
func (b *BlossomSub) GetPublicKey() []byte {