Merge 3584e795c9 into 07ad431b00
commit 5686dcd59b
@@ -7,7 +7,6 @@ import (
 	"github.com/dustin/go-humanize"
 	mdag "github.com/ipfs/boxo/ipld/merkledag"
-	"github.com/ipfs/boxo/ipld/merkledag/traverse"
 	cid "github.com/ipfs/go-cid"
 	cmds "github.com/ipfs/go-ipfs-cmds"
 	"github.com/ipfs/kubo/core/commands/cmdenv"
@@ -15,9 +14,34 @@ import (
 	"github.com/ipfs/kubo/core/commands/e"
 )
 
-// TODO cache every cid traversal in a dp cache
-// if the cid exists in the cache, don't traverse it, and use the cached result
-// to compute the new state
+// cidStatCache caches the statistics for already-traversed CIDs to avoid
+// redundant traversals when multiple DAGs share common subgraphs.
+type cidStatCache struct {
+	stats map[string]*cachedStat
+}
+
+type cachedStat struct {
+	size      uint64
+	numBlocks int64
+}
+
+func newCidStatCache() *cidStatCache {
+	return &cidStatCache{
+		stats: make(map[string]*cachedStat),
+	}
+}
+
+func (c *cidStatCache) get(cid cid.Cid) (*cachedStat, bool) {
+	stat, ok := c.stats[cid.String()]
+	return stat, ok
+}
+
+func (c *cidStatCache) put(cid cid.Cid, size uint64, numBlocks int64) {
+	c.stats[cid.String()] = &cachedStat{
+		size:      size,
+		numBlocks: numBlocks,
+	}
+}
+
 func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
 	// Default to true (emit intermediate states) for HTTP/RPC clients that want progress
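
Review note: the change replaces the shared traverse.Traverse walk with an explicit memoized recursion over subtree stats. A minimal standalone sketch of that idea, using a toy in-memory DAG instead of IPLD nodes (all names here - toyNode, stat, statDAG - are illustrative, not part of kubo):

package main

import "fmt"

type toyNode struct {
	id    string
	size  uint64
	links []*toyNode
}

type stat struct {
	size      uint64
	numBlocks int64
}

// statDAG mirrors cidStatCache + traverseWithCache below: each node's
// subtree stats are computed once and reused when the node is reached
// again through another parent or another root.
func statDAG(n *toyNode, cache map[string]stat) stat {
	if s, ok := cache[n.id]; ok {
		return s // cache hit: the subtree is not re-walked
	}
	s := stat{size: n.size, numBlocks: 1}
	for _, child := range n.links {
		cs := statDAG(child, cache)
		s.size += cs.size
		s.numBlocks += cs.numBlocks
	}
	cache[n.id] = s
	return s
}

func main() {
	// Two roots sharing one child, the same shape the tests below use.
	shared := &toyNode{id: "shared", size: 7}
	root1 := &toyNode{id: "root1", size: 46, links: []*toyNode{shared}}
	root2 := &toyNode{id: "root2", size: 46, links: []*toyNode{shared}}

	cache := map[string]stat{}
	fmt.Println(statDAG(root1, cache)) // {53 2}: walks shared
	fmt.Println(statDAG(root2, cache)) // {53 2}: shared served from cache
}
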
@@ -32,6 +56,7 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)
 	nodeGetter := mdag.NewSession(req.Context, api.Dag())
 
 	cidSet := cid.NewSet()
+	cache := newCidStatCache()
 	dagStatSummary := &DagStatSummary{DagStatsArray: []*DagStat{}}
 	for _, a := range req.Arguments {
 		p, err := cmdutils.PathOrCidPath(a)
@@ -46,37 +71,81 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)
 			return fmt.Errorf("cannot return size for anything other than a DAG with a root CID")
 		}
 
-		obj, err := nodeGetter.Get(req.Context, rp.RootCid())
-		if err != nil {
-			return err
-		}
 		dagstats := &DagStat{Cid: rp.RootCid()}
 		dagStatSummary.appendStats(dagstats)
-		err = traverse.Traverse(obj, traverse.Options{
-			DAG:   nodeGetter,
-			Order: traverse.DFSPre,
-			Func: func(current traverse.State) error {
-				currentNodeSize := uint64(len(current.Node.RawData()))
-				dagstats.Size += currentNodeSize
-				dagstats.NumBlocks++
-				if !cidSet.Has(current.Node.Cid()) {
-					dagStatSummary.incrementTotalSize(currentNodeSize)
-				}
-				dagStatSummary.incrementRedundantSize(currentNodeSize)
-				cidSet.Add(current.Node.Cid())
-				if progressive {
-					if err := res.Emit(dagStatSummary); err != nil {
-						return err
-					}
-				}
-				return nil
-			},
-			ErrFunc:        nil,
-			SkipDuplicates: true,
-		})
+
+		// Use a custom recursive traversal with DP caching.
+		var traverseWithCache func(c cid.Cid) (*cachedStat, error)
+		traverseWithCache = func(c cid.Cid) (*cachedStat, error) {
+			// Check the cache first - this is the DP optimization.
+			// If cached, return the stored subtree stats without re-walking it.
+			if cached, ok := cache.get(c); ok {
+				// Still need to track the redundant access to this block.
+				node, err := nodeGetter.Get(req.Context, c)
+				if err != nil {
+					return nil, err
+				}
+				nodeSize := uint64(len(node.RawData()))
+				dagStatSummary.incrementRedundantSize(nodeSize)
+				cidSet.Add(c)
+
+				if progressive {
+					if err := res.Emit(dagStatSummary); err != nil {
+						return nil, err
+					}
+				}
+				return cached, nil
+			}
+
+			node, err := nodeGetter.Get(req.Context, c)
+			if err != nil {
+				return nil, err
+			}
+
+			nodeSize := uint64(len(node.RawData()))
+			subtreeSize := nodeSize
+			subtreeBlocks := int64(1)
+
+			// Update global tracking for this new node
+			if !cidSet.Has(c) {
+				dagStatSummary.incrementTotalSize(nodeSize)
+			}
+			dagStatSummary.incrementRedundantSize(nodeSize)
+			cidSet.Add(c)
+
+			// Recursively compute stats for all children
+			for _, link := range node.Links() {
+				childStats, err := traverseWithCache(link.Cid)
+				if err != nil {
+					return nil, err
+				}
+				subtreeSize += childStats.size
+				subtreeBlocks += childStats.numBlocks
+			}
+
+			// Cache this node's subtree stats
+			stat := &cachedStat{
+				size:      subtreeSize,
+				numBlocks: subtreeBlocks,
+			}
+			cache.put(c, subtreeSize, subtreeBlocks)
+
+			if progressive {
+				if err := res.Emit(dagStatSummary); err != nil {
+					return nil, err
+				}
+			}
+
+			return stat, nil
+		}
+
+		rootStats, err := traverseWithCache(rp.RootCid())
 		if err != nil {
 			return fmt.Errorf("error traversing DAG: %w", err)
 		}
+
+		dagstats.Size = rootStats.size
+		dagstats.NumBlocks = rootStats.numBlocks
 	}
 
 	dagStatSummary.UniqueBlocks = cidSet.Len()
 
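
Review note: with the cache in place, repeated queries over the same DAG return identical results and shared subtrees are not re-walked. A hedged usage sketch (the flags match the tests below; the JSON field names are inferred from the Data struct those tests decode into, so treat the exact output shape as an assumption):

	$ ipfs dag stat --progress=false --enc=json <root1-cid> <root2-cid>
	{"UniqueBlocks":3,"TotalSize":99,"SharedSize":7,"Ratio":1.0707...,"DagStats":[...]}
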
@@ -305,3 +305,64 @@ func TestDagImportFastProvide(t *testing.T) {
 		require.Contains(t, daemonLog, "fast-provide-root: skipped")
 	})
 }
+
+func TestDagStatCaching(t *testing.T) {
+	t.Parallel()
+
+	t.Run("cache reuses stats for duplicate CIDs", func(t *testing.T) {
+		t.Parallel()
+		node := harness.NewT(t).NewNode().Init().StartDaemon()
+
+		// Import the fixture with shared subgraph
+		r, err := os.Open(fixtureFile)
+		require.NoError(t, err)
+		defer r.Close()
+		err = node.IPFSDagImport(r, fixtureCid)
+		require.NoError(t, err)
+
+		// Run dag stat on the same CID multiple times - the cache should keep results consistent
+		stat1 := node.RunIPFS("dag", "stat", "--progress=false", "--enc=json", node1Cid)
+		stat2 := node.RunIPFS("dag", "stat", "--progress=false", "--enc=json", node1Cid)
+
+		// Both should return identical results
+		assert.Equal(t, stat1.Stdout.Bytes(), stat2.Stdout.Bytes())
+
+		// Parse and verify the stats are correct
+		var data1, data2 Data
+		err = json.Unmarshal(stat1.Stdout.Bytes(), &data1)
+		require.NoError(t, err)
+		err = json.Unmarshal(stat2.Stdout.Bytes(), &data2)
+		require.NoError(t, err)
+
+		assert.Equal(t, data1, data2)
+		assert.Equal(t, 53, data1.TotalSize) // node1 size (46) + child (7)
+		assert.Equal(t, 2, data1.DagStats[0].NumBlocks)
+	})
+
+	t.Run("cache works across multiple CIDs with shared subgraph", func(t *testing.T) {
+		t.Parallel()
+		node := harness.NewT(t).NewNode().Init().StartDaemon()
+
+		r, err := os.Open(fixtureFile)
+		require.NoError(t, err)
+		defer r.Close()
+		err = node.IPFSDagImport(r, fixtureCid)
+		require.NoError(t, err)
+
+		// When querying both nodes that share a child, the shared child should
+		// only be counted once in TotalSize but twice in redundant size.
+		stat := node.RunIPFS("dag", "stat", "--progress=false", "--enc=json", node1Cid, node2Cid)
+		var data Data
+		err = json.Unmarshal(stat.Stdout.Bytes(), &data)
+		require.NoError(t, err)
+
+		// TotalSize should be: node1(46) + node2(46) + shared_child(7) = 99
+		assert.Equal(t, 99, data.TotalSize)
+		// SharedSize should be: shared_child(7) counted again = 7
+		assert.Equal(t, 7, data.SharedSize)
+		// Ratio should be (99+7)/99 ≈ 1.0707
+		expectedRatio := float64(99+7) / float64(99)
+		assert.Equal(t, testutils.FloatTruncate(expectedRatio, 4), testutils.FloatTruncate(data.Ratio, 4))
+		assert.Equal(t, 3, data.UniqueBlocks)
+	})
+}
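
Review note: the expected values in the second subtest follow from the fixture sizes stated in its comments (node1 = node2 = 46 bytes, shared child = 7 bytes). A quick standalone check of that arithmetic, assuming Ratio is (TotalSize+SharedSize)/TotalSize as the test's expectedRatio suggests:

package main

import "fmt"

func main() {
	node1, node2, shared := 46, 46, 7
	totalSize := node1 + node2 + shared // each unique block counted once = 99
	sharedSize := shared                // the shared child revisited once = 7
	ratio := float64(totalSize+sharedSize) / float64(totalSize)
	fmt.Println(totalSize, sharedSize, ratio) // 99 7 1.0707070707070707
}
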