mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 10:27:46 +08:00
fix(daemon): write api/gateway files only after HTTP server is ready
fixes race condition where $IPFS_PATH/api and $IPFS_PATH/gateway files were written before the HTTP servers were ready to accept connections. this caused issues for tools like systemd path units that immediately try to connect when these files appear. changes: - add corehttp.ServeWithReady() that signals when server is ready - wait for ready signal before writing address files - use sync.WaitGroup.Go() (Go 1.25) for cleaner goroutine management - add TestAddressFileReady to verify both api and gateway files
This commit is contained in:
parent
ff81e3f372
commit
80b703a733
@ -883,21 +883,35 @@ func serveHTTPApi(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, error
|
|||||||
return nil, fmt.Errorf("serveHTTPApi: ConstructNode() failed: %s", err)
|
return nil, fmt.Errorf("serveHTTPApi: ConstructNode() failed: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
errc := make(chan error)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// Start first server and wait for it to be ready before writing api file.
|
||||||
|
// This prevents race conditions where external tools (like systemd path units)
|
||||||
|
// see the file and try to connect before the server can accept connections.
|
||||||
if len(listeners) > 0 {
|
if len(listeners) > 0 {
|
||||||
// Only add an api file if the API is running.
|
ready := make(chan struct{})
|
||||||
|
wg.Go(func() {
|
||||||
|
errc <- corehttp.ServeWithReady(node, manet.NetListener(listeners[0]), ready, opts...)
|
||||||
|
})
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ready:
|
||||||
|
// Server announced in $IPFS_PATH/api is ready to accept connections
|
||||||
|
case err := <-errc:
|
||||||
|
return nil, fmt.Errorf("serveHTTPApi: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := node.Repo.SetAPIAddr(rewriteMaddrToUseLocalhostIfItsAny(listeners[0].Multiaddr())); err != nil {
|
if err := node.Repo.SetAPIAddr(rewriteMaddrToUseLocalhostIfItsAny(listeners[0].Multiaddr())); err != nil {
|
||||||
return nil, fmt.Errorf("serveHTTPApi: SetAPIAddr() failed: %w", err)
|
return nil, fmt.Errorf("serveHTTPApi: SetAPIAddr() failed: %w", err)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
errc := make(chan error)
|
// Start remaining servers
|
||||||
var wg sync.WaitGroup
|
for _, lis := range listeners[1:] {
|
||||||
for _, apiLis := range listeners {
|
wg.Go(func() {
|
||||||
wg.Add(1)
|
errc <- corehttp.Serve(node, manet.NetListener(lis), opts...)
|
||||||
go func(lis manet.Listener) {
|
})
|
||||||
defer wg.Done()
|
}
|
||||||
errc <- corehttp.Serve(node, manet.NetListener(lis), opts...)
|
|
||||||
}(apiLis)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
@ -1058,24 +1072,39 @@ func serveHTTPGateway(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, e
|
|||||||
return nil, fmt.Errorf("serveHTTPGateway: ConstructNode() failed: %s", err)
|
return nil, fmt.Errorf("serveHTTPGateway: ConstructNode() failed: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
errc := make(chan error)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// Start first server and wait for it to be ready before writing gateway file.
|
||||||
|
// This prevents race conditions where external tools (like systemd path units)
|
||||||
|
// see the file and try to connect before the server can accept connections.
|
||||||
if len(listeners) > 0 {
|
if len(listeners) > 0 {
|
||||||
|
ready := make(chan struct{})
|
||||||
|
wg.Go(func() {
|
||||||
|
errc <- corehttp.ServeWithReady(node, manet.NetListener(listeners[0]), ready, opts...)
|
||||||
|
})
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ready:
|
||||||
|
// Server announced in $IPFS_PATH/gateway is ready to accept connections
|
||||||
|
case err := <-errc:
|
||||||
|
return nil, fmt.Errorf("serveHTTPGateway: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
addr, err := manet.ToNetAddr(rewriteMaddrToUseLocalhostIfItsAny(listeners[0].Multiaddr()))
|
addr, err := manet.ToNetAddr(rewriteMaddrToUseLocalhostIfItsAny(listeners[0].Multiaddr()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("serveHTTPGateway: manet.ToIP() failed: %w", err)
|
return nil, fmt.Errorf("serveHTTPGateway: manet.ToNetAddr() failed: %w", err)
|
||||||
}
|
}
|
||||||
if err := node.Repo.SetGatewayAddr(addr); err != nil {
|
if err := node.Repo.SetGatewayAddr(addr); err != nil {
|
||||||
return nil, fmt.Errorf("serveHTTPGateway: SetGatewayAddr() failed: %w", err)
|
return nil, fmt.Errorf("serveHTTPGateway: SetGatewayAddr() failed: %w", err)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
errc := make(chan error)
|
// Start remaining servers
|
||||||
var wg sync.WaitGroup
|
for _, lis := range listeners[1:] {
|
||||||
for _, lis := range listeners {
|
wg.Go(func() {
|
||||||
wg.Add(1)
|
errc <- corehttp.Serve(node, manet.NetListener(lis), opts...)
|
||||||
go func(lis manet.Listener) {
|
})
|
||||||
defer wg.Done()
|
}
|
||||||
errc <- corehttp.Serve(node, manet.NetListener(lis), opts...)
|
|
||||||
}(lis)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
|||||||
@ -78,9 +78,23 @@ func ListenAndServe(n *core.IpfsNode, listeningMultiAddr string, options ...Serv
|
|||||||
return Serve(n, manet.NetListener(list), options...)
|
return Serve(n, manet.NetListener(list), options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Serve accepts incoming HTTP connections on the listener and pass them
|
// Serve accepts incoming HTTP connections on the listener and passes them
|
||||||
// to ServeOption handlers.
|
// to ServeOption handlers.
|
||||||
func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error {
|
func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error {
|
||||||
|
return ServeWithReady(node, lis, nil, options...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServeWithReady is like Serve but signals on the ready channel when the
|
||||||
|
// server is about to accept connections. The channel is closed right before
|
||||||
|
// server.Serve() is called.
|
||||||
|
//
|
||||||
|
// This is useful for callers that need to perform actions (like writing
|
||||||
|
// address files) only after the server is guaranteed to be accepting
|
||||||
|
// connections, avoiding race conditions where clients see the file before
|
||||||
|
// the server is ready.
|
||||||
|
//
|
||||||
|
// Passing nil for ready is equivalent to calling Serve().
|
||||||
|
func ServeWithReady(node *core.IpfsNode, lis net.Listener, ready chan<- struct{}, options ...ServeOption) error {
|
||||||
// make sure we close this no matter what.
|
// make sure we close this no matter what.
|
||||||
defer lis.Close()
|
defer lis.Close()
|
||||||
|
|
||||||
@ -107,6 +121,9 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error
|
|||||||
var serverError error
|
var serverError error
|
||||||
serverClosed := make(chan struct{})
|
serverClosed := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
|
if ready != nil {
|
||||||
|
close(ready)
|
||||||
|
}
|
||||||
serverError = server.Serve(lis)
|
serverError = server.Serve(lis)
|
||||||
close(serverClosed)
|
close(serverClosed)
|
||||||
}()
|
}()
|
||||||
|
|||||||
104
test/cli/api_file_test.go
Normal file
104
test/cli/api_file_test.go
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
package cli
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ipfs/kubo/test/cli/harness"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestAddressFileReady verifies that when address files ($IPFS_PATH/api and
|
||||||
|
// $IPFS_PATH/gateway) are created, the corresponding HTTP servers are ready
|
||||||
|
// to accept connections immediately. This prevents race conditions for tools
|
||||||
|
// like systemd path units that start services when these files appear.
|
||||||
|
func TestAddressFileReady(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
t.Run("api file", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
h := harness.NewT(t)
|
||||||
|
node := h.NewNode().Init()
|
||||||
|
|
||||||
|
// Start daemon in background (don't use StartDaemon which waits for API)
|
||||||
|
res := node.Runner.MustRun(harness.RunRequest{
|
||||||
|
Path: node.IPFSBin,
|
||||||
|
Args: []string{"daemon"},
|
||||||
|
RunFunc: (*exec.Cmd).Start,
|
||||||
|
})
|
||||||
|
node.Daemon = res
|
||||||
|
defer node.StopDaemon()
|
||||||
|
|
||||||
|
// Poll for api file to appear
|
||||||
|
apiFile := filepath.Join(node.Dir, "api")
|
||||||
|
var fileExists bool
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
if _, err := os.Stat(apiFile); err == nil {
|
||||||
|
fileExists = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
require.True(t, fileExists, "api file should be created")
|
||||||
|
|
||||||
|
// Read the api file to get the address
|
||||||
|
apiAddr, err := node.TryAPIAddr()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Extract IP and port from multiaddr
|
||||||
|
ip, err := apiAddr.ValueForProtocol(4) // P_IP4
|
||||||
|
require.NoError(t, err)
|
||||||
|
port, err := apiAddr.ValueForProtocol(6) // P_TCP
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Immediately try to use the API - should work on first attempt
|
||||||
|
url := "http://" + ip + ":" + port + "/api/v0/id"
|
||||||
|
resp, err := http.Post(url, "", nil)
|
||||||
|
require.NoError(t, err, "RPC API should be ready immediately when api file exists")
|
||||||
|
defer resp.Body.Close()
|
||||||
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("gateway file", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
h := harness.NewT(t)
|
||||||
|
node := h.NewNode().Init()
|
||||||
|
|
||||||
|
// Start daemon in background
|
||||||
|
res := node.Runner.MustRun(harness.RunRequest{
|
||||||
|
Path: node.IPFSBin,
|
||||||
|
Args: []string{"daemon"},
|
||||||
|
RunFunc: (*exec.Cmd).Start,
|
||||||
|
})
|
||||||
|
node.Daemon = res
|
||||||
|
defer node.StopDaemon()
|
||||||
|
|
||||||
|
// Poll for gateway file to appear
|
||||||
|
gatewayFile := filepath.Join(node.Dir, "gateway")
|
||||||
|
var fileExists bool
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
if _, err := os.Stat(gatewayFile); err == nil {
|
||||||
|
fileExists = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
require.True(t, fileExists, "gateway file should be created")
|
||||||
|
|
||||||
|
// Read the gateway file to get the URL (already includes http:// prefix)
|
||||||
|
gatewayURL, err := os.ReadFile(gatewayFile)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Immediately try to use the Gateway - should work on first attempt
|
||||||
|
url := strings.TrimSpace(string(gatewayURL)) + "/ipfs/bafkqaaa" // empty file CID
|
||||||
|
resp, err := http.Get(url)
|
||||||
|
require.NoError(t, err, "Gateway should be ready immediately when gateway file exists")
|
||||||
|
defer resp.Body.Close()
|
||||||
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
||||||
|
})
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user