fix(test): fix flaky kubo-as-a-library and GetClosestPeers tests

kubo-as-a-library: use `Bootstrap()` instead of raw `Swarm().Connect()`
to fix race condition between swarm connection and bitswap peer
discovery. `Bootstrap()` properly integrates peers into the routing
system, ensuring bitswap learns about connected peers synchronously.

GetClosestPeers: simplify retry logic using `EventuallyWithT` with
10-minute timeout. Tests all 4 routing types (`auto`, `autoclient`,
`dht`, `dhtclient`) against real bootstrap peers with patient polling.
This commit is contained in:
Marcin Rataj 2025-12-19 19:39:42 +01:00
parent 553ae4b0b5
commit d45a0073a5
2 changed files with 40 additions and 109 deletions

View File

@ -5,17 +5,16 @@ import (
"flag"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/ipfs/boxo/bootstrap"
"github.com/ipfs/boxo/files"
"github.com/ipfs/boxo/path"
icore "github.com/ipfs/kubo/core/coreiface"
ma "github.com/multiformats/go-multiaddr"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core"
@ -68,6 +67,10 @@ func createTempRepo(swarmPort int) (string, error) {
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/webrtc-direct", swarmPort),
}
// Disable auto-bootstrap. For this example, we manually connect only the peers we need.
// In production, you'd typically keep the default bootstrap peers to join the network.
cfg.Bootstrap = []string{}
// When creating the repository, you can define custom settings on the repository, such as enabling experimental
// features (See experimental-features.md) or customizing the gateway endpoint.
// To do such things, you should modify the variable `cfg`. For example:
@ -144,40 +147,6 @@ func spawnEphemeral(ctx context.Context, swarmPort int) (icore.CoreAPI, *core.Ip
return api, node, err
}
// connectToPeers dials the given multiaddrs concurrently, merging addresses
// that belong to the same peer into a single AddrInfo so each peer is dialed
// exactly once. Individual connection failures are logged but do not abort
// the remaining dials; the function always returns nil once all attempts
// have completed (a non-nil error is returned only for unparsable addresses).
func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) error {
	// Group every address under its peer ID.
	infos := make(map[peer.ID]*peer.AddrInfo, len(peers))
	for _, raw := range peers {
		maddr, err := ma.NewMultiaddr(raw)
		if err != nil {
			return err
		}
		parsed, err := peer.AddrInfoFromP2pAddr(maddr)
		if err != nil {
			return err
		}
		merged, seen := infos[parsed.ID]
		if !seen {
			merged = &peer.AddrInfo{ID: parsed.ID}
			infos[merged.ID] = merged
		}
		merged.Addrs = append(merged.Addrs, parsed.Addrs...)
	}

	// Dial all peers in parallel and wait for every attempt to finish.
	var wg sync.WaitGroup
	for _, info := range infos {
		wg.Add(1)
		go func(target *peer.AddrInfo) {
			defer wg.Done()
			if err := ipfs.Swarm().Connect(ctx, *target); err != nil {
				log.Printf("failed to connect to %s: %s", target.ID, err)
			}
		}(info)
	}
	wg.Wait()
	return nil
}
func getUnixfsNode(path string) (files.Node, error) {
st, err := os.Stat(path)
if err != nil {
@ -224,7 +193,7 @@ func main() {
// Spawn a node using a temporary path, creating a temporary repo for the run
// Using port 4011 (different from nodeA's port 4010)
fmt.Println("Spawning Kubo node on a temporary repo")
ipfsB, _, err := spawnEphemeral(ctx, 4011)
ipfsB, nodeB, err := spawnEphemeral(ctx, 4011)
if err != nil {
panic(fmt.Errorf("failed to spawn ephemeral node: %s", err))
}
@ -297,38 +266,30 @@ func main() {
fmt.Printf("Got directory back from IPFS (IPFS path: %s) and wrote it to %s\n", cidDirectory.String(), outputPathDirectory)
/// --- Part IV: Getting a file from the IPFS Network
/// --- Part IV: Getting a file from another IPFS node
fmt.Println("\n-- Going to connect to a few nodes in the Network as bootstrappers --")
fmt.Println("\n-- Connecting to nodeA and fetching content via bitswap --")
// Get nodeA's address so we can fetch the file we added to it
peerAddrs, err := ipfsA.Swarm().LocalAddrs(ctx)
if err != nil {
panic(fmt.Errorf("could not get peer addresses: %s", err))
// In production, you'd typically connect to bootstrap peers to join the IPFS network.
// autoconf.FallbackBootstrapPeers from boxo/autoconf provides well-known bootstrap peers:
// "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
// "/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
// "/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
//
// For this example, we only connect nodeB to nodeA to demonstrate direct bitswap.
// We use Bootstrap() instead of raw Swarm().Connect() because Bootstrap() properly
// integrates peers into the routing system and ensures bitswap learns about them.
fmt.Println("Bootstrapping nodeB to nodeA...")
nodeAPeerInfo := nodeA.Peerstore.PeerInfo(nodeA.Identity)
if err := nodeB.Bootstrap(bootstrap.BootstrapConfigWithPeers([]peer.AddrInfo{nodeAPeerInfo})); err != nil {
panic(fmt.Errorf("failed to bootstrap nodeB to nodeA: %s", err))
}
peerMa := peerAddrs[0].String() + "/p2p/" + nodeA.Identity.String()
fmt.Println("nodeB is now connected to nodeA")
bootstrapNodes := []string{
// In production, use autoconf.FallbackBootstrapPeers from boxo/autoconf
// which includes well-known IPFS bootstrap peers like:
// "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
// "/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
// "/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
// You can add custom peers here. For example, another IPFS node:
// "/ip4/192.0.2.1/tcp/4001/p2p/QmYourPeerID...",
// "/ip4/192.0.2.1/udp/4001/quic-v1/p2p/QmYourPeerID...",
// nodeA's address (the peer we created above that has our test file)
peerMa,
}
fmt.Println("Connecting to peers...")
err = connectToPeers(ctx, ipfsB, bootstrapNodes)
if err != nil {
panic(fmt.Errorf("failed to connect to peers: %s", err))
}
fmt.Println("Connected to peers")
// Since nodeB is directly connected to nodeA, bitswap can fetch the content
// without needing DHT-based provider discovery. In a production scenario where
// nodes aren't directly connected, you'd use Routing().Provide() to announce
// content to the DHT (see docs/config.md#providedhtsweepenabled for background providing).
exampleCIDStr := peerCidFile.RootCid().String()

View File

@ -2,7 +2,6 @@ package cli
import (
"context"
"encoding/json"
"strings"
"testing"
"time"
@ -21,11 +20,6 @@ import (
"github.com/stretchr/testify/require"
)
// swarmPeersOutput is used to parse the JSON output of 'ipfs swarm peers --enc=json'
type swarmPeersOutput struct {
// Peers carries one entry per connected peer; the struct fields themselves
// are discarded — callers only use len(Peers) as a connection count.
Peers []struct{} `json:"Peers"`
}
func TestRoutingV1Server(t *testing.T) {
t.Parallel()
@ -224,67 +218,43 @@ func TestRoutingV1Server(t *testing.T) {
}
})
t.Run("GetClosestPeers returns peers for self", func(t *testing.T) {
t.Run("GetClosestPeers returns peers", func(t *testing.T) {
t.Parallel()
// Test all DHT-enabled routing types
routingTypes := []string{"auto", "autoclient", "dht", "dhtclient"}
for _, routingType := range routingTypes {
t.Run("routing_type="+routingType, func(t *testing.T) {
t.Parallel()
// Single node with DHT and real bootstrap peers
node := harness.NewT(t).NewNode().Init()
node.UpdateConfig(func(cfg *config.Config) {
cfg.Gateway.ExposeRoutingAPI = config.True
cfg.Routing.Type = config.NewOptionalString(routingType)
// Set real bootstrap peers from boxo/autoconf
cfg.Bootstrap = autoconf.FallbackBootstrapPeers
})
node.StartDaemon()
// Create client before waiting so we can probe DHT readiness
c, err := client.New(node.GatewayURL())
require.NoError(t, err)
// Query for closest peers to our own peer ID
key := peer.ToCid(node.PeerID())
// Wait for node to connect to bootstrap peers and populate WAN DHT routing table
minPeers := len(autoconf.FallbackBootstrapPeers)
require.EventuallyWithT(t, func(t *assert.CollectT) {
res := node.RunIPFS("swarm", "peers", "--enc=json")
var output swarmPeersOutput
err := json.Unmarshal(res.Stdout.Bytes(), &output)
assert.NoError(t, err)
peerCount := len(output.Peers)
// Wait until we have at least minPeers connected
assert.GreaterOrEqual(t, peerCount, minPeers,
"waiting for at least %d bootstrap peers, currently have %d", minPeers, peerCount)
}, 60*time.Second, time.Second)
// Wait for DHT to be ready by probing GetClosestPeers until it succeeds
require.EventuallyWithT(t, func(t *assert.CollectT) {
probeCtx, probeCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer probeCancel()
probeIter, probeErr := c.GetClosestPeers(probeCtx, key)
if probeErr == nil {
probeIter.Close()
// Wait for WAN DHT routing table to be populated
var records []*types.PeerRecord
require.EventuallyWithT(t, func(ct *assert.CollectT) {
resultsIter, err := c.GetClosestPeers(t.Context(), key)
if !assert.NoError(ct, err) {
return
}
assert.NoError(t, probeErr, "DHT should be ready to handle GetClosestPeers")
}, 2*time.Minute, 5*time.Second)
records, err = iter.ReadAllResults(resultsIter)
assert.NoError(ct, err)
}, 10*time.Minute, 5*time.Second)
require.NotEmpty(t, records, "should return peers close to own peerid")
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
resultsIter, err := c.GetClosestPeers(ctx, key)
require.NoError(t, err)
// Per IPIP-0476, GetClosestPeers returns at most 20 peers
assert.LessOrEqual(t, len(records), 20, "IPIP-0476 limits GetClosestPeers to 20 peers")
records, err := iter.ReadAllResults(resultsIter)
require.NoError(t, err)
// Verify we got some peers back from WAN DHT
assert.NotEmpty(t, records, "should return some peers close to own peerid")
// Verify structure of returned records
for _, record := range records {
assert.Equal(t, types.SchemaPeer, record.Schema)
assert.NotNil(t, record.ID)