mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 10:27:46 +08:00
Merge pull request #2267 from rht/wire/ctx-http-request-2
Directly wire ctx into http request
This commit is contained in:
commit
b11abe283b
@ -39,6 +39,7 @@ var log = logging.Logger("cmd/ipfs")
|
||||
var (
|
||||
errUnexpectedApiOutput = errors.New("api returned unexpected output")
|
||||
errApiVersionMismatch = errors.New("api version mismatch")
|
||||
errRequestCanceled = errors.New("request canceled")
|
||||
)
|
||||
|
||||
const (
|
||||
@ -328,7 +329,7 @@ func callCommand(ctx context.Context, req cmds.Request, root *cmds.Command, cmd
|
||||
if isConnRefused(err) {
|
||||
err = repo.ErrApiNotRunning
|
||||
}
|
||||
return nil, err
|
||||
return nil, wrapContextCanceled(err)
|
||||
}
|
||||
|
||||
} else {
|
||||
@ -625,3 +626,10 @@ func isConnRefused(err error) bool {
|
||||
|
||||
return netoperr.Op == "dial"
|
||||
}
|
||||
|
||||
func wrapContextCanceled(err error) error {
|
||||
if strings.Contains(err.Error(), "request canceled") {
|
||||
err = errRequestCanceled
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -34,20 +34,13 @@ type Client interface {
|
||||
|
||||
type client struct {
|
||||
serverAddress string
|
||||
httpClient http.Client
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
func NewClient(address string) Client {
|
||||
// We cannot use the default transport because of a bug in go's connection reuse
|
||||
// code. It causes random failures in the connection including io.EOF and connection
|
||||
// refused on 'client.Do'
|
||||
return &client{
|
||||
serverAddress: address,
|
||||
httpClient: http.Client{
|
||||
Transport: &http.Transport{
|
||||
DisableKeepAlives: true,
|
||||
},
|
||||
},
|
||||
httpClient: http.DefaultClient,
|
||||
}
|
||||
}
|
||||
|
||||
@ -101,46 +94,28 @@ func (c *client) Send(req cmds.Request) (cmds.Response, error) {
|
||||
}
|
||||
httpReq.Header.Set(uaHeader, config.ApiVersion)
|
||||
|
||||
ec := make(chan error, 1)
|
||||
rc := make(chan cmds.Response, 1)
|
||||
dc := req.Context().Done()
|
||||
httpReq.Cancel = req.Context().Done()
|
||||
httpReq.Close = true
|
||||
|
||||
go func() {
|
||||
httpRes, err := c.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
ec <- err
|
||||
return
|
||||
}
|
||||
|
||||
// using the overridden JSON encoding in request
|
||||
res, err := getResponse(httpRes, req)
|
||||
if err != nil {
|
||||
ec <- err
|
||||
return
|
||||
}
|
||||
|
||||
rc <- res
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-dc:
|
||||
log.Debug("Context cancelled, cancelling HTTP request...")
|
||||
tr := http.DefaultTransport.(*http.Transport)
|
||||
tr.CancelRequest(httpReq)
|
||||
dc = nil // Wait for ec or rc
|
||||
case err := <-ec:
|
||||
return nil, err
|
||||
case res := <-rc:
|
||||
if found && len(previousUserProvidedEncoding) > 0 {
|
||||
// reset to user provided encoding after sending request
|
||||
// NB: if user has provided an encoding but it is the empty string,
|
||||
// still leave it as JSON.
|
||||
req.SetOption(cmds.EncShort, previousUserProvidedEncoding)
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
httpRes, err := c.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// using the overridden JSON encoding in request
|
||||
res, err := getResponse(httpRes, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if found && len(previousUserProvidedEncoding) > 0 {
|
||||
// reset to user provided encoding after sending request
|
||||
// NB: if user has provided an encoding but it is the empty string,
|
||||
// still leave it as JSON.
|
||||
req.SetOption(cmds.EncShort, previousUserProvidedEncoding)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func getQuery(req cmds.Request) (string, error) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user