Corenet API: Store state in node

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera 2017-05-30 17:13:44 +02:00
parent fc08d5331b
commit 8948519590
6 changed files with 203 additions and 164 deletions

View File

@ -10,20 +10,22 @@ import (
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
corenet "github.com/ipfs/go-ipfs/core/corenet"
corenet "github.com/ipfs/go-ipfs/corenet"
cnet "github.com/ipfs/go-ipfs/corenet/net"
peerstore "gx/ipfs/QmNUVzEjq3XWJ89hegahPvyfJbTXgTaom48pLb7YBD9gHQ/go-libp2p-peerstore"
net "gx/ipfs/QmVHSBsn8LEeay8m5ERebgUVuhzw838PsyTttCmP6GMJkg/go-libp2p-net"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net"
)
// CorenetAppInfoOutput is output type of ls command
type CorenetAppInfoOutput struct {
Protocol string
Address string
}
// CorenetStreamInfoOutput is output type of streams command
type CorenetStreamInfoOutput struct {
HandlerID string
Protocol string
@ -33,116 +35,16 @@ type CorenetStreamInfoOutput struct {
RemoteAddress string
}
// CorenetLsOutput is output type of ls command
type CorenetLsOutput struct {
Apps []CorenetAppInfoOutput
}
// CorenetStreamsOutput is output type of streams command
type CorenetStreamsOutput struct {
Streams []CorenetStreamInfoOutput
}
// cnAppInfo holds information on a local application protocol listener service.
type cnAppInfo struct {
// Application protocol identifier.
protocol string
// Node identity
identity peer.ID
// Local protocol stream address.
address ma.Multiaddr
// Local protocol stream listener.
closer io.Closer
// Flag indicating whether we're still accepting incoming connections, or
// whether this application listener has been shutdown.
running bool
}
func (c *cnAppInfo) Close() error {
apps.Deregister(c.protocol)
c.closer.Close()
return nil
}
// cnAppRegistry is a collection of local application protocol listeners.
type cnAppRegistry struct {
apps []*cnAppInfo
}
func (c *cnAppRegistry) Register(appInfo *cnAppInfo) {
c.apps = append(c.apps, appInfo)
}
func (c *cnAppRegistry) Deregister(proto string) {
foundAt := -1
for i, a := range c.apps {
if a.protocol == proto {
foundAt = i
break
}
}
if foundAt != -1 {
c.apps = append(c.apps[:foundAt], c.apps[foundAt+1:]...)
}
}
// cnStreamInfo holds information on active incoming and outgoing protocol app streams.
type cnStreamInfo struct {
handlerID uint64
protocol string
localPeer peer.ID
localAddr ma.Multiaddr
remotePeer peer.ID
remoteAddr ma.Multiaddr
local io.ReadWriteCloser
remote io.ReadWriteCloser
}
func (c *cnStreamInfo) Close() error {
c.local.Close()
c.remote.Close()
streams.Deregister(c.handlerID)
return nil
}
// cnStreamRegistry is a collection of active incoming and outgoing protocol app streams.
type cnStreamRegistry struct {
streams []*cnStreamInfo
nextID uint64
}
func (c *cnStreamRegistry) Register(streamInfo *cnStreamInfo) {
streamInfo.handlerID = c.nextID
c.streams = append(c.streams, streamInfo)
c.nextID++
}
func (c *cnStreamRegistry) Deregister(handlerID uint64) {
foundAt := -1
for i, s := range c.streams {
if s.handlerID == handlerID {
foundAt = i
break
}
}
if foundAt != -1 {
c.streams = append(c.streams[:foundAt], c.streams[foundAt+1:]...)
}
}
//TODO: Ideally I'd like to see these combined into a module in core.
var apps cnAppRegistry
var streams cnStreamRegistry
var CorenetCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Libp2p stream mounting.",
@ -188,10 +90,10 @@ var CorenetLsCmd = &cmds.Command{
output := &CorenetLsOutput{}
for _, a := range apps.apps {
for _, app := range n.Corenet.Apps.Apps {
output.Apps = append(output.Apps, CorenetAppInfoOutput{
Protocol: a.protocol,
Address: a.address.String(),
Protocol: app.Protocol,
Address: app.Address.String(),
})
}
@ -245,17 +147,17 @@ var CorenetStreamsCmd = &cmds.Command{
output := &CorenetStreamsOutput{}
for _, s := range streams.streams {
for _, s := range n.Corenet.Streams.Streams {
output.Streams = append(output.Streams, CorenetStreamInfoOutput{
HandlerID: strconv.FormatUint(s.handlerID, 10),
HandlerID: strconv.FormatUint(s.HandlerID, 10),
Protocol: s.protocol,
Protocol: s.Protocol,
LocalPeer: s.localPeer.Pretty(),
LocalAddress: s.localAddr.String(),
LocalPeer: s.LocalPeer.Pretty(),
LocalAddress: s.LocalAddr.String(),
RemotePeer: s.remotePeer.Pretty(),
RemoteAddress: s.remoteAddr.String(),
RemotePeer: s.RemotePeer.Pretty(),
RemoteAddress: s.RemoteAddr.String(),
})
}
@ -320,23 +222,24 @@ var CorenetListenCmd = &cmds.Command{
return
}
listener, err := corenet.Listen(n, proto)
listener, err := cnet.Listen(n, proto)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
app := cnAppInfo{
identity: n.Identity,
protocol: proto,
address: addr,
closer: listener,
running: true,
app := corenet.AppInfo{
Identity: n.Identity,
Protocol: proto,
Address: addr,
Closer: listener,
Running: true,
Registry: &n.Corenet.Apps,
}
go acceptStreams(&app, listener)
go acceptStreams(n, &app, listener)
apps.Register(&app)
n.Corenet.Apps.Register(&app)
// Successful response.
res.SetOutput(&CorenetAppInfoOutput{
@ -356,47 +259,49 @@ func checkProtoExists(protos []string, proto string) bool {
return false
}
func acceptStreams(app *cnAppInfo, listener corenet.Listener) {
for app.running {
func acceptStreams(n *core.IpfsNode, app *corenet.AppInfo, listener cnet.Listener) {
for app.Running {
remote, err := listener.Accept()
if err != nil {
listener.Close()
break
}
local, err := manet.Dial(app.address)
local, err := manet.Dial(app.Address)
if err != nil {
remote.Close()
continue
}
stream := cnStreamInfo{
protocol: app.protocol,
stream := corenet.StreamInfo{
Protocol: app.Protocol,
localPeer: app.identity,
localAddr: app.address,
LocalPeer: app.Identity,
LocalAddr: app.Address,
remotePeer: remote.Conn().RemotePeer(),
remoteAddr: remote.Conn().RemoteMultiaddr(),
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
local: local,
remote: remote,
Local: local,
Remote: remote,
Registry: &n.Corenet.Streams,
}
streams.Register(&stream)
n.Corenet.Streams.Register(&stream)
startStreaming(&stream)
}
apps.Deregister(app.protocol)
n.Corenet.Apps.Deregister(app.Protocol)
}
func startStreaming(stream *cnStreamInfo) {
func startStreaming(stream *corenet.StreamInfo) {
go func() {
io.Copy(stream.local, stream.remote)
io.Copy(stream.Local, stream.Remote)
stream.Close()
}()
go func() {
io.Copy(stream.remote, stream.local)
io.Copy(stream.Remote, stream.Local)
stream.Close()
}()
}
@ -451,14 +356,14 @@ var CorenetDialCmd = &cmds.Command{
return
}
app := cnAppInfo{
identity: n.Identity,
protocol: proto,
app := corenet.AppInfo{
Identity: n.Identity,
Protocol: proto,
}
n.Peerstore.AddAddr(peer, addr, peerstore.TempAddrTTL)
remote, err := corenet.Dial(n, peer, proto)
remote, err := cnet.Dial(n, peer, proto)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
@ -475,11 +380,11 @@ var CorenetDialCmd = &cmds.Command{
return
}
app.address = listener.Multiaddr()
app.closer = listener
app.running = true
app.Address = listener.Multiaddr()
app.Closer = listener
app.Running = true
go doAccept(&app, remote, listener)
go doAccept(n, &app, remote, listener)
default:
res.SetError(errors.New("unsupported protocol: "+lnet), cmds.ErrNormal)
@ -487,15 +392,15 @@ var CorenetDialCmd = &cmds.Command{
}
output := CorenetAppInfoOutput{
Protocol: app.protocol,
Address: app.address.String(),
Protocol: app.Protocol,
Address: app.Address.String(),
}
res.SetOutput(&output)
},
}
func doAccept(app *cnAppInfo, remote net.Stream, listener manet.Listener) {
func doAccept(n *core.IpfsNode, app *corenet.AppInfo, remote net.Stream, listener manet.Listener) {
defer listener.Close()
local, err := listener.Accept()
@ -503,20 +408,22 @@ func doAccept(app *cnAppInfo, remote net.Stream, listener manet.Listener) {
return
}
stream := cnStreamInfo{
protocol: app.protocol,
stream := corenet.StreamInfo{
Protocol: app.Protocol,
localPeer: app.identity,
localAddr: app.address,
LocalPeer: app.Identity,
LocalAddr: app.Address,
remotePeer: remote.Conn().RemotePeer(),
remoteAddr: remote.Conn().RemoteMultiaddr(),
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
local: local,
remote: remote,
Local: local,
Remote: remote,
Registry: &n.Corenet.Streams,
}
streams.Register(&stream)
n.Corenet.Streams.Register(&stream)
startStreaming(&stream)
}
@ -571,8 +478,8 @@ var CorenetCloseCmd = &cmds.Command{
}
if closeAll || useHandlerID {
for _, s := range streams.streams {
if !closeAll && handlerID != s.handlerID {
for _, s := range n.Corenet.Streams.Streams {
if !closeAll && handlerID != s.HandlerID {
continue
}
s.Close()
@ -583,8 +490,8 @@ var CorenetCloseCmd = &cmds.Command{
}
if closeAll || !useHandlerID {
for _, a := range apps.apps {
if !closeAll && a.protocol != proto {
for _, a := range n.Corenet.Apps.Apps {
if !closeAll && a.Protocol != proto {
continue
}
a.Close()

View File

@ -23,6 +23,7 @@ import (
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bserv "github.com/ipfs/go-ipfs/blockservice"
corenet "github.com/ipfs/go-ipfs/corenet"
exchange "github.com/ipfs/go-ipfs/exchange"
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
@ -131,6 +132,7 @@ type IpfsNode struct {
IpnsRepub *ipnsrp.Republisher
Floodsub *floodsub.PubSub
Corenet *corenet.Corenet
proc goprocess.Process
ctx context.Context
@ -246,6 +248,8 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
n.Floodsub = floodsub.NewFloodSub(ctx, peerhost)
}
n.Corenet = corenet.NewCorenet()
// setup local discovery
if do != nil {
service, err := do(ctx, n.PeerHost)

58
corenet/apps.go Normal file
View File

@ -0,0 +1,58 @@
package corenet
import (
"io"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
)
// AppInfo holds information on a local application protocol listener service.
type AppInfo struct {
// Application protocol identifier.
Protocol string
// Node identity
Identity peer.ID
// Local protocol stream address.
Address ma.Multiaddr
// Local protocol stream listener.
Closer io.Closer
// Flag indicating whether we're still accepting incoming connections, or
// whether this application listener has been shutdown.
Running bool
Registry *AppRegistry
}
func (c *AppInfo) Close() error {
c.Registry.Deregister(c.Protocol)
c.Closer.Close()
return nil
}
// AppRegistry is a collection of local application protocol listeners.
type AppRegistry struct {
Apps []*AppInfo
}
func (c *AppRegistry) Register(appInfo *AppInfo) {
c.Apps = append(c.Apps, appInfo)
}
func (c *AppRegistry) Deregister(proto string) {
foundAt := -1
for i, a := range c.Apps {
if a.Protocol == proto {
foundAt = i
break
}
}
if foundAt != -1 {
c.Apps = append(c.Apps[:foundAt], c.Apps[foundAt+1:]...)
}
}

10
corenet/corenet.go Normal file
View File

@ -0,0 +1,10 @@
package corenet
type Corenet struct {
Apps AppRegistry
Streams StreamRegistry
}
func NewCorenet() *Corenet {
return &Corenet{}
}

View File

@ -1,4 +1,4 @@
package corenet
package net
import (
"time"

60
corenet/streams.go Normal file
View File

@ -0,0 +1,60 @@
package corenet
import (
"io"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
)
// StreamInfo holds information on active incoming and outgoing protocol app streams.
type StreamInfo struct {
HandlerID uint64
Protocol string
LocalPeer peer.ID
LocalAddr ma.Multiaddr
RemotePeer peer.ID
RemoteAddr ma.Multiaddr
Local io.ReadWriteCloser
Remote io.ReadWriteCloser
Registry *StreamRegistry
}
func (c *StreamInfo) Close() error {
c.Local.Close()
c.Remote.Close()
c.Registry.Deregister(c.HandlerID)
return nil
}
// StreamRegistry is a collection of active incoming and outgoing protocol app streams.
type StreamRegistry struct {
Streams []*StreamInfo
nextID uint64
}
func (c *StreamRegistry) Register(streamInfo *StreamInfo) {
streamInfo.HandlerID = c.nextID
c.Streams = append(c.Streams, streamInfo)
c.nextID++
}
func (c *StreamRegistry) Deregister(handlerID uint64) {
foundAt := -1
for i, s := range c.Streams {
if s.HandlerID == handlerID {
foundAt = i
break
}
}
if foundAt != -1 {
c.Streams = append(c.Streams[:foundAt], c.Streams[foundAt+1:]...)
}
}