cleanup Swarm.Peers

This commit was moved from ipfs/go-ipfs-http-client@fd7858dc57
This commit is contained in:
Łukasz Magiera 2019-02-19 23:11:08 +01:00
parent db1af499c9
commit f34788e6ee
7 changed files with 69 additions and 66 deletions

View File

@ -124,7 +124,6 @@ func (api *HttpApi) request(command string, args ...string) *RequestBuilder {
command: command,
args: args,
shell: api,
drainOut: true,
}
}

View File

@ -49,15 +49,15 @@ type apiFile struct {
size int64
path iface.Path
r io.ReadCloser
r *Response
at int64
}
func (f *apiFile) reset() error {
if f.r != nil {
f.r.Close()
f.r.Cancel()
}
req := f.core.request("cat", f.path.String()).NoDrain()
req := f.core.request("cat", f.path.String())
if f.at != 0 {
req.Option("offset", f.at)
}
@ -68,12 +68,12 @@ func (f *apiFile) reset() error {
if resp.Error != nil {
return resp.Error
}
f.r = resp.Output
f.r = resp
return nil
}
func (f *apiFile) Read(p []byte) (int, error) {
n, err := f.r.Read(p)
n, err := f.r.Output.Read(p)
if n > 0 {
f.at += int64(n)
}
@ -92,7 +92,7 @@ func (f *apiFile) Seek(offset int64, whence int) (int64, error) {
}
if f.at < offset && offset-f.at < forwardSeekLimit { //forward skip
r, err := io.CopyN(ioutil.Discard, f.r, offset-f.at)
r, err := io.CopyN(ioutil.Discard, f.r.Output, offset-f.at)
f.at += r
return f.at, err
@ -103,7 +103,7 @@ func (f *apiFile) Seek(offset int64, whence int) (int64, error) {
func (f *apiFile) Close() error {
if f.r != nil {
return f.r.Close()
return f.r.Cancel()
}
return nil
}

View File

@ -60,7 +60,7 @@ type pubsubSub struct {
messages chan pubsubMessage
done chan struct{}
rcloser io.Closer
rcloser func() error
}
type pubsubMessage struct {
@ -113,7 +113,7 @@ func (api *PubsubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt
}
resp, err := api.core().request("pubsub/sub", topic).
Option("discover", options.Discover).NoDrain().Send(ctx)
Option("discover", options.Discover).Send(ctx)
if err != nil {
return nil, err
@ -125,6 +125,9 @@ func (api *PubsubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt
sub := &pubsubSub{
messages: make(chan pubsubMessage),
done: make(chan struct{}),
rcloser: func() error {
return resp.Cancel()
},
}
dec := json.NewDecoder(resp.Output)
@ -159,7 +162,7 @@ func (s *pubsubSub) Close() error {
close(s.done)
s.done = nil
}
return s.rcloser.Close()
return s.rcloser()
}
func (api *PubsubAPI) core() *HttpApi {

View File

@ -13,7 +13,6 @@ type Request struct {
Opts map[string]string
Body io.Reader
Headers map[string]string
DrainOut bool // if set, resp.Close will read all remaining data
}
func NewRequest(ctx context.Context, url, command string, args ...string) *Request {
@ -31,6 +30,5 @@ func NewRequest(ctx context.Context, url, command string, args ...string) *Reque
Args: args,
Opts: opts,
Headers: make(map[string]string),
DrainOut: true,
}
}

View File

@ -19,7 +19,6 @@ type RequestBuilder struct {
opts map[string]string
headers map[string]string
body io.Reader
drainOut bool
shell *HttpApi
}
@ -85,12 +84,6 @@ func (r *RequestBuilder) Header(name, value string) *RequestBuilder {
return r
}
// NoDrain disables output draining in response closer
func (r *RequestBuilder) NoDrain() *RequestBuilder {
r.drainOut = false
return r
}
// Send sends the request and return the response.
func (r *RequestBuilder) Send(ctx context.Context) (*Response, error) {
r.shell.applyGlobal(r)

View File

@ -34,18 +34,13 @@ func (r *trailerReader) Close() error {
type Response struct {
Output io.ReadCloser
Error *Error
drainOutput bool
}
func (r *Response) Close() error {
if r.Output != nil {
// always drain output (response body)
var err1 error
if r.drainOutput {
_, err1 = io.Copy(ioutil.Discard, r.Output)
}
// drain output (response body)
_, err1 := io.Copy(ioutil.Discard, r.Output)
err2 := r.Output.Close()
if err1 != nil {
return err1
@ -55,6 +50,15 @@ func (r *Response) Close() error {
return nil
}
// Cancel aborts running request (without draining request body)
func (r *Response) Cancel() error {
if r.Output != nil {
return r.Output.Close()
}
return nil
}
func (r *Response) Decode(dec interface{}) error {
defer r.Close()
if r.Error != nil {
@ -123,7 +127,6 @@ func (r *Request) Send(c *http.Client) (*Response, error) {
nresp := new(Response)
nresp.drainOutput = r.DrainOut
nresp.Output = &trailerReader{resp}
if resp.StatusCode >= http.StatusBadRequest {
e := &Error{

View File

@ -32,74 +32,81 @@ func (api *SwarmAPI) Disconnect(ctx context.Context, addr multiaddr.Multiaddr) e
return api.core().request("swarm/disconnect", addr.String()).Exec(ctx, nil)
}
type streamInfo struct {
Protocol string
}
type connInfo struct {
Addr string
Peer string
JLatency time.Duration `json:"Latency"`
Muxer string
JDirection inet.Direction `json:"Direction"`
JStreams []streamInfo `json:"Streams"`
}
func (c *connInfo) valid() error {
_, err := multiaddr.NewMultiaddr(c.Addr)
if err != nil {
return err
}
_, err = peer.IDB58Decode(c.Peer)
return err
addr multiaddr.Multiaddr
peer peer.ID
latency time.Duration
muxer string
direction inet.Direction
streams []protocol.ID
}
func (c *connInfo) ID() peer.ID {
id, _ := peer.IDB58Decode(c.Peer)
return id
return c.peer
}
func (c *connInfo) Address() multiaddr.Multiaddr {
a, _ := multiaddr.NewMultiaddr(c.Addr)
return a
return c.addr
}
func (c *connInfo) Direction() inet.Direction {
return c.JDirection
return c.direction
}
func (c *connInfo) Latency() (time.Duration, error) {
return c.JLatency, nil
return c.latency, nil
}
func (c *connInfo) Streams() ([]protocol.ID, error) {
res := make([]protocol.ID, len(c.JStreams))
for i, stream := range c.JStreams {
res[i] = protocol.ID(stream.Protocol)
}
return res, nil
return c.streams, nil
}
func (api *SwarmAPI) Peers(ctx context.Context) ([]iface.ConnectionInfo, error) {
var out struct {
Peers []*connInfo
var resp struct {
Peers []struct{
Addr string
Peer string
Latency time.Duration
Muxer string
Direction inet.Direction
Streams []struct {
Protocol string
}
}
}
err := api.core().request("swarm/peers").
Option("streams", true).
Option("latency", true).
Exec(ctx, &out)
Exec(ctx, &resp)
if err != nil {
return nil, err
}
res := make([]iface.ConnectionInfo, len(out.Peers))
for i, conn := range out.Peers {
if err := conn.valid(); err != nil {
res := make([]iface.ConnectionInfo, len(resp.Peers))
for i, conn := range resp.Peers {
out := &connInfo{
latency: conn.Latency,
muxer: conn.Muxer,
direction: conn.Direction,
}
out.peer, err = peer.IDB58Decode(conn.Peer)
if err != nil {
return nil, err
}
res[i] = conn
out.addr, err = multiaddr.NewMultiaddr(conn.Addr)
if err != nil {
return nil, err
}
out.streams = make([]protocol.ID, len(conn.Streams))
for i, p := range conn.Streams {
out.streams[i] = protocol.ID(p.Protocol)
}
res[i] = out
}
return res, nil