From 4badcdc340f809046966fad75707bbb2182151c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 30 Jul 2018 17:15:01 +0200 Subject: [PATCH] p2p: tag connections in connection manager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- core/commands/p2p.go | 1 - p2p/local.go | 15 +++++++++++---- p2p/p2p.go | 2 +- p2p/remote.go | 13 +++++++++++-- p2p/stream.go | 7 ++++++- 5 files changed, 29 insertions(+), 9 deletions(-) diff --git a/core/commands/p2p.go b/core/commands/p2p.go index 8d01b2607..301195f4a 100644 --- a/core/commands/p2p.go +++ b/core/commands/p2p.go @@ -14,7 +14,6 @@ import ( core "github.com/ipfs/go-ipfs/core" p2p "github.com/ipfs/go-ipfs/p2p" - "gx/ipfs/Qme4QgoVPyQqxVc4G1c2L2wc9TDa6o294rtspGMnBNRujm/go-ipfs-addr" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore" diff --git a/p2p/local.go b/p2p/local.go index c2daa2df1..e64c7de0b 100644 --- a/p2p/local.go +++ b/p2p/local.go @@ -4,12 +4,12 @@ import ( "context" "time" - "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net" - ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" - "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer" - tec "gx/ipfs/QmWHgLqrghM9zw77nF6gdvT9ExQ2RB9pLxkd8sDHZf1rWb/go-temp-err-catcher" "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net" + "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net" + tec "gx/ipfs/QmWHgLqrghM9zw77nF6gdvT9ExQ2RB9pLxkd8sDHZf1rWb/go-temp-err-catcher" + ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" + "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer" ) // localListener manet streams and proxies them to libp2p services @@ -77,6 +77,9 @@ func (l *localListener) setupStream(local manet.Conn) { return } + cmgr := l.p2p.peerHost.ConnManager() + cmgr.TagPeer(l.peer, CMGR_TAG, 20) + stream := &Stream{ Protocol: l.proto, @@ -87,6 +90,10 @@ func (l *localListener) setupStream(local manet.Conn) { Remote: remote, Registry: l.p2p.Streams, + + cleanup: func() { + cmgr.UntagPeer(l.peer, CMGR_TAG) + }, } l.p2p.Streams.Register(stream) diff --git a/p2p/p2p.go b/p2p/p2p.go index 8228e97c8..737bf1434 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -1,9 +1,9 @@ package p2p import ( - logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log" pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore" p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host" + logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log" peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer" ) diff --git a/p2p/remote.go b/p2p/remote.go index 23161bedc..53c25ddf4 100644 --- a/p2p/remote.go +++ b/p2p/remote.go @@ -3,9 +3,9 @@ package p2p import ( "context" + net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net" manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" - net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net" protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" ) @@ -47,12 +47,17 @@ func (l *remoteListener) start() error { return } - peerMa, err := ma.NewMultiaddr(maPrefix + remote.Conn().RemotePeer().Pretty()) + peer := remote.Conn().RemotePeer() + + peerMa, err := ma.NewMultiaddr(maPrefix + peer.Pretty()) if err != nil { remote.Reset() return } + cmgr := l.p2p.peerHost.ConnManager() + cmgr.TagPeer(peer, CMGR_TAG, 20) + stream := &Stream{ Protocol: l.proto, @@ -63,6 +68,10 @@ func (l *remoteListener) start() error { Remote: remote, Registry: l.p2p.Streams, + + cleanup: func() { + cmgr.UntagPeer(peer, CMGR_TAG) + }, } l.p2p.Streams.Register(stream) diff --git a/p2p/stream.go b/p2p/stream.go index 26e70ad76..daa4a3f85 100644 --- a/p2p/stream.go +++ b/p2p/stream.go @@ -4,12 +4,14 @@ import ( "io" "sync" + net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net" manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" - net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net" "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" ) +const CMGR_TAG = "stream-fwd" + // Stream holds information on active incoming and outgoing p2p streams. type Stream struct { id uint64 @@ -23,12 +25,15 @@ type Stream struct { Remote net.Stream Registry *StreamRegistry + + cleanup func() } // Close closes stream endpoints and deregisters it func (s *Stream) Close() error { s.Local.Close() s.Remote.Close() + s.cleanup() s.Registry.Deregister(s.id) return nil }