ceremonyclient/go-libp2p/x/simlibp2p/libp2p.go
Cassandra Heart 53f7c2b5c9
v2.1.0.2 (#442)
* v2.1.0.2

* restore tweaks to simlibp2p

* fix: nil ref on size calc

* fix: panic should induce shutdown from event_distributor

* fix: friendlier initialization that requires less manual kickstarting for test/devnets

* fix: fewer available shards than provers should choose shard length

* fix: update stored worker registry, improve logging for debug mode

* fix: shut the fuck up, peer log

* qol: log value should be snake cased

* fix:non-archive snap sync issues

* fix: separate X448/Decaf448 signed keys, add onion key to registry

* fix: overflow arithmetic on frame number comparison

* fix: worker registration should be idempotent if inputs are same, otherwise permit updated records

* fix: remove global prover state from size calculation

* fix: divide by zero case

* fix: eager prover

* fix: broadcast listener default

* qol: diagnostic data for peer authenticator

* fix: master/worker connectivity issue in sparse networks

tight coupling of peer and workers can sometimes interfere if mesh is sparse, so give workers a pseudoidentity but publish messages with the proper peer key

* fix: reorder steps of join creation

* fix: join verify frame source + ensure domain is properly padded (unnecessary but good for consistency)

* fix: add delegate to protobuf <-> reified join conversion

* fix: preempt prover from planning with no workers

* fix: use the unallocated workers to generate a proof

* qol: underflow causes join fail in first ten frames on test/devnets

* qol: small logging tweaks for easier log correlation in debug mode

* qol: use fisher-yates shuffle to ensure prover allocations are evenly distributed when scores are equal

* qol: separate decisional logic on post-enrollment confirmation into consensus engine, proposer, and worker manager where relevant, refactor out scoring

* reuse shard descriptors for both join planning and confirm/reject decisions

* fix: add missing interface method and amend test blossomsub to use new peer id basis

* fix: only check allocations if they exist

* fix: pomw mint proof data needs to be hierarchically under global intrinsic domain

* staging temporary state under diagnostics

* fix: first phase of distributed lock refactoring

* fix: compute intrinsic locking

* fix: hypergraph intrinsic locking

* fix: token intrinsic locking

* fix: update execution engines to support new locking model

* fix: adjust tests with new execution shape

* fix: weave in lock/unlock semantics to liveness provider

* fix lock fallthrough, add missing allocation update

* qol: additional logging for diagnostics, also testnet/devnet handling for confirmations

* fix: establish grace period on halt scenario to permit recovery

* fix: support test/devnet defaults for coverage scenarios

* fix: nil ref on consensus halts for non-archive nodes

* fix: remove unnecessary prefix from prover ref

* add test coverage for fork choice behaviors and replay – once passing, blocker (2) is resolved

* fix: no fork replay on repeat for non-archive nodes, snap now behaves correctly

* rollup of pre-liveness check lock interactions

* ahead of tests, get the protobuf/metrics-related changes out so teams can prepare

* add test coverage for distributed lock behaviors – once passing, blocker (3) is resolved

* fix: blocker (3)

* Dev docs improvements (#445)

* Make install deps script more robust

* Improve testing instructions

* Worker node should stop upon OS SIGINT/SIGTERM signal (#447)

* move pebble close to Stop()

* move deferred Stop() to Start()

* add core id to worker stop log message

* create done os signal channel and stop worker upon message to it

---------

Co-authored-by: Cassandra Heart <7929478+CassOnMars@users.noreply.github.com>

---------

Co-authored-by: Daz <daz_the_corgi@proton.me>
Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
2025-10-23 01:03:06 -05:00

269 lines
6.9 KiB
Go

package simconnlibp2p
import (
"crypto/rand"
"fmt"
"net"
"sync/atomic"
"testing"
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"github.com/marcopolo/simnet"
"github.com/multiformats/go-multiaddr"
"github.com/quic-go/quic-go"
"github.com/stretchr/testify/require"
"go.uber.org/fx"
)
func MustNewHost(t *testing.T, opts ...libp2p.Option) host.Host {
t.Helper()
h, err := libp2p.New(opts...)
require.NoError(t, err)
return h
}
type MockSourceIPSelector struct {
ip atomic.Pointer[net.IP]
}
func (m *MockSourceIPSelector) PreferredSourceIPForDestination(_ *net.UDPAddr) (net.IP, error) {
return *m.ip.Load(), nil
}
const OneMbps = 1_000_000
func QUICSimnet(simnet *simnet.Simnet, linkSettings simnet.NodeBiDiLinkSettings, quicReuseOpts ...quicreuse.Option) libp2p.Option {
m := &MockSourceIPSelector{}
quicReuseOpts = append(quicReuseOpts,
quicreuse.OverrideSourceIPSelector(func() (quicreuse.SourceIPSelector, error) {
return m, nil
}),
quicreuse.OverrideListenUDP(func(_ string, address *net.UDPAddr) (net.PacketConn, error) {
m.ip.Store(&address.IP)
c := simnet.NewEndpoint(address, linkSettings)
return c, nil
}))
return libp2p.QUICReuse(
func(l fx.Lifecycle, statelessResetKey quic.StatelessResetKey, tokenKey quic.TokenGeneratorKey, opts ...quicreuse.Option) (*quicreuse.ConnManager, error) {
cm, err := quicreuse.NewConnManager(statelessResetKey, tokenKey, opts...)
if err != nil {
return nil, err
}
l.Append(fx.StopHook(func() error {
// When we pass in our own conn manager, we need to close it manually (??)
// TODO: this seems like a bug
return cm.Close()
}))
return cm, nil
}, quicReuseOpts...)
}
type wrappedHost struct {
blankhost.BlankHost
ps peerstore.Peerstore
quicCM *quicreuse.ConnManager
idService identify.IDService
connMgr *connmgr.BasicConnMgr
}
func (h *wrappedHost) Close() error {
h.BlankHost.Close()
h.ps.Close()
h.quicCM.Close()
h.idService.Close()
h.connMgr.Close()
return nil
}
type BlankHostOpts struct {
ConnMgr *connmgr.BasicConnMgr
listenMultiaddr multiaddr.Multiaddr
simnet *simnet.Simnet
linkSettings simnet.NodeBiDiLinkSettings
quicReuseOpts []quicreuse.Option
}
func newBlankHost(opts BlankHostOpts) (*wrappedHost, error) {
priv, _, err := crypto.GenerateEd448Key(rand.Reader)
if err != nil {
return nil, err
}
id, err := peer.IDFromPrivateKey(priv)
if err != nil {
return nil, err
}
ps, err := pstoremem.NewPeerstore()
if err != nil {
return nil, err
}
ps.AddPrivKey(id, priv)
eb := eventbus.NewBus()
swarm, err := swarm.NewSwarm(id, ps, eb)
if err != nil {
return nil, err
}
statelessResetKey, err := config.PrivKeyToStatelessResetKey(priv)
if err != nil {
return nil, err
}
tokenGeneratorKey, err := config.PrivKeyToTokenGeneratorKey(priv)
if err != nil {
return nil, err
}
m := &MockSourceIPSelector{}
quicReuseOpts := append(opts.quicReuseOpts,
quicreuse.OverrideSourceIPSelector(func() (quicreuse.SourceIPSelector, error) {
return m, nil
}),
quicreuse.OverrideListenUDP(func(_ string, address *net.UDPAddr) (net.PacketConn, error) {
m.ip.Store(&address.IP)
c := opts.simnet.NewEndpoint(address, opts.linkSettings)
return c, nil
}),
)
quicCM, err := quicreuse.NewConnManager(statelessResetKey, tokenGeneratorKey, quicReuseOpts...)
if err != nil {
return nil, err
}
quicTr, err := libp2pquic.NewTransport(priv, quicCM, nil, nil, &network.NullResourceManager{})
if err != nil {
return nil, err
}
err = swarm.AddTransport(quicTr)
if err != nil {
return nil, err
}
err = swarm.Listen(opts.listenMultiaddr)
if err != nil {
return nil, err
}
var cm *connmgr.BasicConnMgr
if opts.ConnMgr == nil {
cm, err = connmgr.NewConnManager(100, 200, connmgr.WithGracePeriod(time.Second*10))
if err != nil {
return nil, err
}
} else {
cm = opts.ConnMgr
}
host := blankhost.NewBlankHost(swarm, blankhost.WithEventBus(eb), blankhost.WithConnectionManager(cm))
idService, err := identify.NewIDService(host)
if err != nil {
return nil, err
}
idService.Start()
return &wrappedHost{
BlankHost: *host,
ps: ps,
quicCM: quicCM,
idService: idService,
connMgr: cm,
}, nil
}
type NodeLinkSettingsAndIndex struct {
LinkSettings simnet.NodeBiDiLinkSettings
Idx int
}
type HostAndIdx struct {
Host host.Host
Idx int
}
type SimpleLibp2pNetworkMeta struct {
Nodes []host.Host
Keys []crypto.PrivKey
AddrToNode map[string]HostAndIdx
}
type NetworkSettings struct {
UseBlankHost bool
QUICReuseOptsForHostIdx func(idx int) []quicreuse.Option
BlankHostOptsForHostIdx func(idx int) BlankHostOpts
OptsForHostIdx func(idx int) []libp2p.Option
}
func SimpleLibp2pNetwork(linkSettings []NodeLinkSettingsAndIndex, networkSettings NetworkSettings) (*simnet.Simnet, *SimpleLibp2pNetworkMeta, error) {
nw := &simnet.Simnet{}
meta := &SimpleLibp2pNetworkMeta{
AddrToNode: make(map[string]HostAndIdx),
}
for _, l := range linkSettings {
idx := l.Idx
ip := simnet.IntToPublicIPv4(idx)
addr := fmt.Sprintf("/ip4/%s/udp/8000/quic-v1", ip)
var h host.Host
var err error
var quicReuseOpts []quicreuse.Option
if networkSettings.QUICReuseOptsForHostIdx != nil {
quicReuseOpts = networkSettings.QUICReuseOptsForHostIdx(idx)
}
privkey, _, err := crypto.GenerateEd448Key(rand.Reader)
if err != nil {
return nil, nil, err
}
if networkSettings.UseBlankHost {
var opts BlankHostOpts
if networkSettings.BlankHostOptsForHostIdx != nil {
opts = networkSettings.BlankHostOptsForHostIdx(idx)
}
a, _ := multiaddr.StringCast(addr)
h, err = newBlankHost(BlankHostOpts{
listenMultiaddr: a,
simnet: nw,
linkSettings: l.LinkSettings,
quicReuseOpts: quicReuseOpts,
ConnMgr: opts.ConnMgr,
})
} else {
h, err = libp2p.New(
append(
[]libp2p.Option{
libp2p.ListenAddrStrings(addr),
QUICSimnet(nw, l.LinkSettings, quicReuseOpts...),
libp2p.ResourceManager(&network.NullResourceManager{}),
libp2p.Identity(privkey),
},
networkSettings.OptsForHostIdx(idx)...,
)...,
)
}
if err != nil {
return nil, nil, err
}
meta.Keys = append(meta.Keys, privkey)
meta.Nodes = append(meta.Nodes, h)
meta.AddrToNode[addr] = HostAndIdx{Host: h, Idx: idx}
}
return nw, meta, nil
}