fix(daemon): write api/gateway files only after HTTP server is ready

Fixes a race condition where the $IPFS_PATH/api and $IPFS_PATH/gateway files
were written before the HTTP servers were ready to accept connections.
This caused issues for tools like systemd path units that immediately
try to connect when these files appear.

changes:
- add corehttp.ServeWithReady() that signals when server is ready
- wait for ready signal before writing address files
- use sync.WaitGroup.Go() (Go 1.25) for cleaner goroutine management
- add TestAddressFileReady to verify both api and gateway files
This commit is contained in:
Marcin Rataj 2025-12-16 21:53:21 +01:00
parent ff81e3f372
commit 80b703a733
3 changed files with 171 additions and 21 deletions

View File

@ -883,21 +883,35 @@ func serveHTTPApi(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, error
return nil, fmt.Errorf("serveHTTPApi: ConstructNode() failed: %s", err) return nil, fmt.Errorf("serveHTTPApi: ConstructNode() failed: %s", err)
} }
errc := make(chan error)
var wg sync.WaitGroup
// Start first server and wait for it to be ready before writing api file.
// This prevents race conditions where external tools (like systemd path units)
// see the file and try to connect before the server can accept connections.
if len(listeners) > 0 { if len(listeners) > 0 {
// Only add an api file if the API is running. ready := make(chan struct{})
wg.Go(func() {
errc <- corehttp.ServeWithReady(node, manet.NetListener(listeners[0]), ready, opts...)
})
select {
case <-ready:
// Server announced in $IPFS_PATH/api is ready to accept connections
case err := <-errc:
return nil, fmt.Errorf("serveHTTPApi: %w", err)
}
if err := node.Repo.SetAPIAddr(rewriteMaddrToUseLocalhostIfItsAny(listeners[0].Multiaddr())); err != nil { if err := node.Repo.SetAPIAddr(rewriteMaddrToUseLocalhostIfItsAny(listeners[0].Multiaddr())); err != nil {
return nil, fmt.Errorf("serveHTTPApi: SetAPIAddr() failed: %w", err) return nil, fmt.Errorf("serveHTTPApi: SetAPIAddr() failed: %w", err)
} }
}
errc := make(chan error) // Start remaining servers
var wg sync.WaitGroup for _, lis := range listeners[1:] {
for _, apiLis := range listeners { wg.Go(func() {
wg.Add(1) errc <- corehttp.Serve(node, manet.NetListener(lis), opts...)
go func(lis manet.Listener) { })
defer wg.Done() }
errc <- corehttp.Serve(node, manet.NetListener(lis), opts...)
}(apiLis)
} }
go func() { go func() {
@ -1058,24 +1072,39 @@ func serveHTTPGateway(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, e
return nil, fmt.Errorf("serveHTTPGateway: ConstructNode() failed: %s", err) return nil, fmt.Errorf("serveHTTPGateway: ConstructNode() failed: %s", err)
} }
errc := make(chan error)
var wg sync.WaitGroup
// Start first server and wait for it to be ready before writing gateway file.
// This prevents race conditions where external tools (like systemd path units)
// see the file and try to connect before the server can accept connections.
if len(listeners) > 0 { if len(listeners) > 0 {
ready := make(chan struct{})
wg.Go(func() {
errc <- corehttp.ServeWithReady(node, manet.NetListener(listeners[0]), ready, opts...)
})
select {
case <-ready:
// Server announced in $IPFS_PATH/gateway is ready to accept connections
case err := <-errc:
return nil, fmt.Errorf("serveHTTPGateway: %w", err)
}
addr, err := manet.ToNetAddr(rewriteMaddrToUseLocalhostIfItsAny(listeners[0].Multiaddr())) addr, err := manet.ToNetAddr(rewriteMaddrToUseLocalhostIfItsAny(listeners[0].Multiaddr()))
if err != nil { if err != nil {
return nil, fmt.Errorf("serveHTTPGateway: manet.ToIP() failed: %w", err) return nil, fmt.Errorf("serveHTTPGateway: manet.ToNetAddr() failed: %w", err)
} }
if err := node.Repo.SetGatewayAddr(addr); err != nil { if err := node.Repo.SetGatewayAddr(addr); err != nil {
return nil, fmt.Errorf("serveHTTPGateway: SetGatewayAddr() failed: %w", err) return nil, fmt.Errorf("serveHTTPGateway: SetGatewayAddr() failed: %w", err)
} }
}
errc := make(chan error) // Start remaining servers
var wg sync.WaitGroup for _, lis := range listeners[1:] {
for _, lis := range listeners { wg.Go(func() {
wg.Add(1) errc <- corehttp.Serve(node, manet.NetListener(lis), opts...)
go func(lis manet.Listener) { })
defer wg.Done() }
errc <- corehttp.Serve(node, manet.NetListener(lis), opts...)
}(lis)
} }
go func() { go func() {

View File

@ -78,9 +78,23 @@ func ListenAndServe(n *core.IpfsNode, listeningMultiAddr string, options ...Serv
return Serve(n, manet.NetListener(list), options...) return Serve(n, manet.NetListener(list), options...)
} }
// Serve accepts incoming HTTP connections on the listener and pass them // Serve accepts incoming HTTP connections on the listener and passes them
// to ServeOption handlers. // to ServeOption handlers.
func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error { func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error {
return ServeWithReady(node, lis, nil, options...)
}
// ServeWithReady is like Serve but closes the ready channel to signal that
// the server is about to accept connections. The channel is closed right
// before server.Serve() is called.
//
// This is useful for callers that need to perform actions (like writing
// address files) only once the server is starting to accept connections on
// lis, avoiding race conditions where clients see the file before the
// server is ready.
//
// Passing nil for ready is equivalent to calling Serve().
func ServeWithReady(node *core.IpfsNode, lis net.Listener, ready chan<- struct{}, options ...ServeOption) error {
// make sure we close this no matter what. // make sure we close this no matter what.
defer lis.Close() defer lis.Close()
@ -107,6 +121,9 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error
var serverError error var serverError error
serverClosed := make(chan struct{}) serverClosed := make(chan struct{})
go func() { go func() {
if ready != nil {
close(ready)
}
serverError = server.Serve(lis) serverError = server.Serve(lis)
close(serverClosed) close(serverClosed)
}() }()

104
test/cli/api_file_test.go Normal file
View File

@ -0,0 +1,104 @@
package cli
import (
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/stretchr/testify/require"
)
// TestAddressFileReady verifies that when address files ($IPFS_PATH/api and
// $IPFS_PATH/gateway) are created, the corresponding HTTP servers are ready
// to accept connections immediately. This prevents race conditions for tools
// like systemd path units that start services when these files appear.
//
// Each subtest starts its own daemon without waiting for the API, polls for
// the address file, and then issues a single HTTP request that must succeed
// on the first attempt.
func TestAddressFileReady(t *testing.T) {
	t.Parallel()

	// Bound every probe request so that a daemon which accepts the connection
	// but never answers fails the test instead of hanging it. http.Client is
	// safe for concurrent use, so both parallel subtests share it.
	client := &http.Client{Timeout: 30 * time.Second}

	t.Run("api file", func(t *testing.T) {
		t.Parallel()
		h := harness.NewT(t)
		node := h.NewNode().Init()

		// Start daemon in background (don't use StartDaemon which waits for API)
		res := node.Runner.MustRun(harness.RunRequest{
			Path:    node.IPFSBin,
			Args:    []string{"daemon"},
			RunFunc: (*exec.Cmd).Start,
		})
		node.Daemon = res
		defer node.StopDaemon()

		// Poll for api file to appear
		apiFile := filepath.Join(node.Dir, "api")
		require.True(t, waitForAddressFile(apiFile), "api file should be created")

		// Read the api file to get the address
		apiAddr, err := node.TryAPIAddr()
		require.NoError(t, err)

		// Extract IP and port from multiaddr
		ip, err := apiAddr.ValueForProtocol(4) // P_IP4
		require.NoError(t, err)
		port, err := apiAddr.ValueForProtocol(6) // P_TCP
		require.NoError(t, err)

		// Immediately try to use the API - should work on first attempt
		url := "http://" + ip + ":" + port + "/api/v0/id"
		resp, err := client.Post(url, "", nil)
		require.NoError(t, err, "RPC API should be ready immediately when api file exists")
		defer resp.Body.Close()
		require.Equal(t, http.StatusOK, resp.StatusCode)
	})

	t.Run("gateway file", func(t *testing.T) {
		t.Parallel()
		h := harness.NewT(t)
		node := h.NewNode().Init()

		// Start daemon in background
		res := node.Runner.MustRun(harness.RunRequest{
			Path:    node.IPFSBin,
			Args:    []string{"daemon"},
			RunFunc: (*exec.Cmd).Start,
		})
		node.Daemon = res
		defer node.StopDaemon()

		// Poll for gateway file to appear
		gatewayFile := filepath.Join(node.Dir, "gateway")
		require.True(t, waitForAddressFile(gatewayFile), "gateway file should be created")

		// Read the gateway file to get the URL (already includes http:// prefix)
		gatewayURL, err := os.ReadFile(gatewayFile)
		require.NoError(t, err)

		// Immediately try to use the Gateway - should work on first attempt
		url := strings.TrimSpace(string(gatewayURL)) + "/ipfs/bafkqaaa" // empty file CID
		resp, err := client.Get(url)
		require.NoError(t, err, "Gateway should be ready immediately when gateway file exists")
		defer resp.Body.Close()
		require.Equal(t, http.StatusOK, resp.StatusCode)
	})
}

// waitForAddressFile polls until path can be stat'ed or roughly ten seconds
// have elapsed, and reports whether the file appeared.
//
// The poll interval is deliberately short: the longer the gap between the
// file appearing and the caller's first request, the more time a not-yet-ready
// server has to catch up, which would hide the race this test guards against.
func waitForAddressFile(path string) bool {
	const (
		timeout  = 10 * time.Second
		interval = 10 * time.Millisecond
	)
	deadline := time.Now().Add(timeout)
	for {
		if _, err := os.Stat(path); err == nil {
			return true
		}
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(interval)
	}
}