From 8948519590571222a3161a3dccdfceaa577c0669 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 30 May 2017 17:13:44 +0200 Subject: [PATCH] Corenet API: Store state in node MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- core/commands/corenet.go | 233 ++++++++------------------- core/core.go | 4 + corenet/apps.go | 58 +++++++ corenet/corenet.go | 10 ++ {core/corenet => corenet/net}/net.go | 2 +- corenet/streams.go | 60 +++++++ 6 files changed, 203 insertions(+), 164 deletions(-) create mode 100644 corenet/apps.go create mode 100644 corenet/corenet.go rename {core/corenet => corenet/net}/net.go (99%) create mode 100644 corenet/streams.go diff --git a/core/commands/corenet.go b/core/commands/corenet.go index 9c684d45b..526c1820f 100644 --- a/core/commands/corenet.go +++ b/core/commands/corenet.go @@ -10,20 +10,22 @@ import ( cmds "github.com/ipfs/go-ipfs/commands" core "github.com/ipfs/go-ipfs/core" - corenet "github.com/ipfs/go-ipfs/core/corenet" + corenet "github.com/ipfs/go-ipfs/corenet" + cnet "github.com/ipfs/go-ipfs/corenet/net" peerstore "gx/ipfs/QmNUVzEjq3XWJ89hegahPvyfJbTXgTaom48pLb7YBD9gHQ/go-libp2p-peerstore" net "gx/ipfs/QmVHSBsn8LEeay8m5ERebgUVuhzw838PsyTttCmP6GMJkg/go-libp2p-net" ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr" - peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net" ) +// CorenetAppInfoOutput is output type of ls command type CorenetAppInfoOutput struct { Protocol string Address string } +// CorenetStreamInfoOutput is output type of streams command type CorenetStreamInfoOutput struct { HandlerID string Protocol string @@ -33,116 +35,16 @@ type CorenetStreamInfoOutput struct { RemoteAddress string } +// CorenetLsOutput is output type of ls command type CorenetLsOutput struct { Apps []CorenetAppInfoOutput } +// CorenetStreamsOutput is output type of streams command type CorenetStreamsOutput struct { Streams []CorenetStreamInfoOutput } -// cnAppInfo holds information on a local application protocol listener service. -type cnAppInfo struct { - // Application protocol identifier. - protocol string - - // Node identity - identity peer.ID - - // Local protocol stream address. - address ma.Multiaddr - - // Local protocol stream listener. - closer io.Closer - - // Flag indicating whether we're still accepting incoming connections, or - // whether this application listener has been shutdown. - running bool -} - -func (c *cnAppInfo) Close() error { - apps.Deregister(c.protocol) - c.closer.Close() - return nil -} - -// cnAppRegistry is a collection of local application protocol listeners. -type cnAppRegistry struct { - apps []*cnAppInfo -} - -func (c *cnAppRegistry) Register(appInfo *cnAppInfo) { - c.apps = append(c.apps, appInfo) -} - -func (c *cnAppRegistry) Deregister(proto string) { - foundAt := -1 - for i, a := range c.apps { - if a.protocol == proto { - foundAt = i - break - } - } - - if foundAt != -1 { - c.apps = append(c.apps[:foundAt], c.apps[foundAt+1:]...) - } -} - -// cnStreamInfo holds information on active incoming and outgoing protocol app streams. -type cnStreamInfo struct { - handlerID uint64 - - protocol string - - localPeer peer.ID - localAddr ma.Multiaddr - - remotePeer peer.ID - remoteAddr ma.Multiaddr - - local io.ReadWriteCloser - remote io.ReadWriteCloser -} - -func (c *cnStreamInfo) Close() error { - c.local.Close() - c.remote.Close() - streams.Deregister(c.handlerID) - return nil -} - -// cnStreamRegistry is a collection of active incoming and outgoing protocol app streams. -type cnStreamRegistry struct { - streams []*cnStreamInfo - - nextID uint64 -} - -func (c *cnStreamRegistry) Register(streamInfo *cnStreamInfo) { - streamInfo.handlerID = c.nextID - c.streams = append(c.streams, streamInfo) - c.nextID++ -} - -func (c *cnStreamRegistry) Deregister(handlerID uint64) { - foundAt := -1 - for i, s := range c.streams { - if s.handlerID == handlerID { - foundAt = i - break - } - } - - if foundAt != -1 { - c.streams = append(c.streams[:foundAt], c.streams[foundAt+1:]...) - } -} - -//TODO: Ideally I'd like to see these combined into a module in core. -var apps cnAppRegistry -var streams cnStreamRegistry - var CorenetCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Libp2p stream mounting.", @@ -188,10 +90,10 @@ var CorenetLsCmd = &cmds.Command{ output := &CorenetLsOutput{} - for _, a := range apps.apps { + for _, app := range n.Corenet.Apps.Apps { output.Apps = append(output.Apps, CorenetAppInfoOutput{ - Protocol: a.protocol, - Address: a.address.String(), + Protocol: app.Protocol, + Address: app.Address.String(), }) } @@ -245,17 +147,17 @@ var CorenetStreamsCmd = &cmds.Command{ output := &CorenetStreamsOutput{} - for _, s := range streams.streams { + for _, s := range n.Corenet.Streams.Streams { output.Streams = append(output.Streams, CorenetStreamInfoOutput{ - HandlerID: strconv.FormatUint(s.handlerID, 10), + HandlerID: strconv.FormatUint(s.HandlerID, 10), - Protocol: s.protocol, + Protocol: s.Protocol, - LocalPeer: s.localPeer.Pretty(), - LocalAddress: s.localAddr.String(), + LocalPeer: s.LocalPeer.Pretty(), + LocalAddress: s.LocalAddr.String(), - RemotePeer: s.remotePeer.Pretty(), - RemoteAddress: s.remoteAddr.String(), + RemotePeer: s.RemotePeer.Pretty(), + RemoteAddress: s.RemoteAddr.String(), }) } @@ -320,23 +222,24 @@ var CorenetListenCmd = &cmds.Command{ return } - listener, err := corenet.Listen(n, proto) + listener, err := cnet.Listen(n, proto) if err != nil { res.SetError(err, cmds.ErrNormal) return } - app := cnAppInfo{ - identity: n.Identity, - protocol: proto, - address: addr, - closer: listener, - running: true, + app := corenet.AppInfo{ + Identity: n.Identity, + Protocol: proto, + Address: addr, + Closer: listener, + Running: true, + Registry: &n.Corenet.Apps, } - go acceptStreams(&app, listener) + go acceptStreams(n, &app, listener) - apps.Register(&app) + n.Corenet.Apps.Register(&app) // Successful response. res.SetOutput(&CorenetAppInfoOutput{ @@ -356,47 +259,49 @@ func checkProtoExists(protos []string, proto string) bool { return false } -func acceptStreams(app *cnAppInfo, listener corenet.Listener) { - for app.running { +func acceptStreams(n *core.IpfsNode, app *corenet.AppInfo, listener cnet.Listener) { + for app.Running { remote, err := listener.Accept() if err != nil { listener.Close() break } - local, err := manet.Dial(app.address) + local, err := manet.Dial(app.Address) if err != nil { remote.Close() continue } - stream := cnStreamInfo{ - protocol: app.protocol, + stream := corenet.StreamInfo{ + Protocol: app.Protocol, - localPeer: app.identity, - localAddr: app.address, + LocalPeer: app.Identity, + LocalAddr: app.Address, - remotePeer: remote.Conn().RemotePeer(), - remoteAddr: remote.Conn().RemoteMultiaddr(), + RemotePeer: remote.Conn().RemotePeer(), + RemoteAddr: remote.Conn().RemoteMultiaddr(), - local: local, - remote: remote, + Local: local, + Remote: remote, + + Registry: &n.Corenet.Streams, } - streams.Register(&stream) + n.Corenet.Streams.Register(&stream) startStreaming(&stream) } - apps.Deregister(app.protocol) + n.Corenet.Apps.Deregister(app.Protocol) } -func startStreaming(stream *cnStreamInfo) { +func startStreaming(stream *corenet.StreamInfo) { go func() { - io.Copy(stream.local, stream.remote) + io.Copy(stream.Local, stream.Remote) stream.Close() }() go func() { - io.Copy(stream.remote, stream.local) + io.Copy(stream.Remote, stream.Local) stream.Close() }() } @@ -451,14 +356,14 @@ var CorenetDialCmd = &cmds.Command{ return } - app := cnAppInfo{ - identity: n.Identity, - protocol: proto, + app := corenet.AppInfo{ + Identity: n.Identity, + Protocol: proto, } n.Peerstore.AddAddr(peer, addr, peerstore.TempAddrTTL) - remote, err := corenet.Dial(n, peer, proto) + remote, err := cnet.Dial(n, peer, proto) if err != nil { res.SetError(err, cmds.ErrNormal) return @@ -475,11 +380,11 @@ var CorenetDialCmd = &cmds.Command{ return } - app.address = listener.Multiaddr() - app.closer = listener - app.running = true + app.Address = listener.Multiaddr() + app.Closer = listener + app.Running = true - go doAccept(&app, remote, listener) + go doAccept(n, &app, remote, listener) default: res.SetError(errors.New("unsupported protocol: "+lnet), cmds.ErrNormal) @@ -487,15 +392,15 @@ var CorenetDialCmd = &cmds.Command{ } output := CorenetAppInfoOutput{ - Protocol: app.protocol, - Address: app.address.String(), + Protocol: app.Protocol, + Address: app.Address.String(), } res.SetOutput(&output) }, } -func doAccept(app *cnAppInfo, remote net.Stream, listener manet.Listener) { +func doAccept(n *core.IpfsNode, app *corenet.AppInfo, remote net.Stream, listener manet.Listener) { defer listener.Close() local, err := listener.Accept() @@ -503,20 +408,22 @@ func doAccept(app *cnAppInfo, remote net.Stream, listener manet.Listener) { return } - stream := cnStreamInfo{ - protocol: app.protocol, + stream := corenet.StreamInfo{ + Protocol: app.Protocol, - localPeer: app.identity, - localAddr: app.address, + LocalPeer: app.Identity, + LocalAddr: app.Address, - remotePeer: remote.Conn().RemotePeer(), - remoteAddr: remote.Conn().RemoteMultiaddr(), + RemotePeer: remote.Conn().RemotePeer(), + RemoteAddr: remote.Conn().RemoteMultiaddr(), - local: local, - remote: remote, + Local: local, + Remote: remote, + + Registry: &n.Corenet.Streams, } - streams.Register(&stream) + n.Corenet.Streams.Register(&stream) startStreaming(&stream) } @@ -571,8 +478,8 @@ var CorenetCloseCmd = &cmds.Command{ } if closeAll || useHandlerID { - for _, s := range streams.streams { - if !closeAll && handlerID != s.handlerID { + for _, s := range n.Corenet.Streams.Streams { + if !closeAll && handlerID != s.HandlerID { continue } s.Close() @@ -583,8 +490,8 @@ var CorenetCloseCmd = &cmds.Command{ } if closeAll || !useHandlerID { - for _, a := range apps.apps { - if !closeAll && a.protocol != proto { + for _, a := range n.Corenet.Apps.Apps { + if !closeAll && a.Protocol != proto { continue } a.Close() diff --git a/core/core.go b/core/core.go index 3b0b9ded1..c90e32dfb 100644 --- a/core/core.go +++ b/core/core.go @@ -23,6 +23,7 @@ import ( bstore "github.com/ipfs/go-ipfs/blocks/blockstore" bserv "github.com/ipfs/go-ipfs/blockservice" + corenet "github.com/ipfs/go-ipfs/corenet" exchange "github.com/ipfs/go-ipfs/exchange" bitswap "github.com/ipfs/go-ipfs/exchange/bitswap" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" @@ -131,6 +132,7 @@ type IpfsNode struct { IpnsRepub *ipnsrp.Republisher Floodsub *floodsub.PubSub + Corenet *corenet.Corenet proc goprocess.Process ctx context.Context @@ -246,6 +248,8 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin n.Floodsub = floodsub.NewFloodSub(ctx, peerhost) } + n.Corenet = corenet.NewCorenet() + // setup local discovery if do != nil { service, err := do(ctx, n.PeerHost) diff --git a/corenet/apps.go b/corenet/apps.go new file mode 100644 index 000000000..c21d42bde --- /dev/null +++ b/corenet/apps.go @@ -0,0 +1,58 @@ +package corenet + +import ( + "io" + + ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr" + peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" +) + +// AppInfo holds information on a local application protocol listener service. +type AppInfo struct { + // Application protocol identifier. + Protocol string + + // Node identity + Identity peer.ID + + // Local protocol stream address. + Address ma.Multiaddr + + // Local protocol stream listener. + Closer io.Closer + + // Flag indicating whether we're still accepting incoming connections, or + // whether this application listener has been shutdown. + Running bool + + Registry *AppRegistry +} + +func (c *AppInfo) Close() error { + c.Registry.Deregister(c.Protocol) + c.Closer.Close() + return nil +} + +// AppRegistry is a collection of local application protocol listeners. +type AppRegistry struct { + Apps []*AppInfo +} + +func (c *AppRegistry) Register(appInfo *AppInfo) { + c.Apps = append(c.Apps, appInfo) +} + +func (c *AppRegistry) Deregister(proto string) { + foundAt := -1 + for i, a := range c.Apps { + if a.Protocol == proto { + foundAt = i + break + } + } + + if foundAt != -1 { + c.Apps = append(c.Apps[:foundAt], c.Apps[foundAt+1:]...) + } +} diff --git a/corenet/corenet.go b/corenet/corenet.go new file mode 100644 index 000000000..d549a0f36 --- /dev/null +++ b/corenet/corenet.go @@ -0,0 +1,10 @@ +package corenet + +type Corenet struct { + Apps AppRegistry + Streams StreamRegistry +} + +func NewCorenet() *Corenet { + return &Corenet{} +} diff --git a/core/corenet/net.go b/corenet/net/net.go similarity index 99% rename from core/corenet/net.go rename to corenet/net/net.go index 4fc83092f..8e7de82b5 100644 --- a/core/corenet/net.go +++ b/corenet/net/net.go @@ -1,4 +1,4 @@ -package corenet +package net import ( "time" diff --git a/corenet/streams.go b/corenet/streams.go new file mode 100644 index 000000000..818962053 --- /dev/null +++ b/corenet/streams.go @@ -0,0 +1,60 @@ +package corenet + +import ( + "io" + + ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr" + peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" +) + +// StreamInfo holds information on active incoming and outgoing protocol app streams. +type StreamInfo struct { + HandlerID uint64 + + Protocol string + + LocalPeer peer.ID + LocalAddr ma.Multiaddr + + RemotePeer peer.ID + RemoteAddr ma.Multiaddr + + Local io.ReadWriteCloser + Remote io.ReadWriteCloser + + Registry *StreamRegistry +} + +func (c *StreamInfo) Close() error { + c.Local.Close() + c.Remote.Close() + c.Registry.Deregister(c.HandlerID) + return nil +} + +// StreamRegistry is a collection of active incoming and outgoing protocol app streams. +type StreamRegistry struct { + Streams []*StreamInfo + + nextID uint64 +} + +func (c *StreamRegistry) Register(streamInfo *StreamInfo) { + streamInfo.HandlerID = c.nextID + c.Streams = append(c.Streams, streamInfo) + c.nextID++ +} + +func (c *StreamRegistry) Deregister(handlerID uint64) { + foundAt := -1 + for i, s := range c.Streams { + if s.HandlerID == handlerID { + foundAt = i + break + } + } + + if foundAt != -1 { + c.Streams = append(c.Streams[:foundAt], c.Streams[foundAt+1:]...) + } +}