mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-11 11:19:05 +08:00
otelhttp derives server.address from the Host header, which creates
a unique time series for every subdomain hostname on public gateways
(e.g. each CID.ipfs.dweb.link). this caused multi-gigabyte prometheus
responses and scrape timeouts.
- cmd/ipfs/kubo/daemon.go: add OTel SDK View that drops server.address
from all http.server.* metrics at the MeterProvider level
- core/corehttp/gateway.go: add server.domain attribute to Gateway and
HostnameGateway handlers, grouping by Gateway.PublicGateways suffix
(e.g. "dweb.link"), with "localhost", "loopback", or "other" fallbacks
- core/corehttp/commands.go: add server.domain="api" to RPC handler
- core/corehttp/gateway.go: add server.domain="libp2p" to libp2p handler
- docs/changelogs/v0.40.md: add changelog highlight
- docs/metrics.md: document server_domain label and server_address drop
(cherry picked from commit 6a082f60b1)
409 lines
14 KiB
Go
409 lines
14 KiB
Go
package corehttp
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"maps"
|
|
"net"
|
|
"net/http"
|
|
"slices"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/ipfs/boxo/blockservice"
|
|
"github.com/ipfs/boxo/exchange/offline"
|
|
"github.com/ipfs/boxo/files"
|
|
"github.com/ipfs/boxo/gateway"
|
|
"github.com/ipfs/boxo/namesys"
|
|
"github.com/ipfs/boxo/path"
|
|
offlineroute "github.com/ipfs/boxo/routing/offline"
|
|
"github.com/ipfs/go-cid"
|
|
version "github.com/ipfs/kubo"
|
|
"github.com/ipfs/kubo/config"
|
|
"github.com/ipfs/kubo/core"
|
|
iface "github.com/ipfs/kubo/core/coreiface"
|
|
"github.com/ipfs/kubo/core/node"
|
|
"github.com/libp2p/go-libp2p/core/routing"
|
|
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
)
|
|
|
|
func GatewayOption(paths ...string) ServeOption {
|
|
return func(n *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) {
|
|
config, headers, err := getGatewayConfig(n)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
backend, err := newGatewayBackend(n)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
handler := gateway.NewHandler(config, backend)
|
|
handler = gateway.NewHeaders(headers).ApplyCors().Wrap(handler)
|
|
var otelOpts []otelhttp.Option
|
|
if fn := newServerDomainAttrFn(n); fn != nil {
|
|
otelOpts = append(otelOpts, otelhttp.WithMetricAttributesFn(fn))
|
|
}
|
|
handler = otelhttp.NewHandler(handler, "Gateway", otelOpts...)
|
|
|
|
for _, p := range paths {
|
|
mux.Handle(p+"/", handler)
|
|
}
|
|
|
|
return mux, nil
|
|
}
|
|
}
|
|
|
|
func HostnameOption() ServeOption {
|
|
return func(n *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) {
|
|
config, headers, err := getGatewayConfig(n)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
backend, err := newGatewayBackend(n)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
childMux := http.NewServeMux()
|
|
|
|
var handler http.Handler
|
|
handler = gateway.NewHostnameHandler(config, backend, childMux)
|
|
handler = gateway.NewHeaders(headers).ApplyCors().Wrap(handler)
|
|
var otelOpts []otelhttp.Option
|
|
if fn := newServerDomainAttrFn(n); fn != nil {
|
|
otelOpts = append(otelOpts, otelhttp.WithMetricAttributesFn(fn))
|
|
}
|
|
handler = otelhttp.NewHandler(handler, "HostnameGateway", otelOpts...)
|
|
|
|
mux.Handle("/", handler)
|
|
return childMux, nil
|
|
}
|
|
}
|
|
|
|
func VersionOption() ServeOption {
|
|
return func(_ *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) {
|
|
mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
|
|
fmt.Fprintf(w, "Commit: %s\n", version.CurrentCommit)
|
|
fmt.Fprintf(w, "Client Version: %s\n", version.GetUserAgentVersion())
|
|
})
|
|
return mux, nil
|
|
}
|
|
}
|
|
|
|
func Libp2pGatewayOption() ServeOption {
|
|
return func(n *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) {
|
|
bserv := blockservice.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
|
|
|
|
backend, err := gateway.NewBlocksBackend(bserv,
|
|
// GatewayOverLibp2p only returns things that are in local blockstore
|
|
// (same as Gateway.NoFetch=true), we have to pass offline path resolver
|
|
gateway.WithResolver(n.OfflineUnixFSPathResolver),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Get gateway configuration from the node's config
|
|
cfg, err := n.Repo.Config()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
gwConfig := gateway.Config{
|
|
// Keep these constraints for security
|
|
DeserializedResponses: false, // Trustless-only
|
|
NoDNSLink: true, // No DNS resolution
|
|
DisableHTMLErrors: true, // Plain text errors only
|
|
PublicGateways: nil,
|
|
Menu: nil,
|
|
// Apply timeout and concurrency limits from user config
|
|
RetrievalTimeout: cfg.Gateway.RetrievalTimeout.WithDefault(config.DefaultRetrievalTimeout),
|
|
MaxRequestDuration: cfg.Gateway.MaxRequestDuration.WithDefault(config.DefaultMaxRequestDuration),
|
|
MaxConcurrentRequests: int(cfg.Gateway.MaxConcurrentRequests.WithDefault(int64(config.DefaultMaxConcurrentRequests))),
|
|
MaxRangeRequestFileSize: int64(cfg.Gateway.MaxRangeRequestFileSize.WithDefault(uint64(config.DefaultMaxRangeRequestFileSize))),
|
|
DiagnosticServiceURL: "", // Not used since DisableHTMLErrors=true
|
|
}
|
|
|
|
handler := gateway.NewHandler(gwConfig, &offlineGatewayErrWrapper{gwimpl: backend})
|
|
handler = otelhttp.NewHandler(handler, "Libp2p-Gateway",
|
|
otelhttp.WithMetricAttributesFn(staticServerDomainAttrFn("libp2p")),
|
|
)
|
|
|
|
mux.Handle("/ipfs/", handler)
|
|
|
|
return mux, nil
|
|
}
|
|
}
|
|
|
|
func newGatewayBackend(n *core.IpfsNode) (gateway.IPFSBackend, error) {
|
|
cfg, err := n.Repo.Config()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
bserv := n.Blocks
|
|
var vsRouting routing.ValueStore = n.Routing
|
|
nsys := n.Namesys
|
|
pathResolver := n.UnixFSPathResolver
|
|
|
|
if cfg.Gateway.NoFetch {
|
|
bserv = blockservice.New(bserv.Blockstore(), offline.Exchange(bserv.Blockstore()))
|
|
|
|
cs := cfg.Ipns.ResolveCacheSize
|
|
if cs == 0 {
|
|
cs = node.DefaultIpnsCacheSize
|
|
}
|
|
if cs < 0 {
|
|
return nil, fmt.Errorf("cannot specify negative resolve cache size")
|
|
}
|
|
|
|
nsOptions := []namesys.Option{
|
|
namesys.WithDatastore(n.Repo.Datastore()),
|
|
namesys.WithDNSResolver(n.DNSResolver),
|
|
namesys.WithCache(cs),
|
|
namesys.WithMaxCacheTTL(cfg.Ipns.MaxCacheTTL.WithDefault(config.DefaultIpnsMaxCacheTTL)),
|
|
}
|
|
|
|
vsRouting = offlineroute.NewOfflineRouter(n.Repo.Datastore(), n.RecordValidator)
|
|
nsys, err = namesys.NewNameSystem(vsRouting, nsOptions...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error constructing namesys: %w", err)
|
|
}
|
|
|
|
// Gateway.NoFetch=true requires offline path resolver
|
|
// to avoid fetching missing blocks during path traversal
|
|
pathResolver = n.OfflineUnixFSPathResolver
|
|
}
|
|
|
|
backend, err := gateway.NewBlocksBackend(bserv,
|
|
gateway.WithValueStore(vsRouting),
|
|
gateway.WithNameSystem(nsys),
|
|
gateway.WithResolver(pathResolver),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &offlineGatewayErrWrapper{gwimpl: backend}, nil
|
|
}
|
|
|
|
type offlineGatewayErrWrapper struct {
|
|
gwimpl gateway.IPFSBackend
|
|
}
|
|
|
|
func offlineErrWrap(err error) error {
|
|
if errors.Is(err, iface.ErrOffline) {
|
|
return fmt.Errorf("%s : %w", err.Error(), gateway.ErrServiceUnavailable)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (o *offlineGatewayErrWrapper) Get(ctx context.Context, path path.ImmutablePath, ranges ...gateway.ByteRange) (gateway.ContentPathMetadata, *gateway.GetResponse, error) {
|
|
md, n, err := o.gwimpl.Get(ctx, path, ranges...)
|
|
err = offlineErrWrap(err)
|
|
return md, n, err
|
|
}
|
|
|
|
func (o *offlineGatewayErrWrapper) GetAll(ctx context.Context, path path.ImmutablePath) (gateway.ContentPathMetadata, files.Node, error) {
|
|
md, n, err := o.gwimpl.GetAll(ctx, path)
|
|
err = offlineErrWrap(err)
|
|
return md, n, err
|
|
}
|
|
|
|
func (o *offlineGatewayErrWrapper) GetBlock(ctx context.Context, path path.ImmutablePath) (gateway.ContentPathMetadata, files.File, error) {
|
|
md, n, err := o.gwimpl.GetBlock(ctx, path)
|
|
err = offlineErrWrap(err)
|
|
return md, n, err
|
|
}
|
|
|
|
func (o *offlineGatewayErrWrapper) Head(ctx context.Context, path path.ImmutablePath) (gateway.ContentPathMetadata, *gateway.HeadResponse, error) {
|
|
md, n, err := o.gwimpl.Head(ctx, path)
|
|
err = offlineErrWrap(err)
|
|
return md, n, err
|
|
}
|
|
|
|
func (o *offlineGatewayErrWrapper) ResolvePath(ctx context.Context, path path.ImmutablePath) (gateway.ContentPathMetadata, error) {
|
|
md, err := o.gwimpl.ResolvePath(ctx, path)
|
|
err = offlineErrWrap(err)
|
|
return md, err
|
|
}
|
|
|
|
func (o *offlineGatewayErrWrapper) GetCAR(ctx context.Context, path path.ImmutablePath, params gateway.CarParams) (gateway.ContentPathMetadata, io.ReadCloser, error) {
|
|
md, data, err := o.gwimpl.GetCAR(ctx, path, params)
|
|
err = offlineErrWrap(err)
|
|
return md, data, err
|
|
}
|
|
|
|
func (o *offlineGatewayErrWrapper) IsCached(ctx context.Context, path path.Path) bool {
|
|
return o.gwimpl.IsCached(ctx, path)
|
|
}
|
|
|
|
func (o *offlineGatewayErrWrapper) GetIPNSRecord(ctx context.Context, c cid.Cid) ([]byte, error) {
|
|
rec, err := o.gwimpl.GetIPNSRecord(ctx, c)
|
|
err = offlineErrWrap(err)
|
|
return rec, err
|
|
}
|
|
|
|
func (o *offlineGatewayErrWrapper) ResolveMutable(ctx context.Context, path path.Path) (path.ImmutablePath, time.Duration, time.Time, error) {
|
|
imPath, ttl, lastMod, err := o.gwimpl.ResolveMutable(ctx, path)
|
|
err = offlineErrWrap(err)
|
|
return imPath, ttl, lastMod, err
|
|
}
|
|
|
|
func (o *offlineGatewayErrWrapper) GetDNSLinkRecord(ctx context.Context, s string) (path.Path, error) {
|
|
p, err := o.gwimpl.GetDNSLinkRecord(ctx, s)
|
|
err = offlineErrWrap(err)
|
|
return p, err
|
|
}
|
|
|
|
var _ gateway.IPFSBackend = (*offlineGatewayErrWrapper)(nil)
|
|
|
|
var defaultPaths = []string{"/ipfs/", "/ipns/", "/p2p/"}
|
|
|
|
// serverDomainAttrKey is the OTel attribute key for the logical server domain.
|
|
// It replaces the high-cardinality server.address attribute (dropped by the
|
|
// View in cmd/ipfs/kubo/daemon.go) with a bounded set of values: configured
|
|
// Gateway.PublicGateways suffixes, "localhost", "loopback", "api", "libp2p",
|
|
// or "other".
|
|
var serverDomainAttrKey = attribute.Key("server.domain")
|
|
|
|
// staticServerDomainAttrFn returns a MetricAttributesFn that always returns
|
|
// a fixed server.domain value. Use for handlers where the domain is known
|
|
// statically (e.g. "api", "libp2p") to keep the label set consistent across
|
|
// all http_server_* metrics.
|
|
func staticServerDomainAttrFn(domain string) func(*http.Request) []attribute.KeyValue {
|
|
attrs := []attribute.KeyValue{serverDomainAttrKey.String(domain)}
|
|
return func(*http.Request) []attribute.KeyValue { return attrs }
|
|
}
|
|
|
|
// newServerDomainAttrFn returns an otelhttp.WithMetricAttributesFn callback
|
|
// that adds a server.domain attribute grouping requests by their matching
|
|
// Gateway.PublicGateways hostname suffix (e.g. "dweb.link", "ipfs.io").
|
|
// Requests that don't match any configured gateway get "other".
|
|
//
|
|
// All return values are pre-allocated at setup time so the per-request
|
|
// closure is zero-allocation.
|
|
func newServerDomainAttrFn(n *core.IpfsNode) func(*http.Request) []attribute.KeyValue {
|
|
cfg, err := n.Repo.Config()
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
// Collect non-nil gateway domain suffixes, sorted longest-first
|
|
// so more-specific suffixes match before shorter ones.
|
|
// Strip ports from keys to match boxo's fallback behavior
|
|
// (boxo tries exact match with port, then strips port and retries).
|
|
seen := make(map[string]struct{}, len(cfg.Gateway.PublicGateways))
|
|
suffixes := make([]string, 0, len(cfg.Gateway.PublicGateways))
|
|
for hostname, gw := range cfg.Gateway.PublicGateways {
|
|
if gw == nil {
|
|
continue
|
|
}
|
|
if h, _, err := net.SplitHostPort(hostname); err == nil {
|
|
hostname = h
|
|
}
|
|
if _, ok := seen[hostname]; ok {
|
|
continue
|
|
}
|
|
seen[hostname] = struct{}{}
|
|
suffixes = append(suffixes, hostname)
|
|
}
|
|
slices.SortFunc(suffixes, func(a, b string) int {
|
|
return len(b) - len(a)
|
|
})
|
|
|
|
// Pre-allocate attribute slices so the per-request closure only returns
|
|
// existing slices and does not allocate.
|
|
suffixAttrs := make([][]attribute.KeyValue, len(suffixes))
|
|
for i, s := range suffixes {
|
|
suffixAttrs[i] = []attribute.KeyValue{serverDomainAttrKey.String(s)}
|
|
}
|
|
localhostAttr := []attribute.KeyValue{serverDomainAttrKey.String("localhost")}
|
|
loopbackAttr := []attribute.KeyValue{serverDomainAttrKey.String("loopback")}
|
|
otherAttr := []attribute.KeyValue{serverDomainAttrKey.String("other")}
|
|
|
|
return func(r *http.Request) []attribute.KeyValue {
|
|
host := r.Host
|
|
if h, _, err := net.SplitHostPort(host); err == nil {
|
|
host = h
|
|
}
|
|
|
|
// Check localhost/loopback before iterating suffixes.
|
|
// "localhost" is an implicit default gateway (defaultKnownGateways)
|
|
// not present in cfg.Gateway.PublicGateways, so it won't appear
|
|
// in suffixes.
|
|
if host == "localhost" || strings.HasSuffix(host, ".localhost") {
|
|
return localhostAttr
|
|
}
|
|
if strings.HasPrefix(host, "127.") || host == "::1" {
|
|
return loopbackAttr
|
|
}
|
|
|
|
for i, suffix := range suffixes {
|
|
if strings.HasSuffix(host, suffix) {
|
|
return suffixAttrs[i]
|
|
}
|
|
}
|
|
|
|
return otherAttr
|
|
}
|
|
}
|
|
|
|
var subdomainGatewaySpec = &gateway.PublicGateway{
|
|
Paths: defaultPaths,
|
|
UseSubdomains: true,
|
|
}
|
|
|
|
var defaultKnownGateways = map[string]*gateway.PublicGateway{
|
|
"localhost": subdomainGatewaySpec,
|
|
}
|
|
|
|
func getGatewayConfig(n *core.IpfsNode) (gateway.Config, map[string][]string, error) {
|
|
cfg, err := n.Repo.Config()
|
|
if err != nil {
|
|
return gateway.Config{}, nil, err
|
|
}
|
|
|
|
// Initialize gateway configuration, with empty PublicGateways, handled after.
|
|
gwCfg := gateway.Config{
|
|
DeserializedResponses: cfg.Gateway.DeserializedResponses.WithDefault(config.DefaultDeserializedResponses),
|
|
AllowCodecConversion: cfg.Gateway.AllowCodecConversion.WithDefault(config.DefaultAllowCodecConversion),
|
|
DisableHTMLErrors: cfg.Gateway.DisableHTMLErrors.WithDefault(config.DefaultDisableHTMLErrors),
|
|
NoDNSLink: cfg.Gateway.NoDNSLink,
|
|
PublicGateways: map[string]*gateway.PublicGateway{},
|
|
RetrievalTimeout: cfg.Gateway.RetrievalTimeout.WithDefault(config.DefaultRetrievalTimeout),
|
|
MaxRequestDuration: cfg.Gateway.MaxRequestDuration.WithDefault(config.DefaultMaxRequestDuration),
|
|
MaxConcurrentRequests: int(cfg.Gateway.MaxConcurrentRequests.WithDefault(int64(config.DefaultMaxConcurrentRequests))),
|
|
MaxRangeRequestFileSize: int64(cfg.Gateway.MaxRangeRequestFileSize.WithDefault(uint64(config.DefaultMaxRangeRequestFileSize))),
|
|
DiagnosticServiceURL: cfg.Gateway.DiagnosticServiceURL.WithDefault(config.DefaultDiagnosticServiceURL),
|
|
}
|
|
|
|
// Add default implicit known gateways, such as subdomain gateway on localhost.
|
|
maps.Copy(gwCfg.PublicGateways, defaultKnownGateways)
|
|
|
|
// Apply values from cfg.Gateway.PublicGateways if they exist.
|
|
for hostname, gw := range cfg.Gateway.PublicGateways {
|
|
if gw == nil {
|
|
// Remove any implicit defaults, if present. This is useful when one
|
|
// wants to disable subdomain gateway on localhost, etc.
|
|
delete(gwCfg.PublicGateways, hostname)
|
|
continue
|
|
}
|
|
|
|
gwCfg.PublicGateways[hostname] = &gateway.PublicGateway{
|
|
Paths: gw.Paths,
|
|
NoDNSLink: gw.NoDNSLink,
|
|
UseSubdomains: gw.UseSubdomains,
|
|
InlineDNSLink: gw.InlineDNSLink.WithDefault(config.DefaultInlineDNSLink),
|
|
DeserializedResponses: gw.DeserializedResponses.WithDefault(gwCfg.DeserializedResponses),
|
|
}
|
|
}
|
|
|
|
return gwCfg, cfg.Gateway.HTTPHeaders, nil
|
|
}
|