From d45a0073a5f00fc0c2a7af308ab2523a2b1c795e Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Fri, 19 Dec 2025 19:39:42 +0100 Subject: [PATCH] fix(test): fix flaky kubo-as-a-library and GetClosestPeers tests kubo-as-a-library: use `Bootstrap()` instead of raw `Swarm().Connect()` to fix race condition between swarm connection and bitswap peer discovery. `Bootstrap()` properly integrates peers into the routing system, ensuring bitswap learns about connected peers synchronously. GetClosestPeers: simplify retry logic using `EventuallyWithT` with a 10-minute timeout. Tests all 4 routing types (`auto`, `autoclient`, `dht`, `dhtclient`) against real bootstrap peers with patient polling. --- docs/examples/kubo-as-a-library/main.go | 91 ++++++------------- .../delegated_routing_v1_http_server_test.go | 58 +++--------- 2 files changed, 40 insertions(+), 109 deletions(-) diff --git a/docs/examples/kubo-as-a-library/main.go b/docs/examples/kubo-as-a-library/main.go index ffa86c7f0..8edfa1aeb 100644 --- a/docs/examples/kubo-as-a-library/main.go +++ b/docs/examples/kubo-as-a-library/main.go @@ -5,17 +5,16 @@ import ( "flag" "fmt" "io" - "log" "os" "path/filepath" "strings" "sync" "time" + "github.com/ipfs/boxo/bootstrap" "github.com/ipfs/boxo/files" "github.com/ipfs/boxo/path" icore "github.com/ipfs/kubo/core/coreiface" - ma "github.com/multiformats/go-multiaddr" "github.com/ipfs/kubo/config" "github.com/ipfs/kubo/core" @@ -68,6 +67,10 @@ func createTempRepo(swarmPort int) (string, error) { fmt.Sprintf("/ip4/0.0.0.0/udp/%d/webrtc-direct", swarmPort), } + // Disable auto-bootstrap. For this example, we manually connect only the peers we need. + // In production, you'd typically keep the default bootstrap peers to join the network. + cfg.Bootstrap = []string{} + // When creating the repository, you can define custom settings on the repository, such as enabling experimental // features (See experimental-features.md) or customizing the gateway endpoint. 
// To do such things, you should modify the variable `cfg`. For example: @@ -144,40 +147,6 @@ func spawnEphemeral(ctx context.Context, swarmPort int) (icore.CoreAPI, *core.Ip return api, node, err } -func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) error { - var wg sync.WaitGroup - peerInfos := make(map[peer.ID]*peer.AddrInfo, len(peers)) - for _, addrStr := range peers { - addr, err := ma.NewMultiaddr(addrStr) - if err != nil { - return err - } - pii, err := peer.AddrInfoFromP2pAddr(addr) - if err != nil { - return err - } - pi, ok := peerInfos[pii.ID] - if !ok { - pi = &peer.AddrInfo{ID: pii.ID} - peerInfos[pi.ID] = pi - } - pi.Addrs = append(pi.Addrs, pii.Addrs...) - } - - wg.Add(len(peerInfos)) - for _, peerInfo := range peerInfos { - go func(peerInfo *peer.AddrInfo) { - defer wg.Done() - err := ipfs.Swarm().Connect(ctx, *peerInfo) - if err != nil { - log.Printf("failed to connect to %s: %s", peerInfo.ID, err) - } - }(peerInfo) - } - wg.Wait() - return nil -} - func getUnixfsNode(path string) (files.Node, error) { st, err := os.Stat(path) if err != nil { @@ -224,7 +193,7 @@ func main() { // Spawn a node using a temporary path, creating a temporary repo for the run // Using port 4011 (different from nodeA's port 4010) fmt.Println("Spawning Kubo node on a temporary repo") - ipfsB, _, err := spawnEphemeral(ctx, 4011) + ipfsB, nodeB, err := spawnEphemeral(ctx, 4011) if err != nil { panic(fmt.Errorf("failed to spawn ephemeral node: %s", err)) } @@ -297,38 +266,30 @@ func main() { fmt.Printf("Got directory back from IPFS (IPFS path: %s) and wrote it to %s\n", cidDirectory.String(), outputPathDirectory) - /// --- Part IV: Getting a file from the IPFS Network + /// --- Part IV: Getting a file from another IPFS node - fmt.Println("\n-- Going to connect to a few nodes in the Network as bootstrappers --") + fmt.Println("\n-- Connecting to nodeA and fetching content via bitswap --") - // Get nodeA's address so we can fetch the file we added to 
it - peerAddrs, err := ipfsA.Swarm().LocalAddrs(ctx) - if err != nil { - panic(fmt.Errorf("could not get peer addresses: %s", err)) + // In production, you'd typically connect to bootstrap peers to join the IPFS network. + // autoconf.FallbackBootstrapPeers from boxo/autoconf provides well-known bootstrap peers: + // "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", + // "/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa", + // "/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", + // + // For this example, we only connect nodeB to nodeA to demonstrate direct bitswap. + // We use Bootstrap() instead of raw Swarm().Connect() because Bootstrap() properly + // integrates peers into the routing system and ensures bitswap learns about them. + fmt.Println("Bootstrapping nodeB to nodeA...") + nodeAPeerInfo := nodeA.Peerstore.PeerInfo(nodeA.Identity) + if err := nodeB.Bootstrap(bootstrap.BootstrapConfigWithPeers([]peer.AddrInfo{nodeAPeerInfo})); err != nil { + panic(fmt.Errorf("failed to bootstrap nodeB to nodeA: %s", err)) } - peerMa := peerAddrs[0].String() + "/p2p/" + nodeA.Identity.String() + fmt.Println("nodeB is now connected to nodeA") - bootstrapNodes := []string{ - // In production, use autoconf.FallbackBootstrapPeers from boxo/autoconf - // which includes well-known IPFS bootstrap peers like: - // "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", - // "/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa", - // "/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", - - // You can add custom peers here. 
For example, another IPFS node: - // "/ip4/192.0.2.1/tcp/4001/p2p/QmYourPeerID...", - // "/ip4/192.0.2.1/udp/4001/quic-v1/p2p/QmYourPeerID...", - - // nodeA's address (the peer we created above that has our test file) - peerMa, - } - - fmt.Println("Connecting to peers...") - err = connectToPeers(ctx, ipfsB, bootstrapNodes) - if err != nil { - panic(fmt.Errorf("failed to connect to peers: %s", err)) - } - fmt.Println("Connected to peers") + // Since nodeB is directly connected to nodeA, bitswap can fetch the content + // without needing DHT-based provider discovery. In a production scenario where + // nodes aren't directly connected, you'd use Routing().Provide() to announce + // content to the DHT (see docs/config.md#providedhtsweepenabled for background providing). exampleCIDStr := peerCidFile.RootCid().String() diff --git a/test/cli/delegated_routing_v1_http_server_test.go b/test/cli/delegated_routing_v1_http_server_test.go index 7883fa793..51a6834e1 100644 --- a/test/cli/delegated_routing_v1_http_server_test.go +++ b/test/cli/delegated_routing_v1_http_server_test.go @@ -2,7 +2,6 @@ package cli import ( "context" - "encoding/json" "strings" "testing" "time" @@ -21,11 +20,6 @@ import ( "github.com/stretchr/testify/require" ) -// swarmPeersOutput is used to parse the JSON output of 'ipfs swarm peers --enc=json' -type swarmPeersOutput struct { - Peers []struct{} `json:"Peers"` -} - func TestRoutingV1Server(t *testing.T) { t.Parallel() @@ -224,67 +218,43 @@ func TestRoutingV1Server(t *testing.T) { } }) - t.Run("GetClosestPeers returns peers for self", func(t *testing.T) { + t.Run("GetClosestPeers returns peers", func(t *testing.T) { t.Parallel() + // Test all DHT-enabled routing types routingTypes := []string{"auto", "autoclient", "dht", "dhtclient"} for _, routingType := range routingTypes { t.Run("routing_type="+routingType, func(t *testing.T) { t.Parallel() - // Single node with DHT and real bootstrap peers node := harness.NewT(t).NewNode().Init() 
node.UpdateConfig(func(cfg *config.Config) { cfg.Gateway.ExposeRoutingAPI = config.True cfg.Routing.Type = config.NewOptionalString(routingType) - // Set real bootstrap peers from boxo/autoconf cfg.Bootstrap = autoconf.FallbackBootstrapPeers }) node.StartDaemon() - // Create client before waiting so we can probe DHT readiness c, err := client.New(node.GatewayURL()) require.NoError(t, err) - // Query for closest peers to our own peer ID key := peer.ToCid(node.PeerID()) - // Wait for node to connect to bootstrap peers and populate WAN DHT routing table - minPeers := len(autoconf.FallbackBootstrapPeers) - require.EventuallyWithT(t, func(t *assert.CollectT) { - res := node.RunIPFS("swarm", "peers", "--enc=json") - var output swarmPeersOutput - err := json.Unmarshal(res.Stdout.Bytes(), &output) - assert.NoError(t, err) - peerCount := len(output.Peers) - // Wait until we have at least minPeers connected - assert.GreaterOrEqual(t, peerCount, minPeers, - "waiting for at least %d bootstrap peers, currently have %d", minPeers, peerCount) - }, 60*time.Second, time.Second) - - // Wait for DHT to be ready by probing GetClosestPeers until it succeeds - require.EventuallyWithT(t, func(t *assert.CollectT) { - probeCtx, probeCancel := context.WithTimeout(context.Background(), 30*time.Second) - defer probeCancel() - probeIter, probeErr := c.GetClosestPeers(probeCtx, key) - if probeErr == nil { - probeIter.Close() + // Wait for WAN DHT routing table to be populated + var records []*types.PeerRecord + require.EventuallyWithT(t, func(ct *assert.CollectT) { + resultsIter, err := c.GetClosestPeers(t.Context(), key) + if !assert.NoError(ct, err) { + return } - assert.NoError(t, probeErr, "DHT should be ready to handle GetClosestPeers") - }, 2*time.Minute, 5*time.Second) + records, err = iter.ReadAllResults(resultsIter) + assert.NoError(ct, err) + }, 10*time.Minute, 5*time.Second) + require.NotEmpty(t, records, "should return peers close to own peerid") - ctx, cancel := 
context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - resultsIter, err := c.GetClosestPeers(ctx, key) - require.NoError(t, err) + // Per IPIP-0476, GetClosestPeers returns at most 20 peers + assert.LessOrEqual(t, len(records), 20, "IPIP-0476 limits GetClosestPeers to 20 peers") - records, err := iter.ReadAllResults(resultsIter) - require.NoError(t, err) - - // Verify we got some peers back from WAN DHT - assert.NotEmpty(t, records, "should return some peers close to own peerid") - - // Verify structure of returned records for _, record := range records { assert.Equal(t, types.SchemaPeer, record.Schema) assert.NotNil(t, record.ID)