diff --git a/client/httpapi/api.go b/client/httpapi/api.go index e4603bd38..c5a706d05 100644 --- a/client/httpapi/api.go +++ b/client/httpapi/api.go @@ -124,7 +124,6 @@ func (api *HttpApi) request(command string, args ...string) *RequestBuilder { command: command, args: args, shell: api, - drainOut: true, } } diff --git a/client/httpapi/apifile.go b/client/httpapi/apifile.go index e3cb85ea4..a8eb0de1a 100644 --- a/client/httpapi/apifile.go +++ b/client/httpapi/apifile.go @@ -49,15 +49,15 @@ type apiFile struct { size int64 path iface.Path - r io.ReadCloser + r *Response at int64 } func (f *apiFile) reset() error { if f.r != nil { - f.r.Close() + f.r.Cancel() } - req := f.core.request("cat", f.path.String()).NoDrain() + req := f.core.request("cat", f.path.String()) if f.at != 0 { req.Option("offset", f.at) } @@ -68,12 +68,12 @@ func (f *apiFile) reset() error { if resp.Error != nil { return resp.Error } - f.r = resp.Output + f.r = resp return nil } func (f *apiFile) Read(p []byte) (int, error) { - n, err := f.r.Read(p) + n, err := f.r.Output.Read(p) if n > 0 { f.at += int64(n) } @@ -92,7 +92,7 @@ func (f *apiFile) Seek(offset int64, whence int) (int64, error) { } if f.at < offset && offset-f.at < forwardSeekLimit { //forward skip - r, err := io.CopyN(ioutil.Discard, f.r, offset-f.at) + r, err := io.CopyN(ioutil.Discard, f.r.Output, offset-f.at) f.at += r return f.at, err @@ -103,7 +103,7 @@ func (f *apiFile) Seek(offset int64, whence int) (int64, error) { func (f *apiFile) Close() error { if f.r != nil { - return f.r.Close() + return f.r.Cancel() } return nil } diff --git a/client/httpapi/pubsub.go b/client/httpapi/pubsub.go index 49c58ab88..2ac04b53c 100644 --- a/client/httpapi/pubsub.go +++ b/client/httpapi/pubsub.go @@ -60,7 +60,7 @@ type pubsubSub struct { messages chan pubsubMessage done chan struct{} - rcloser io.Closer + rcloser func() error } type pubsubMessage struct { @@ -113,7 +113,7 @@ func (api *PubsubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt } resp, err := api.core().request("pubsub/sub", topic). - Option("discover", options.Discover).NoDrain().Send(ctx) + Option("discover", options.Discover).Send(ctx) if err != nil { return nil, err @@ -125,6 +125,9 @@ func (api *PubsubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt sub := &pubsubSub{ messages: make(chan pubsubMessage), done: make(chan struct{}), + rcloser: func() error { + return resp.Cancel() + }, } dec := json.NewDecoder(resp.Output) @@ -159,7 +162,7 @@ func (s *pubsubSub) Close() error { close(s.done) s.done = nil } - return s.rcloser.Close() + return s.rcloser() } func (api *PubsubAPI) core() *HttpApi { diff --git a/client/httpapi/request.go b/client/httpapi/request.go index d65d63835..f6f7ee486 100644 --- a/client/httpapi/request.go +++ b/client/httpapi/request.go @@ -13,7 +13,6 @@ type Request struct { Opts map[string]string Body io.Reader Headers map[string]string - DrainOut bool // if set, resp.Close will read all remaining data } func NewRequest(ctx context.Context, url, command string, args ...string) *Request { @@ -31,6 +30,5 @@ func NewRequest(ctx context.Context, url, command string, args ...string) *Reque Args: args, Opts: opts, Headers: make(map[string]string), - DrainOut: true, } } diff --git a/client/httpapi/requestbuilder.go b/client/httpapi/requestbuilder.go index c5bc44124..ba407217f 100644 --- a/client/httpapi/requestbuilder.go +++ b/client/httpapi/requestbuilder.go @@ -19,7 +19,6 @@ type RequestBuilder struct { opts map[string]string headers map[string]string body io.Reader - drainOut bool shell *HttpApi } @@ -85,12 +84,6 @@ func (r *RequestBuilder) Header(name, value string) *RequestBuilder { return r } -// NoDrain disables output draining in response closer -func (r *RequestBuilder) NoDrain() *RequestBuilder { - r.drainOut = false - return r -} - // Send sends the request and return the response. func (r *RequestBuilder) Send(ctx context.Context) (*Response, error) { r.shell.applyGlobal(r) diff --git a/client/httpapi/response.go b/client/httpapi/response.go index 708d2714f..c9f178f9c 100644 --- a/client/httpapi/response.go +++ b/client/httpapi/response.go @@ -34,18 +34,13 @@ func (r *trailerReader) Close() error { type Response struct { Output io.ReadCloser Error *Error - - drainOutput bool } func (r *Response) Close() error { if r.Output != nil { - // always drain output (response body) - var err1 error - if r.drainOutput { - _, err1 = io.Copy(ioutil.Discard, r.Output) - } + // drain output (response body) + _, err1 := io.Copy(ioutil.Discard, r.Output) err2 := r.Output.Close() if err1 != nil { return err1 @@ -55,6 +50,15 @@ func (r *Response) Close() error { return nil } +// Cancel aborts running request (without draining request body) +func (r *Response) Cancel() error { + if r.Output != nil { + return r.Output.Close() + } + + return nil +} + func (r *Response) Decode(dec interface{}) error { defer r.Close() if r.Error != nil { @@ -123,7 +127,6 @@ func (r *Request) Send(c *http.Client) (*Response, error) { nresp := new(Response) - nresp.drainOutput = r.DrainOut nresp.Output = &trailerReader{resp} if resp.StatusCode >= http.StatusBadRequest { e := &Error{ diff --git a/client/httpapi/swarm.go b/client/httpapi/swarm.go index 13a814bc4..882711917 100644 --- a/client/httpapi/swarm.go +++ b/client/httpapi/swarm.go @@ -32,74 +32,81 @@ func (api *SwarmAPI) Disconnect(ctx context.Context, addr multiaddr.Multiaddr) e return api.core().request("swarm/disconnect", addr.String()).Exec(ctx, nil) } -type streamInfo struct { - Protocol string -} - type connInfo struct { - Addr string - Peer string - JLatency time.Duration `json:"Latency"` - Muxer string - JDirection inet.Direction `json:"Direction"` - JStreams []streamInfo `json:"Streams"` -} - -func (c *connInfo) valid() error { - _, err := multiaddr.NewMultiaddr(c.Addr) - if err != nil { - return err - } - - _, err = peer.IDB58Decode(c.Peer) - return err + addr multiaddr.Multiaddr + peer peer.ID + latency time.Duration + muxer string + direction inet.Direction + streams []protocol.ID } func (c *connInfo) ID() peer.ID { - id, _ := peer.IDB58Decode(c.Peer) - return id + return c.peer } func (c *connInfo) Address() multiaddr.Multiaddr { - a, _ := multiaddr.NewMultiaddr(c.Addr) - return a + return c.addr } func (c *connInfo) Direction() inet.Direction { - return c.JDirection + return c.direction } func (c *connInfo) Latency() (time.Duration, error) { - return c.JLatency, nil + return c.latency, nil } func (c *connInfo) Streams() ([]protocol.ID, error) { - res := make([]protocol.ID, len(c.JStreams)) - for i, stream := range c.JStreams { - res[i] = protocol.ID(stream.Protocol) - } - return res, nil + return c.streams, nil } func (api *SwarmAPI) Peers(ctx context.Context) ([]iface.ConnectionInfo, error) { - var out struct { - Peers []*connInfo + var resp struct { + Peers []struct{ + Addr string + Peer string + Latency time.Duration + Muxer string + Direction inet.Direction + Streams []struct { + Protocol string + } + } } err := api.core().request("swarm/peers"). Option("streams", true). Option("latency", true). - Exec(ctx, &out) + Exec(ctx, &resp) if err != nil { return nil, err } - res := make([]iface.ConnectionInfo, len(out.Peers)) - for i, conn := range out.Peers { - if err := conn.valid(); err != nil { + res := make([]iface.ConnectionInfo, len(resp.Peers)) + for i, conn := range resp.Peers { + out := &connInfo{ + latency: conn.Latency, + muxer: conn.Muxer, + direction: conn.Direction, + } + + out.peer, err = peer.IDB58Decode(conn.Peer) + if err != nil { return nil, err } - res[i] = conn + + out.addr, err = multiaddr.NewMultiaddr(conn.Addr) + if err != nil { + return nil, err + } + + out.streams = make([]protocol.ID, len(conn.Streams)) + for i, p := range conn.Streams { + out.streams[i] = protocol.ID(p.Protocol) + } + + res[i] = out } return res, nil