mirror of https://github.com/ipfs/kubo.git, synced 2026-02-21 10:27:46 +08:00
* feat: fast provide

* Check error from provideRoot

* do not provide if nil router

* fix(commands): prevent panic from typed nil DHTClient interface

  Fixes a panic when ipfsNode.DHTClient is a non-nil interface containing a
  nil pointer value (typed nil). This happened when Routing.Type=delegated
  or when using HTTP-only routing without DHT.

  The panic occurred because:
  - Go interfaces can be non-nil while containing nil pointer values
  - Simple `if DHTClient == nil` checks pass, but calling methods panics
  - Example: `(*ddht.DHT)(nil)` stored in an interface passes the nil check

  Solution:
  - Add HasActiveDHTClient() method to check both interface and concrete value
  - Update all 7 call sites to use the proper check before DHT operations
  - Rename provideRoot → provideCIDSync for clarity
  - Add structured logging with "fast-provide" prefix for easier filtering
  - Add tests covering nil cases and valid DHT configurations

  Fixes: https://github.com/ipfs/kubo/pull/11046#issuecomment-3525313349

* feat(add): split fast-provide into two flags for async/sync control

  Renames --fast-provide to --fast-provide-root and adds --fast-provide-wait
  to give users control over synchronous vs asynchronous providing behavior.

  Changes:
  - --fast-provide-root (default: true): enables immediate root CID providing
  - --fast-provide-wait (default: false): controls whether to block until complete
  - Default behavior: async provide (fast, non-blocking)
  - Opt-in: --fast-provide-wait for guaranteed discoverability (slower, blocking)
  - Can disable with --fast-provide-root=false to rely on background reproviding

  Implementation:
  - Async mode: launches a goroutine with a detached context for fire-and-forget
    - Added a 10-second timeout to prevent hanging on network issues
    - Timeout aligns with other kubo operations (ping, DNS resolve, p2p)
    - Sufficient for DHT with the sweep provider or accelerated client
  - Sync mode: blocks on provideCIDSync until completion (uses req.Context)
  - Improved structured logging with "fast-provide-root:" prefix
    - Removed redundant "root CID" from messages (already in prefix)
    - Clear async/sync distinction in log messages
  - Added FAST PROVIDE OPTIMIZATION section to ipfs add --help explaining:
    - The problem: the background queue takes time, so content is not
      immediately discoverable
    - The solution: an extra immediate announcement of just the root CID
    - The benefit: peers can find content right away while the queue handles
      the rest
    - Usage: async by default, --fast-provide-wait for guaranteed completion

  Changelog:
  - Added highlight section for the fast root CID providing feature
  - Updated TOC and overview
  - Included usage examples with clear comments explaining each mode
  - Emphasized this is an extra announcement independent of the background queue

  The feature works best with the sweep provider and accelerated DHT client,
  where provide operations are significantly faster.

* fix(add): respect Provide config in fast-provide-root

  fast-provide-root should honor the same config settings as the regular
  provide system:
  - skip when Provide.Enabled is false
  - skip when Provide.DHT.Interval is 0
  - respect Provide.Strategy (all/pinned/roots/mfs/combinations)

  This ensures fast-provide only runs when appropriate based on user
  configuration and the nature of the content being added (pinned vs
  unpinned, added to MFS or not).

* Update core/commands/add.go

---------

Co-authored-by: gammazero <11790789+gammazero@users.noreply.github.com>
Co-authored-by: Marcin Rataj <lidel@lidel.org>
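The typed-nil panic described above is easy to reproduce in isolation. The sketch below is illustrative only (the Router interface and dhtClient type are hypothetical stand-ins, not kubo's actual types), but it shows why a plain `== nil` check passes while a method call still dereferences a nil receiver, and why a check like HasActiveDHTClient() must inspect the concrete value as well.

package main

import "fmt"

// Router is a hypothetical stand-in for the routing interface.
type Router interface {
	Bootstrap() error
}

type dhtClient struct{ peers []string }

func (d *dhtClient) Bootstrap() error {
	return fmt.Errorf("have %d peers", len(d.peers)) // dereferences d: panics when d is nil
}

// hasActiveClient mirrors the idea behind HasActiveDHTClient(): the
// interface must be non-nil AND hold a non-nil concrete pointer.
func hasActiveClient(r Router) bool {
	if r == nil {
		return false
	}
	c, ok := r.(*dhtClient)
	return !ok || c != nil // only *dhtClient can be a typed nil in this sketch
}

func main() {
	var typedNil *dhtClient // nil pointer value
	var r Router = typedNil // interface now holds (type=*dhtClient, value=nil)

	fmt.Println(r == nil)           // false: the naive check passes
	fmt.Println(hasActiveClient(r)) // false: the value-aware check catches it
	// r.Bootstrap() would panic here with a nil pointer dereference.
}

The async mode can likewise be sketched as a fire-and-forget goroutine on a detached context with the 10-second cap the commit mentions. provideCIDSync below is a simulated stand-in with an assumed signature, not the real kubo helper.

package main

import (
	"context"
	"fmt"
	"time"
)

// provideCIDSync simulates the renamed helper; the real one announces a
// CID to the DHT. Its signature and behavior here are assumptions.
func provideCIDSync(ctx context.Context, cid string) error {
	select {
	case <-time.After(50 * time.Millisecond): // pretend network work
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	root := "bafy...root" // hypothetical root CID placeholder

	done := make(chan struct{})
	go func() {
		defer close(done)
		// Detached from any request context so the announce outlives the
		// request; capped at the 10s timeout the commit describes.
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := provideCIDSync(ctx, root); err != nil {
			fmt.Println("fast-provide-root: async provide failed:", err)
			return
		}
		fmt.Println("fast-provide-root: announced", root)
	}()
	<-done // the demo waits only so the output is visible before exit
}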
253 lines · 6.4 KiB · Go
package commands

import (
	"fmt"
	"io"
	"text/tabwriter"
	"time"

	cmdenv "github.com/ipfs/kubo/core/commands/cmdenv"
	"github.com/ipfs/kubo/core/commands/cmdutils"

	cmds "github.com/ipfs/go-ipfs-cmds"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p-kad-dht/fullrt"
	kbucket "github.com/libp2p/go-libp2p-kbucket"
	"github.com/libp2p/go-libp2p/core/network"
	pstore "github.com/libp2p/go-libp2p/core/peerstore"
)

// dhtPeerInfo is one peer row in the routing-table dump.
type dhtPeerInfo struct {
	ID            string
	Connected     bool
	AgentVersion  string
	LastUsefulAt  string
	LastQueriedAt string
}

// dhtStat is the per-DHT result emitted to the client.
type dhtStat struct {
	Name    string
	Buckets []dhtBucket
}

// dhtBucket is a single routing-table bucket in the output.
type dhtBucket struct {
	LastRefresh string
	Peers       []dhtPeerInfo
}

var statDhtCmd = &cmds.Command{
	Helptext: cmds.HelpText{
		Tagline: "Returns statistics about the node's DHT(s).",
		ShortDescription: `
Returns statistics about the DHT(s) the node is participating in.

This interface is not stable and may change from release to release.
`,
	},
	Arguments: []cmds.Argument{
		cmds.StringArg("dht", false, true, "The DHT whose table should be listed (wanserver, lanserver, wan, lan). "+
			"wan and lan refer to client routing tables. When using the experimental DHT client only WAN is supported. Defaults to wan and lan."),
	},
	Options: []cmds.Option{},
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		nd, err := cmdenv.GetNode(env)
		if err != nil {
			return err
		}

		if !nd.IsOnline {
			return ErrNotOnline
		}

		if nd.DHT == nil {
			return ErrNotDHT
		}

		id := kbucket.ConvertPeerID(nd.Identity)

		dhts := req.Arguments
		if len(dhts) == 0 {
			dhts = []string{"wan", "lan"}
		}

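		// Each requested table is handled in turn; the accelerated-client
		// branch emits its stats directly and continues this labeled loop,
		// skipping the shared routing-table dump below.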
	dhttypeloop:
		for _, name := range dhts {
			var dht *dht.IpfsDHT

			var separateClient bool
			// Check if using separate DHT client (e.g., accelerated DHT)
			if nd.HasActiveDHTClient() && nd.DHTClient != nd.DHT {
				separateClient = true
			}

			switch name {
			case "wan":
				if separateClient {
					client, ok := nd.DHTClient.(*fullrt.FullRT)
					if !ok {
						return cmds.Errorf(cmds.ErrClient, "could not generate stats for the WAN DHT client type")
					}
					peerMap := client.Stat()
					buckets := make([]dhtBucket, 1)
					b := &dhtBucket{}
					for _, p := range peerMap {
						info := dhtPeerInfo{ID: p.String()}

						if ver, err := nd.Peerstore.Get(p, "AgentVersion"); err == nil {
							if vs, ok := ver.(string); ok {
								info.AgentVersion = cmdutils.CleanAndTrim(vs)
							}
						} else if err == pstore.ErrNotFound {
							// ignore
						} else {
							// this is a bug, usually.
							log.Errorw(
								"failed to get agent version from peerstore",
								"error", err,
							)
						}

						info.Connected = nd.PeerHost.Network().Connectedness(p) == network.Connected
						b.Peers = append(b.Peers, info)
					}
					buckets[0] = *b

					if err := res.Emit(dhtStat{
						Name:    name,
						Buckets: buckets,
					}); err != nil {
						return err
					}
					continue dhttypeloop
				}
				fallthrough
			case "wanserver":
				dht = nd.DHT.WAN
			case "lan":
				if separateClient {
					return cmds.Errorf(cmds.ErrClient, "no LAN client found")
				}
				fallthrough
			case "lanserver":
				dht = nd.DHT.LAN
			default:
				return cmds.Errorf(cmds.ErrClient, "unknown dht type: %s", name)
			}

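			// Shared path for the routing-table-backed DHTs: peers are
			// grouped into buckets by the common prefix length (CPL)
			// between their ID and ours.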
			rt := dht.RoutingTable()
			lastRefresh := rt.GetTrackedCplsForRefresh()
			infos := rt.GetPeerInfos()
			buckets := make([]dhtBucket, 0, len(lastRefresh))
			for _, pi := range infos {
				cpl := kbucket.CommonPrefixLen(id, kbucket.ConvertPeerID(pi.Id))
				if len(buckets) <= cpl {
					buckets = append(buckets, make([]dhtBucket, 1+cpl-len(buckets))...)
				}

				info := dhtPeerInfo{ID: pi.Id.String()}

				if ver, err := nd.Peerstore.Get(pi.Id, "AgentVersion"); err == nil {
					if vs, ok := ver.(string); ok {
						info.AgentVersion = cmdutils.CleanAndTrim(vs)
					}
				} else if err == pstore.ErrNotFound {
					// ignore
				} else {
					// this is a bug, usually.
					log.Errorw(
						"failed to get agent version from peerstore",
						"error", err,
					)
				}
				if !pi.LastUsefulAt.IsZero() {
					info.LastUsefulAt = pi.LastUsefulAt.Format(time.RFC3339)
				}

				if !pi.LastSuccessfulOutboundQueryAt.IsZero() {
					info.LastQueriedAt = pi.LastSuccessfulOutboundQueryAt.Format(time.RFC3339)
				}

				info.Connected = nd.PeerHost.Network().Connectedness(pi.Id) == network.Connected

				buckets[cpl].Peers = append(buckets[cpl].Peers, info)
			}
			for i := 0; i < len(buckets) && i < len(lastRefresh); i++ {
				refreshTime := lastRefresh[i]
				if !refreshTime.IsZero() {
					buckets[i].LastRefresh = refreshTime.Format(time.RFC3339)
				}
			}
			if err := res.Emit(dhtStat{
				Name:    name,
				Buckets: buckets,
			}); err != nil {
				return err
			}
		}

		return nil
	},
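	// The text encoder prints one table per emitted dhtStat: a header per
	// bucket followed by one row per peer, with RFC3339 timestamps rendered
	// as relative "Xs ago" durations.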
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out dhtStat) error {
			tw := tabwriter.NewWriter(w, 4, 4, 2, ' ', 0)
			defer tw.Flush()

			// Formats a time as "XX ago", dropping any fractional
			// seconds: "2m3.00010101s" becomes "2m3s ago".
			now := time.Now()
			since := func(t time.Time) string {
				return now.Sub(t).Round(time.Second).String() + " ago"
			}

			count := 0
			for _, bucket := range out.Buckets {
				count += len(bucket.Peers)
			}

			fmt.Fprintf(tw, "DHT %s (%d peers):\t\t\t\n", out.Name, count)

			for i, bucket := range out.Buckets {
				lastRefresh := "never"
				if bucket.LastRefresh != "" {
					t, err := time.Parse(time.RFC3339, bucket.LastRefresh)
					if err != nil {
						return err
					}
					lastRefresh = since(t)
				}
				fmt.Fprintf(tw, "  Bucket %2d (%d peers) - refreshed %s:\t\t\t\n", i, len(bucket.Peers), lastRefresh)
				fmt.Fprintln(tw, "    Peer\tlast useful\tlast queried\tAgent Version")

				for _, p := range bucket.Peers {
					lastUseful := "never"
					if p.LastUsefulAt != "" {
						t, err := time.Parse(time.RFC3339, p.LastUsefulAt)
						if err != nil {
							return err
						}
						lastUseful = since(t)
					}

					lastQueried := "never"
					if p.LastQueriedAt != "" {
						t, err := time.Parse(time.RFC3339, p.LastQueriedAt)
						if err != nil {
							return err
						}
						lastQueried = since(t)
					}

					state := " "
					if p.Connected {
						state = "@" // '@' marks peers we are currently connected to
					}
					fmt.Fprintf(tw, "    %s %s\t%s\t%s\t%s\n", state, p.ID, lastUseful, lastQueried, p.AgentVersion)
				}
				fmt.Fprintln(tw, "\t\t\t")
			}
			return nil
		}),
	},
	Type: dhtStat{},
}