chore: migrate peering to ipfs/boxo (#10157)
Co-authored-by: Henrique Dias <hacdias@gmail.com>
commit ab7630fcd4
parent 33bbee5adf
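The change is mechanical: the import path moves from github.com/ipfs/kubo/peering to github.com/ipfs/boxo/peering, kubo's local copy of the package (and its test) is deleted, and the fx OnStop hook now discards the result of ps.Stop() instead of returning it. Below is a minimal, hedged sketch of how the service continues to be used after the move. It relies only on the calls visible in this diff (NewPeeringService, AddPeer, Start, Stop); the helper name and wiring are hypothetical, not kubo code, and exact boxo signatures may differ.

package peeringexample

import (
	"github.com/ipfs/boxo/peering" // previously github.com/ipfs/kubo/peering
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
)

// keepPeered (hypothetical helper) mirrors what the fx provider in this diff
// does: build the service, register peers, and start reconnect maintenance.
func keepPeered(h host.Host, peers []peer.AddrInfo) (*peering.PeeringService, error) {
	ps := peering.NewPeeringService(h)
	for _, pi := range peers {
		// AddPeer is safe before or after Start; repeated calls replace the
		// stored addresses (see the deleted kubo implementation further down).
		ps.AddPeer(pi)
	}
	if err := ps.Start(); err != nil {
		return nil, err
	}
	// On shutdown, callers invoke ps.Stop(); as in the OnStop hook in this
	// diff, its result is not propagated.
	return ps, nil
}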
@@ -49,13 +49,13 @@ import (
 	"github.com/ipfs/boxo/namesys"
 	ipnsrp "github.com/ipfs/boxo/namesys/republisher"
+	"github.com/ipfs/boxo/peering"
 	"github.com/ipfs/kubo/config"
 	"github.com/ipfs/kubo/core/bootstrap"
 	"github.com/ipfs/kubo/core/node"
 	"github.com/ipfs/kubo/core/node/libp2p"
 	"github.com/ipfs/kubo/fuse/mount"
 	"github.com/ipfs/kubo/p2p"
-	"github.com/ipfs/kubo/peering"
 	"github.com/ipfs/kubo/repo"
 	irouting "github.com/ipfs/kubo/routing"
 )
@@ -3,7 +3,7 @@ package node
 import (
 	"context"
 
-	"github.com/ipfs/kubo/peering"
+	"github.com/ipfs/boxo/peering"
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"go.uber.org/fx"
@@ -18,7 +18,8 @@ func Peering(lc fx.Lifecycle, host host.Host) *peering.PeeringService {
 			return ps.Start()
 		},
 		OnStop: func(context.Context) error {
-			return ps.Stop()
+			ps.Stop()
+			return nil
 		},
 	})
 	return ps
@@ -1,325 +0,0 @@
package peering

import (
	"context"
	"errors"
	"math/rand"
	"strconv"
	"sync"
	"time"

	"github.com/ipfs/go-log"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
)

// Seed the random number generator.
//
// We don't need good randomness, but we do need randomness.
const (
	// maxBackoff is the maximum time between reconnect attempts.
	maxBackoff = 10 * time.Minute
	// The backoff will be cut off when we get within 10% of the actual max.
	// If we go over the max, we'll adjust the delay down to a random value
	// between 90-100% of the max backoff.
	maxBackoffJitter = 10 // %

	connmgrTag = "ipfs-peering"
	// This needs to be sufficient to prevent two sides from simultaneously
	// dialing.
	initialDelay = 5 * time.Second
)

var logger = log.Logger("peering")

type State uint

func (s State) String() string {
	switch s {
	case StateInit:
		return "init"
	case StateRunning:
		return "running"
	case StateStopped:
		return "stopped"
	default:
		return "unknown peering state: " + strconv.FormatUint(uint64(s), 10)
	}
}

const (
	StateInit State = iota
	StateRunning
	StateStopped
)

// peerHandler keeps track of all state related to a specific "peering" peer.
type peerHandler struct {
	peer   peer.ID
	host   host.Host
	ctx    context.Context
	cancel context.CancelFunc

	mu             sync.Mutex
	addrs          []multiaddr.Multiaddr
	reconnectTimer *time.Timer

	nextDelay time.Duration
}

// setAddrs sets the addresses for this peer.
func (ph *peerHandler) setAddrs(addrs []multiaddr.Multiaddr) {
	// Not strictly necessary, but it helps to not trust the calling code.
	addrCopy := make([]multiaddr.Multiaddr, len(addrs))
	copy(addrCopy, addrs)

	ph.mu.Lock()
	defer ph.mu.Unlock()
	ph.addrs = addrCopy
}

// getAddrs returns a shared slice of addresses for this peer. Do not modify.
func (ph *peerHandler) getAddrs() []multiaddr.Multiaddr {
	ph.mu.Lock()
	defer ph.mu.Unlock()
	return ph.addrs
}

// stop permanently stops the peer handler.
func (ph *peerHandler) stop() {
	ph.cancel()

	ph.mu.Lock()
	defer ph.mu.Unlock()
	if ph.reconnectTimer != nil {
		ph.reconnectTimer.Stop()
		ph.reconnectTimer = nil
	}
}

func (ph *peerHandler) nextBackoff() time.Duration {
	if ph.nextDelay < maxBackoff {
		ph.nextDelay += ph.nextDelay/2 + time.Duration(rand.Int63n(int64(ph.nextDelay)))
	}

	// If we've gone over the max backoff, reduce it under the max.
	if ph.nextDelay > maxBackoff {
		ph.nextDelay = maxBackoff
		// randomize the backoff a bit (10%).
		ph.nextDelay -= time.Duration(rand.Int63n(int64(maxBackoff) * maxBackoffJitter / 100))
	}

	return ph.nextDelay
}

func (ph *peerHandler) reconnect() {
	// Try connecting
	addrs := ph.getAddrs()
	logger.Debugw("reconnecting", "peer", ph.peer, "addrs", addrs)

	err := ph.host.Connect(ph.ctx, peer.AddrInfo{ID: ph.peer, Addrs: addrs})
	if err != nil {
		logger.Debugw("failed to reconnect", "peer", ph.peer, "error", err)
		// Ok, we failed. Extend the timeout.
		ph.mu.Lock()
		if ph.reconnectTimer != nil {
			// Only counts if the reconnectTimer still exists. If not, a
			// connection _was_ somehow established.
			ph.reconnectTimer.Reset(ph.nextBackoff())
		}
		// Otherwise, someone else has stopped us so we can assume that
		// we're either connected or someone else will start us.
		ph.mu.Unlock()
	}

	// Always call this. We could have connected since we processed the
	// error.
	ph.stopIfConnected()
}

func (ph *peerHandler) stopIfConnected() {
	ph.mu.Lock()
	defer ph.mu.Unlock()

	if ph.reconnectTimer != nil && ph.host.Network().Connectedness(ph.peer) == network.Connected {
		logger.Debugw("successfully reconnected", "peer", ph.peer)
		ph.reconnectTimer.Stop()
		ph.reconnectTimer = nil
		ph.nextDelay = initialDelay
	}
}

// startIfDisconnected is the inverse of stopIfConnected.
func (ph *peerHandler) startIfDisconnected() {
	ph.mu.Lock()
	defer ph.mu.Unlock()

	if ph.reconnectTimer == nil && ph.host.Network().Connectedness(ph.peer) != network.Connected {
		logger.Debugw("disconnected from peer", "peer", ph.peer)
		// Always start with a short timeout so we can stagger things a bit.
		ph.reconnectTimer = time.AfterFunc(ph.nextBackoff(), ph.reconnect)
	}
}

// PeeringService maintains connections to specified peers, reconnecting on
// disconnect with a back-off.
type PeeringService struct {
	host host.Host

	mu    sync.RWMutex
	peers map[peer.ID]*peerHandler
	state State
}

// NewPeeringService constructs a new peering service. Peers can be added and
// removed immediately, but connections won't be formed until `Start` is called.
func NewPeeringService(host host.Host) *PeeringService {
	return &PeeringService{host: host, peers: make(map[peer.ID]*peerHandler)}
}

// Start starts the peering service, connecting and maintaining connections to
// all registered peers. It returns an error if the service has already been
// stopped.
func (ps *PeeringService) Start() error {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	switch ps.state {
	case StateInit:
		logger.Infow("starting")
	case StateRunning:
		return nil
	case StateStopped:
		return errors.New("already stopped")
	}
	ps.host.Network().Notify((*netNotifee)(ps))
	ps.state = StateRunning
	for _, handler := range ps.peers {
		go handler.startIfDisconnected()
	}
	return nil
}

// GetState get the State of the PeeringService.
func (ps *PeeringService) GetState() State {
	ps.mu.RLock()
	defer ps.mu.RUnlock()
	return ps.state
}

// Stop stops the peering service.
func (ps *PeeringService) Stop() error {
	ps.host.Network().StopNotify((*netNotifee)(ps))
	ps.mu.Lock()
	defer ps.mu.Unlock()

	switch ps.state {
	case StateInit, StateRunning:
		logger.Infow("stopping")
		for _, handler := range ps.peers {
			handler.stop()
		}
		ps.state = StateStopped
	}
	return nil
}

// AddPeer adds a peer to the peering service. This function may be safely
// called at any time: before the service is started, while running, or after it
// stops.
//
// AddPeer may also be called multiple times for the same peer. The new
// addresses will replace the old.
func (ps *PeeringService) AddPeer(info peer.AddrInfo) {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	if handler, ok := ps.peers[info.ID]; ok {
		logger.Infow("updating addresses", "peer", info.ID, "addrs", info.Addrs)
		handler.setAddrs(info.Addrs)
	} else {
		logger.Infow("peer added", "peer", info.ID, "addrs", info.Addrs)
		ps.host.ConnManager().Protect(info.ID, connmgrTag)

		handler = &peerHandler{
			host:      ps.host,
			peer:      info.ID,
			addrs:     info.Addrs,
			nextDelay: initialDelay,
		}
		handler.ctx, handler.cancel = context.WithCancel(context.Background())
		ps.peers[info.ID] = handler
		switch ps.state {
		case StateRunning:
			go handler.startIfDisconnected()
		case StateStopped:
			// We still construct everything in this state because
			// it's easier to reason about. But we should still free
			// resources.
			handler.cancel()
		}
	}
}

// ListPeers lists peers in the peering service.
func (ps *PeeringService) ListPeers() []peer.AddrInfo {
	ps.mu.RLock()
	defer ps.mu.RUnlock()

	out := make([]peer.AddrInfo, 0, len(ps.peers))
	for id, addrs := range ps.peers {
		ai := peer.AddrInfo{ID: id}
		ai.Addrs = append(ai.Addrs, addrs.addrs...)
		out = append(out, ai)
	}
	return out
}

// RemovePeer removes a peer from the peering service. This function may be
// safely called at any time: before the service is started, while running, or
// after it stops.
func (ps *PeeringService) RemovePeer(id peer.ID) {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	if handler, ok := ps.peers[id]; ok {
		logger.Infow("peer removed", "peer", id)
		ps.host.ConnManager().Unprotect(id, connmgrTag)

		handler.stop()
		delete(ps.peers, id)
	}
}

type netNotifee PeeringService

func (nn *netNotifee) Connected(_ network.Network, c network.Conn) {
	ps := (*PeeringService)(nn)

	p := c.RemotePeer()
	ps.mu.RLock()
	defer ps.mu.RUnlock()

	if handler, ok := ps.peers[p]; ok {
		// use a goroutine to avoid blocking events.
		go handler.stopIfConnected()
	}
}

func (nn *netNotifee) Disconnected(_ network.Network, c network.Conn) {
	ps := (*PeeringService)(nn)

	p := c.RemotePeer()
	ps.mu.RLock()
	defer ps.mu.RUnlock()

	if handler, ok := ps.peers[p]; ok {
		// use a goroutine to avoid blocking events.
		go handler.startIfDisconnected()
	}
}

func (nn *netNotifee) OpenedStream(network.Network, network.Stream)     {}
func (nn *netNotifee) ClosedStream(network.Network, network.Stream)     {}
func (nn *netNotifee) Listen(network.Network, multiaddr.Multiaddr)      {}
func (nn *netNotifee) ListenClose(network.Network, multiaddr.Multiaddr) {}
@@ -1,172 +0,0 @@
package peering

import (
	"context"
	"testing"
	"time"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/net/connmgr"

	"github.com/stretchr/testify/require"
)

func newNode(t *testing.T) host.Host {
	cm, err := connmgr.NewConnManager(1, 100, connmgr.WithGracePeriod(0))
	require.NoError(t, err)
	h, err := libp2p.New(
		libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"),
		// We'd like to set the connection manager low water to 0, but
		// that would disable the connection manager.
		libp2p.ConnectionManager(cm),
	)
	require.NoError(t, err)
	return h
}

func TestPeeringService(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	h1 := newNode(t)
	ps1 := NewPeeringService(h1)

	h2 := newNode(t)
	h3 := newNode(t)
	h4 := newNode(t)

	// peer 1 -> 2
	ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
	require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})

	// We haven't started so we shouldn't have any peers.
	require.Never(t, func() bool {
		return len(h1.Network().Peers()) > 0
	}, 100*time.Millisecond, 1*time.Second, "expected host 1 to have no peers")

	// Use p4 to take up the one slot we have in the connection manager.
	for _, h := range []host.Host{h1, h2} {
		require.NoError(t, h.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}))
		h.ConnManager().TagPeer(h4.ID(), "sticky-peer", 1000)
	}

	// Now start.
	require.NoError(t, ps1.Start())
	// starting twice is fine.
	require.NoError(t, ps1.Start())

	// We should eventually connect.
	t.Logf("waiting for h1 to connect to h2")
	require.Eventually(t, func() bool {
		return h1.Network().Connectedness(h2.ID()) == network.Connected
	}, 30*time.Second, 10*time.Millisecond)

	// Now explicitly connect to h3.
	t.Logf("waiting for h1's connection to h3 to work")
	require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}))
	require.Eventually(t, func() bool {
		return h1.Network().Connectedness(h3.ID()) == network.Connected
	}, 30*time.Second, 100*time.Millisecond)

	require.Len(t, h1.Network().Peers(), 3)

	// force a disconnect
	h1.ConnManager().TrimOpenConns(ctx)

	// Should disconnect from h3.
	t.Logf("waiting for h1's connection to h3 to disconnect")
	require.Eventually(t, func() bool {
		return h1.Network().Connectedness(h3.ID()) != network.Connected
	}, 5*time.Second, 10*time.Millisecond)

	// Should remain connected to p2
	require.Never(t, func() bool {
		return h1.Network().Connectedness(h2.ID()) != network.Connected
	}, 5*time.Second, 1*time.Second)

	// Now force h2 to disconnect (we have an asymmetric peering).
	conns := h2.Network().ConnsToPeer(h1.ID())
	require.NotEmpty(t, conns)
	h2.ConnManager().TrimOpenConns(ctx)

	// All conns to peer should eventually close.
	t.Logf("waiting for all connections to close")
	for _, c := range conns {
		require.Eventually(t, func() bool {
			s, err := c.NewStream(context.Background())
			if s != nil {
				_ = s.Reset()
			}
			return err != nil
		}, 5*time.Second, 10*time.Millisecond)
	}

	// Should eventually re-connect.
	require.Eventually(t, func() bool {
		return h1.Network().Connectedness(h2.ID()) == network.Connected
	}, 30*time.Second, 1*time.Second)

	// Unprotect 2 from 1.
	ps1.RemovePeer(h2.ID())
	require.NotContains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})

	// Trim connections.
	h1.ConnManager().TrimOpenConns(ctx)

	// Should disconnect
	t.Logf("waiting for h1 to disconnect from h2")
	require.Eventually(t, func() bool {
		return h1.Network().Connectedness(h2.ID()) != network.Connected
	}, 5*time.Second, 10*time.Millisecond)

	// Should never reconnect.
	t.Logf("ensuring h1 is not connected to h2 again")
	require.Never(t, func() bool {
		return h1.Network().Connectedness(h2.ID()) == network.Connected
	}, 20*time.Second, 1*time.Second)

	// Until added back
	ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
	require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
	ps1.AddPeer(peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()})
	require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()})
	t.Logf("wait for h1 to connect to h2 and h3 again")
	require.Eventually(t, func() bool {
		return h1.Network().Connectedness(h2.ID()) == network.Connected
	}, 30*time.Second, 1*time.Second)
	require.Eventually(t, func() bool {
		return h1.Network().Connectedness(h3.ID()) == network.Connected
	}, 30*time.Second, 1*time.Second)

	// Should be able to repeatedly stop.
	require.NoError(t, ps1.Stop())
	require.NoError(t, ps1.Stop())

	// Adding and removing should work after stopping.
	ps1.AddPeer(peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()})
	require.Contains(t, ps1.ListPeers(), peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()})
	ps1.RemovePeer(h2.ID())
	require.NotContains(t, ps1.ListPeers(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
}

func TestNextBackoff(t *testing.T) {
	minMaxBackoff := (100 - maxBackoffJitter) / 100 * maxBackoff
	for x := 0; x < 1000; x++ {
		ph := peerHandler{nextDelay: time.Second}
		for min, max := time.Second*3/2, time.Second*5/2; min < minMaxBackoff; min, max = min*3/2, max*5/2 {
			b := ph.nextBackoff()
			if b > max || b < min {
				t.Errorf("expected backoff %s to be between %s and %s", b, min, max)
			}
		}
		for i := 0; i < 100; i++ {
			b := ph.nextBackoff()
			if b < minMaxBackoff || b > maxBackoff {
				t.Fatal("failed to stay within max bounds")
			}
		}
	}
}