ceremonyclient/node/p2p/onion/relay.go
Cassandra Heart dbd95bd9e9
v2.1.0 (#439)
* v2.1.0 [omit consensus and adjacent] - this commit will be amended with the full release after the file copy is complete

* 2.1.0 main node rollup
2025-09-30 02:48:15 -05:00

742 lines
17 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package onion
import (
"context"
"crypto/rand"
"encoding/binary"
"io"
"net"
"sync"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
mn "github.com/multiformats/go-multiaddr/net"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
"source.quilibrium.com/quilibrium/monorepo/types/keys"
)
// ValidEndpointFn reports whether the relay may dial the given multiaddr when
// acting as an exit. exitHandlePlain consults it on every BEGIN and refuses the
// stream when it returns false.
type ValidEndpointFn func(ma multiaddr.Multiaddr) bool
// routeState is the state this relay keeps for one circuit. handleCreate
// registers it in Relay.routes under upCirc; when the circuit is extended the
// same pointer is additionally registered under downCirc, so a cell arriving
// from either neighbor resolves to the same state.
type routeState struct {
hk *hopKeys // this relay's hop keys for the circuit
upPeer string // immediate upstream neighbor (who sent CREATE)
upCirc uint32 // circID used on upstream link
downPeer string // immediate downstream neighbor (set on EXTEND; "" until then)
downCirc uint32 // circID used on downstream link (0 until extended)
streamUp map[uint16]net.Conn // exit only: sid -> conn
}
// introPoint records where to reach a service that registered this relay as an
// introduction point (CmdIntroEstablish). INTRODUCE cells for that service are
// sent back over the recorded circuit.
type introPoint struct {
// Circuit from service -> this intro relay
upPeer string
upCirc uint32
}
// rendWait is the client half of a pending rendezvous. It is stored under the
// client's cookie on REND1 and removed when a REND2 carrying the same cookie
// arrives.
type rendWait struct {
// recorded by rendezvous relay on REND1 (client half)
upPeer string
upCirc uint32
sid uint16 // client-chosen stream ID carried in the REND1 payload
}
// spliceKey identifies one half of a spliced rendezvous stream: the circuit ID
// a cell arrived on together with the stream ID in its relay header.
type spliceKey struct {
circ uint32
sid uint16
}
// splicePair joins two streams at a rendezvous relay. On REND2 the "a" fields
// are filled from the waiting client half and the "b" fields from the service
// half; the same pair is registered in Relay.sp under both halves' spliceKey.
type splicePair struct {
// two half-streams joined at rendezvous
aUpPeer string
aUpCirc uint32
aSid uint16
bUpPeer string
bUpCirc uint32
bSid uint16
}
// Relay is an onion-routing node that participates as a hop. It receives cells
// through the callback it registers on its Transport and keeps the per-circuit
// and rendezvous state needed to process them.
type Relay struct {
logger *zap.Logger
selfID string // passed to hop-key derivation in handleCreate
link Transport // used to send cells; receive callback is registered in NewRelay
keyFn KeyFn // supplies the server ephemeral public key returned in CREATED
secretFn SharedSecretFn // stored by NewRelay; not referenced in the code visible here
validEndpointFn ValidEndpointFn // gates exit dials on BEGIN; assigned by a RelayOption
routingKey crypto.Agreement // long-lived key agreed against the client ephemeral on CREATE
// State: per-link circID -> hop keys for this relay
mu sync.Mutex // held around accesses to the maps below, streamUp maps and hop-key counters
routes map[uint32]*routeState // keyed by upCirc OR downCirc
// rendezvous listeners
intro map[[32]byte]*introPoint // serviceID -> intro registration
rend map[[16]byte]rendWait // cookie -> client half
sp map[spliceKey]*splicePair // (circ,sid) -> pair (both directions)
}
// NewRelay constructs a Relay bound to transport t and registers the relay's
// receive handler on it.
//
// The relay's long-lived agreement key is loaded from keyManager under the id
// "onion-routing-key"; if that lookup returns an error, an X448 key is created
// under the same id. Options are applied after the key has been resolved and
// before the receive handler is registered, so the relay is fully configured
// by the time the first cell can arrive.
//
// Unless an option installs a validator, the relay refuses every exit
// destination: validEndpointFn defaults to a deny-all function rather than
// nil, so a BEGIN on an unconfigured relay is rejected instead of panicking on
// a nil function call.
//
// NewRelay panics if the routing key can neither be loaded nor created; the
// signature has no error return and handleCreate cannot operate without it.
func NewRelay(
	logger *zap.Logger,
	selfID string,
	t Transport,
	keyManager keys.KeyManager,
	keyFn KeyFn,
	secretFn SharedSecretFn,
	opts ...RelayOption,
) *Relay {
	r := &Relay{
		logger:   logger,
		selfID:   selfID,
		link:     t,
		keyFn:    keyFn,
		secretFn: secretFn,
		// Fail closed until a RelayOption supplies a real validator.
		validEndpointFn: func(multiaddr.Multiaddr) bool { return false },
		routes:          make(map[uint32]*routeState),
		intro:           make(map[[32]byte]*introPoint),
		rend:            make(map[[16]byte]rendWait),
		sp:              make(map[spliceKey]*splicePair),
	}

	routingKey, err := keyManager.GetAgreementKey("onion-routing-key")
	if err != nil {
		// Any lookup error is treated as "no key yet" and a new one is created.
		routingKey, err = keyManager.CreateAgreementKey(
			"onion-routing-key",
			crypto.KeyTypeX448,
		)
		if err != nil {
			panic(err)
		}
	}
	r.routingKey = routingKey

	for _, o := range opts {
		o(r)
	}

	// Register this relay as the receiver on its Transport
	t.OnReceive(r.onReceive)
	return r
}
// RelayOption customizes a Relay during construction. NewRelay applies options
// in the order given, after the routing key has been resolved and before the
// relay is registered as the Transport's receiver.
type RelayOption func(*Relay)
// WithPermissiveValidator installs an endpoint validator that approves every
// multiaddr without inspecting it. It is intended for tests only: an exit
// relay configured this way will dial any destination it is asked to.
func WithPermissiveValidator() RelayOption {
	acceptAll := func(multiaddr.Multiaddr) bool {
		return true
	}
	return func(r *Relay) {
		r.validEndpointFn = acceptAll
	}
}
// WithDHTValidator installs an endpoint validator that accepts a multiaddr
// only when all of the following hold:
//
//   - it carries an IPv4 or IPv6 component that parses as an IP address;
//   - that address is not private, loopback, unspecified, link-local or
//     multicast (link-local covers ranges such as 169.254.0.0/16 and
//     fe80::/10, which are as host/network-local as the private ranges);
//   - it ends in a /p2p/<peer-id> component;
//   - the DHT resolves that peer ID and one of the addresses it returns
//     equals either the full multiaddr or its transport part (the multiaddr
//     with the /p2p component split off by peer.AddrInfoFromP2pAddr).
//
// NOTE(review): the DHT lookup runs with context.Background(), so the
// validator blocks for as long as FindPeer takes; callers on a receive path
// may want a deadline here.
func WithDHTValidator(dht *dht.IpfsDHT) RelayOption {
	dhtValidatorFn := func(ma multiaddr.Multiaddr) bool {
		// TODO(2.1.1+): when we support protocols outside of IPv4/6, this needs to
		// be updated
		ipComponent, err := ma.ValueForProtocol(multiaddr.P_IP4)
		if err != nil {
			ipComponent, err = ma.ValueForProtocol(multiaddr.P_IP6)
			if err != nil {
				return false
			}
		}

		ip := net.ParseIP(ipComponent)
		if ip == nil {
			return false
		}

		// Reject anything that is not a globally routable unicast address.
		if ip.IsPrivate() || ip.IsLoopback() || ip.IsUnspecified() ||
			ip.IsLinkLocalUnicast() || ip.IsMulticast() {
			return false
		}

		info, err := peer.AddrInfoFromP2pAddr(ma)
		if err != nil || info == nil {
			return false
		}

		// verify multiaddr is network member
		pi, err := dht.FindPeer(context.Background(), info.ID)
		if err != nil {
			return false
		}

		// If ma does not appear in set, we'll abort (TODO(2.1.1+): should we
		// consider this for aborting with prejudice?)
		//
		// The peer ID already matched via FindPeer, so an advertised address
		// that equals the transport part of ma identifies the same endpoint as
		// one that equals ma in full.
		for _, advertised := range pi.Addrs {
			if advertised.Equal(ma) {
				return true
			}
			for _, transport := range info.Addrs {
				if advertised.Equal(transport) {
					return true
				}
			}
		}

		// Only if found is it safe to proceed (TODO(2.2+): MPCTLS would
		// differentiate on this)
		return false
	}

	return func(r *Relay) { r.validEndpointFn = dhtValidatorFn }
}
// onReceive is the callback registered on the Transport. It handles one cell
// that arrived on circID from srcPeerID.
//
// Dispatch order:
//
//  1. Link-level control cells, framed as |cmd(1)|len(2)|payload(len)|.
//     CREATE is handed to handleCreate. CREATED is accepted only on the
//     downstream link of a circuit that has an upstream side; it is wrapped
//     as RELAY EXTENDED and returned upstream.
//  2. Forward relay cells (received on the circuit's upstream circID): one Kf
//     layer is removed and the result is either handled here or passed on
//     downstream.
//  3. Backward relay cells (received on the circuit's downstream circID): one
//     Kb layer is added and the cell is sent upstream.
//
// Cells for unknown circuits are dropped silently.
//
// NOTE(review): srcPeerID is only used for CREATE; relay cells are matched on
// circID alone, not on the peer recorded for the circuit. The control framing
// is also sniffed from the first bytes of every cell, including encrypted
// ones, so a ciphertext can be mistaken for CREATE or CREATED. Both look like
// protocol-level questions for the package owners.
func (r *Relay) onReceive(srcPeerID []byte, circID uint32, cell []byte) {
	// Link-level control: CREATE -> CREATED
	if len(cell) >= 3 {
		l := int(binary.BigEndian.Uint16(cell[1:3]))
		if 3+l <= len(cell) {
			payload := cell[3 : 3+l]
			switch cell[0] {
			case CmdCreate:
				r.handleCreate(srcPeerID, circID, payload)
				return
			case CmdCreated:
				r.mu.Lock()
				// routes[downCirc] was aliased to the circuit when CREATE was sent
				// downstream.
				rs := r.routes[circID]
				r.mu.Unlock()
				if rs == nil || rs.upPeer == "" || rs.upCirc == 0 {
					return
				}
				if circID == rs.downCirc {
					r.forwardCreated(rs, payload)
					return
				}
				// CREATED is only meaningful from the downstream link. On any
				// other link these bytes belong to a relay cell, so fall through
				// to the relay path instead of returning them upstream as
				// EXTENDED.
			}
		}
	}

	// Relay path (must have keys for this circ on this relay)
	r.mu.Lock()
	rs := r.routes[circID]
	r.mu.Unlock()
	if rs == nil {
		return
	}

	isForward := (circID == rs.upCirc) ||
		(rs.upCirc == 0 && circID != rs.downCirc)
	if isForward {
		r.handleForward(circID, rs, cell)
		return
	}

	// Backward direction (exit -> client): add our Kb layer and send upstream
	if circID == rs.downCirc {
		r.sealAndSendBackward(rs, rs.upPeer, rs.upCirc, cell)
	}
}

// sealAndSendBackward seals raw with keyState's backward (Kb) key and sends
// the result to peer on circ. The nonce counter is claimed under r.mu; sealing
// and sending happen outside the lock. The send is best-effort.
func (r *Relay) sealAndSendBackward(
	keyState *routeState,
	peer string,
	circ uint32,
	raw []byte,
) {
	r.mu.Lock()
	nonce := nonceFrom(keyState.hk.kb.nonce, keyState.hk.bCtr)
	keyState.hk.bCtr++
	aeadB := keyState.hk.kb.aead
	r.mu.Unlock()

	sealed := aeadB.Seal(nil, nonce, raw, nil)
	_ = r.link.Send(context.Background(), []byte(peer), circ, sealed)
}

// forwardCreated wraps a CREATED payload received from downstream into a RELAY
// EXTENDED cell and sends it upstream under this hop's Kb.
func (r *Relay) forwardCreated(rs *routeState, created []byte) {
	raw, err := marshalRelay(relayHeader{
		Cmd:      CmdExtended,
		StreamID: 0,
		Length:   uint16(len(created)),
		Data:     created,
	}, payloadMax())
	if err != nil {
		return
	}
	r.sealAndSendBackward(rs, rs.upPeer, rs.upCirc, raw)
}

// handleForward removes this hop's Kf layer from a forward cell and either
// executes the relay command it contains or passes the still-layered remainder
// to the downstream hop.
func (r *Relay) handleForward(circID uint32, rs *routeState, cell []byte) {
	// Open under the lock and advance the forward counter only on success, so
	// a cell that fails authentication cannot desynchronize the nonce
	// sequence shared with the sender.
	r.mu.Lock()
	nonce := nonceFrom(rs.hk.kf.nonce, rs.hk.fCtr)
	plain, err := rs.hk.kf.aead.Open(nil, nonce, cell, nil)
	if err == nil {
		rs.hk.fCtr++
	}
	r.mu.Unlock()
	if err != nil {
		return
	}

	h, err := unmarshalRelay(plain)
	if err != nil {
		// transit to deeper hop (still layered) only forward if we already know
		// downstream
		if rs.downPeer != "" && rs.downCirc != 0 {
			_ = r.link.Send(
				context.Background(),
				[]byte(rs.downPeer),
				rs.downCirc,
				plain,
			)
		}
		return
	}

	switch h.Cmd {
	case CmdExtend:
		r.handleExtend(rs, h.Data)
	case CmdBegin, CmdData, CmdEnd:
		// If this (circ,sid) belongs to a rendezvous splice, forward across it.
		if h.Cmd != CmdBegin && r.forwardSplice(circID, h, plain) {
			return
		}
		// Otherwise, treat as normal exit TCP handling
		r.exitHandlePlain(rs, h)
	case CmdIntroEstablish:
		r.handleIntroEstablish(rs, h)
	case CmdRend1:
		r.handleRend1(rs, h)
	case CmdIntroduce:
		r.handleIntroduce(h)
	case CmdRend2:
		r.handleRend2(rs, h)
	default:
		// ignore other relay commands
	}
}

// handleExtend processes RELAY EXTEND. data is nextPeerID followed by a
// fixed-size CREATE payload. The relay allocates a circID for the downstream
// link, aliases the circuit under it and sends a link-level CREATE to the next
// hop.
func (r *Relay) handleExtend(rs *routeState, data []byte) {
	// fp(32) || circuitID(16) || clientEph(57) || cNonce(16)
	const createPayloadLen = 32 + 16 + 57 + 16
	// Bounds the search for a free circID so a misbehaving random source
	// cannot spin forever while the lock is held.
	const maxCircIDAttempts = 16

	if len(data) < createPayloadLen {
		return
	}
	split := len(data) - createPayloadLen
	nextPeer := string(data[:split])
	createPayload := data[split:]

	// Allocate downstream circID local to this link. Zero is skipped because
	// it means "no downstream yet"; an ID already present in routes is skipped
	// because aliasing it would overwrite another circuit's entry.
	var downCirc uint32
	r.mu.Lock()
	for attempt := 0; attempt < maxCircIDAttempts && downCirc == 0; attempt++ {
		candidate := uint32(randUint(rand.Reader, 1<<32))
		if candidate == 0 {
			continue
		}
		if _, taken := r.routes[candidate]; taken {
			continue
		}
		downCirc = candidate
	}
	if downCirc == 0 {
		r.mu.Unlock()
		return
	}
	rs.downPeer = nextPeer
	rs.downCirc = downCirc
	r.routes[downCirc] = rs // alias lookup by downstream circ
	r.mu.Unlock()

	// Send link-level CREATE to next hop
	buf := make([]byte, 1+2+len(createPayload))
	buf[0] = CmdCreate
	binary.BigEndian.PutUint16(buf[1:3], uint16(len(createPayload)))
	copy(buf[3:], createPayload)
	_ = r.link.Send(context.Background(), []byte(nextPeer), downCirc, buf)
}

// forwardSplice forwards a DATA or END cell across a rendezvous splice. It
// reports whether (circID, h.StreamID) belongs to a splice; when it does the
// cell is consumed here, even if the other half can no longer be reached.
func (r *Relay) forwardSplice(circID uint32, h relayHeader, plain []byte) bool {
	r.mu.Lock()
	pair := r.sp[spliceKey{circ: circID, sid: h.StreamID}]
	r.mu.Unlock()
	if pair == nil {
		return false
	}

	// Rewrite StreamID to the peer-local stream ID of the other half
	fwd := append([]byte(nil), plain...) // |Cmd|SID|Len|Data|
	dstPeer, dstCirc, dstSid := pair.aUpPeer, pair.aUpCirc, pair.aSid
	if circID == pair.aUpCirc && h.StreamID == pair.aSid {
		dstPeer, dstCirc, dstSid = pair.bUpPeer, pair.bUpCirc, pair.bSid
	}
	binary.BigEndian.PutUint16(fwd[1:3], dstSid)

	r.mu.Lock()
	rsDst := r.routes[dstCirc]
	r.mu.Unlock()
	if rsDst == nil {
		// No route for the other half: drop instead of dereferencing nil.
		return true
	}
	r.sealAndSendBackward(rsDst, dstPeer, dstCirc, fwd)
	return true
}

// handleIntroEstablish registers the sending circuit as the introduction
// circuit for the 32-byte service ID in h.Data and acknowledges with
// INTRO_ACK over the same circuit.
func (r *Relay) handleIntroEstablish(rs *routeState, h relayHeader) {
	// Service registers an intro point at this relay
	if len(h.Data) != 32 {
		return
	}
	var sid [32]byte
	copy(sid[:], h.Data)

	r.mu.Lock()
	r.intro[sid] = &introPoint{upPeer: rs.upPeer, upCirc: rs.upCirc}
	r.mu.Unlock()

	// Ack back to service over this circuit (Kb)
	raw, err := marshalRelay(relayHeader{
		Cmd:      CmdIntroAck,
		StreamID: h.StreamID,
		Length:   0,
	}, payloadMax())
	if err != nil {
		return
	}
	r.sealAndSendBackward(rs, rs.upPeer, rs.upCirc, raw)
}

// handleRend1 records the client half of a rendezvous under the cookie carried
// in h.Data (cookie(16) || clientSid(2)). No acknowledgement is sent.
func (r *Relay) handleRend1(rs *routeState, h relayHeader) {
	// Client creates a rendezvous cookie at this relay
	if len(h.Data) != 16+2 {
		return
	}
	var cookie [16]byte
	copy(cookie[:], h.Data[:16])
	clientSid := binary.BigEndian.Uint16(h.Data[16:18])

	r.mu.Lock()
	r.rend[cookie] = rendWait{
		upPeer: rs.upPeer,
		upCirc: rs.upCirc,
		sid:    clientSid,
	}
	r.mu.Unlock()
}

// handleIntroduce relays an INTRODUCE payload, unchanged, to the service that
// registered an introduction point for the service ID at the start of h.Data.
// Expected layout: serviceID(32) || nameLen(1) || name(nameLen) || cookie(16)
// || clientSid(2). Only the service ID is interpreted here; the remaining
// fields are length-checked and passed through.
func (r *Relay) handleIntroduce(h relayHeader) {
	const (
		serviceIDLen = 32
		cookieLen    = 16
		sidLen       = 2
	)

	// Client asks an intro relay to notify the service to rendezvous here
	if len(h.Data) < serviceIDLen+1+cookieLen+sidLen {
		return
	}
	var sid [32]byte
	copy(sid[:], h.Data[:serviceIDLen])
	nameLen := int(h.Data[serviceIDLen])
	if len(h.Data) < serviceIDLen+1+nameLen+cookieLen+sidLen {
		return
	}

	// Look up intro point (service must have registered here)
	r.mu.Lock()
	ip := r.intro[sid]
	r.mu.Unlock()
	if ip == nil {
		return
	}

	// Relay the INTRODUCE payload to the service over its intro circuit
	raw, err := marshalRelay(relayHeader{
		Cmd:      CmdIntroduce,
		StreamID: h.StreamID,
		Length:   uint16(len(h.Data)),
		Data:     append([]byte(nil), h.Data...),
	}, payloadMax())
	if err != nil {
		return
	}

	r.mu.Lock()
	ipRS := r.routes[ip.upCirc]
	r.mu.Unlock()
	if ipRS == nil {
		return
	}
	r.sealAndSendBackward(ipRS, ip.upPeer, ip.upCirc, raw)
}

// handleRend2 completes a rendezvous. h.Data is cookie(16) || serviceSid(2).
// If a client half is waiting under the cookie, the two streams are spliced,
// the cookie is consumed and both sides are told the splice is ready.
func (r *Relay) handleRend2(rs *routeState, h relayHeader) {
	// Service completes rendezvous at this relay; splice streams
	if len(h.Data) != 16+2 {
		return
	}
	var cookie [16]byte
	copy(cookie[:], h.Data[:16])
	serviceSid := binary.BigEndian.Uint16(h.Data[16:18])

	r.mu.Lock()
	w, ok := r.rend[cookie]
	if ok {
		pair := &splicePair{
			aUpPeer: w.upPeer, aUpCirc: w.upCirc, aSid: w.sid, // client
			bUpPeer: rs.upPeer, bUpCirc: rs.upCirc, bSid: serviceSid, // service
		}
		r.sp[spliceKey{circ: w.upCirc, sid: w.sid}] = pair
		r.sp[spliceKey{circ: rs.upCirc, sid: serviceSid}] = pair
		delete(r.rend, cookie)
	}
	r.mu.Unlock()
	if !ok {
		return
	}

	// Notify both sides that splice is ready
	r.notifyRendEstablished(w.upPeer, w.upCirc, w.sid)
	r.notifyRendEstablished(rs.upPeer, rs.upCirc, serviceSid)
}

// notifyRendEstablished sends REND_ESTABLISHED for stream sid to upPeer over
// upCirc, sealed with that circuit's Kb. Nothing is sent if the circuit has no
// route entry or the cell cannot be marshalled.
func (r *Relay) notifyRendEstablished(upPeer string, upCirc uint32, sid uint16) {
	raw, err := marshalRelay(relayHeader{
		Cmd:      CmdRendEstablished,
		StreamID: sid,
		Length:   0,
	}, payloadMax())
	if err != nil {
		return
	}

	r.mu.Lock()
	target := r.routes[upCirc]
	r.mu.Unlock()
	if target == nil {
		return
	}
	r.sealAndSendBackward(target, upPeer, upCirc, raw)
}
// exitHandlePlain processes a peeled relay header at the exit hop.
//
//   - BEGIN: h.Data is a multiaddr string. If it fails to parse, the endpoint
//     validator rejects it (or none is configured), or the dial fails, END is
//     sent upstream. Otherwise the connection is recorded under h.StreamID and
//     a goroutine starts pumping remote data upstream.
//   - DATA: h.Data is written in full to the stream's connection. On a write
//     error the connection is closed, the stream is forgotten and END is sent
//     upstream. DATA for an unknown stream, or with no payload, is ignored.
//   - END: END is echoed upstream, then the remote is half-closed when the
//     connection supports CloseWrite, or closed and forgotten otherwise. END
//     for an unknown stream is ignored.
//
// r.mu is never held across exitSendEnd, which acquires it itself.
func (r *Relay) exitHandlePlain(
	rs *routeState,
	h relayHeader,
) {
	switch h.Cmd {
	case CmdBegin:
		// Without a validator there is no policy to apply; refuse rather than
		// call through a nil function value.
		if r.validEndpointFn == nil {
			r.exitSendEnd(rs, h.StreamID)
			return
		}

		ma, err := multiaddr.StringCast(string(h.Data))
		if err != nil || !r.validEndpointFn(ma) {
			r.exitSendEnd(rs, h.StreamID)
			return
		}

		conn, err := mn.Dial(ma)
		if err != nil {
			r.exitSendEnd(rs, h.StreamID)
			return
		}

		r.mu.Lock()
		rs.streamUp[h.StreamID] = conn
		r.mu.Unlock()

		go r.exitPumpUpstream(rs, h.StreamID, conn)

	case CmdData:
		// Forward client data to remote server.
		r.mu.Lock()
		conn := rs.streamUp[h.StreamID]
		r.mu.Unlock()
		if conn == nil || len(h.Data) == 0 {
			return
		}

		rest := h.Data
		for len(rest) > 0 {
			n, err := conn.Write(rest)
			if err != nil {
				_ = conn.Close()
				r.mu.Lock()
				delete(rs.streamUp, h.StreamID)
				r.mu.Unlock()
				r.exitSendEnd(rs, h.StreamID)
				return
			}
			rest = rest[n:]
		}

	case CmdEnd:
		// Client is closing; close remote and cleanup.
		//
		// Only the map lookup is done under the lock. exitSendEnd takes r.mu
		// itself and sync.Mutex is not reentrant, so calling it with the lock
		// held would block this goroutine forever.
		r.mu.Lock()
		conn := rs.streamUp[h.StreamID]
		r.mu.Unlock()
		if conn == nil {
			return
		}

		r.exitSendEnd(rs, h.StreamID)

		if cw, ok := conn.(interface{ CloseWrite() error }); ok {
			// Half-close: the entry stays registered so the upstream pump can
			// keep delivering what the remote still sends; the pump removes it
			// when the remote side finishes.
			_ = cw.CloseWrite()
			return
		}

		_ = conn.Close()
		r.mu.Lock()
		delete(rs.streamUp, h.StreamID)
		r.mu.Unlock()
	}
}
// exitPumpUpstream copies bytes from the remote connection to the client. Each
// read is cut into pieces of at most payloadMax() bytes; every piece becomes a
// DATA cell for stream sid, sealed with this hop's backward key (Kb) and sent
// to the circuit's upstream neighbor.
//
// The pump stops on the first read error (including EOF) or marshalling
// failure. On exit it closes conn, drops the stream from rs.streamUp and sends
// END upstream.
func (r *Relay) exitPumpUpstream(rs *routeState, sid uint16, conn net.Conn) {
	cleanup := func() {
		_ = conn.Close()

		r.mu.Lock()
		delete(rs.streamUp, sid)
		r.mu.Unlock()

		r.exitSendEnd(rs, sid)
	}
	defer cleanup()

	readBuf := make([]byte, 32<<10)
	for {
		n, readErr := conn.Read(readBuf)

		// Bytes returned alongside an error are still delivered before the
		// error ends the pump.
		pending := readBuf[:n]
		for len(pending) > 0 {
			piece := pending
			if len(piece) > payloadMax() {
				piece = piece[:payloadMax()]
			}

			hdr := relayHeader{
				Cmd:      CmdData,
				StreamID: sid,
				Length:   uint16(len(piece)),
				Data:     piece,
			}
			raw, marshalErr := marshalRelay(hdr, payloadMax())
			if marshalErr != nil {
				return
			}

			// Claim the next backward nonce under the lock; seal and send
			// outside it.
			r.mu.Lock()
			nonce := nonceFrom(rs.hk.kb.nonce, rs.hk.bCtr)
			rs.hk.bCtr++
			backward := rs.hk.kb.aead
			r.mu.Unlock()

			_ = r.link.Send(
				context.Background(),
				[]byte(rs.upPeer),
				rs.upCirc,
				backward.Seal(nil, nonce, raw, nil),
			)

			pending = pending[len(piece):]
		}

		if readErr != nil {
			return
		}
	}
}
// exitSendEnd tells the client that stream sid is finished: it builds an
// empty END cell, seals it with this hop's backward key (Kb) and sends it to
// the circuit's upstream neighbor. It acquires r.mu while claiming the nonce,
// so callers must not hold that lock. Marshalling failures abort silently and
// the send itself is best-effort.
func (r *Relay) exitSendEnd(rs *routeState, sid uint16) {
	endCell := relayHeader{
		Cmd:      CmdEnd,
		StreamID: sid,
		Length:   0,
		Data:     nil,
	}
	raw, err := marshalRelay(endCell, payloadMax())
	if err != nil {
		return
	}

	// Claim the next backward nonce under the lock; seal outside it.
	r.mu.Lock()
	backward := rs.hk.kb.aead
	nonce := nonceFrom(rs.hk.kb.nonce, rs.hk.bCtr)
	rs.hk.bCtr++
	r.mu.Unlock()

	_ = r.link.Send(
		context.Background(),
		[]byte(rs.upPeer),
		rs.upCirc,
		backward.Seal(nil, nonce, raw, nil),
	)
}
// handleCreate answers a link-level CREATE received from srcPeerID on circID.
//
// payload layout: fp(32) || circuitID(16) || clientEph(57) || cNonce(16);
// bytes beyond that are ignored and shorter payloads are dropped.
//
// The relay agrees a shared secret between its routing key and the client's
// ephemeral key, derives this hop's keys, registers the circuit in routes
// under circID (replacing any entry already stored there) and replies with
// CREATED: serverEph || sNonce(16) || mac(HMAC-SHA256 over the transcript).
//
// Any failure (key agreement, ephemeral key generation, randomness, key
// derivation) aborts before state is registered and nothing is sent back.
func (r *Relay) handleCreate(srcPeerID []byte, circID uint32, payload []byte) {
	const (
		fpLen        = 32
		circuitIDLen = 16
		clientEphLen = 57
		nonceLen     = 16

		circuitIDOff = fpLen
		clientEphOff = circuitIDOff + circuitIDLen
		cNonceOff    = clientEphOff + clientEphLen
		createLen    = cNonceOff + nonceLen
	)

	if len(payload) < createLen {
		return
	}
	fp := payload[:circuitIDOff]
	circuitID := payload[circuitIDOff:clientEphOff]
	clientEph := payload[clientEphOff:cNonceOff]
	cNonce := payload[cNonceOff:createLen]

	// Compute shared secret using client eph pub and our onion secret
	shared, err := r.routingKey.AgreeWith(clientEph)
	if err != nil {
		return
	}

	// Make server eph + nonce
	serverEphPub, _, err := r.keyFn()
	if err != nil {
		return
	}
	var sNonce [nonceLen]byte
	if _, err := io.ReadFull(rand.Reader, sNonce[:]); err != nil {
		// Never answer with an unrandomized nonce.
		return
	}

	// MAC over transcript
	kMac := hkdfExpand(shared, []byte("QOR-NTOR-X448 v1/handshake-mac"), 32)
	transcript := concat(
		[]byte("QOR-NTOR-X448 v1"),
		circuitID,
		fp,
		clientEph,
		serverEphPub,
		cNonce,
		sNonce[:],
	)
	mac := hmacSHA256(kMac, transcript)

	// Derive per-hop keys and store under (this link, circID)
	hk, err := deriveHopKeys(circuitID, []byte(r.selfID), shared)
	if err != nil {
		return
	}

	// Register route state for this circuit, keyed by the upstream circID
	rs := &routeState{
		hk:       &hk,
		upPeer:   string(srcPeerID),
		upCirc:   circID,
		streamUp: make(map[uint16]net.Conn),
	}
	r.mu.Lock()
	r.routes[circID] = rs
	r.mu.Unlock()

	// Send CREATED (serverEph || sNonce || mac)
	resp := make([]byte, 0, clientEphLen+nonceLen+32)
	resp = append(resp, serverEphPub...)
	resp = append(resp, sNonce[:]...)
	resp = append(resp, mac...)

	buf := make([]byte, 1+2+len(resp))
	buf[0] = CmdCreated
	binary.BigEndian.PutUint16(buf[1:3], uint16(len(resp)))
	copy(buf[3:], resp)

	// Best-effort: if the reply is lost the client simply never completes
	// the handshake.
	_ = r.link.Send(context.Background(), srcPeerID, circID, buf)
}