mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-03-10 18:57:32 +08:00
* v2.1.0.2 * restore tweaks to simlibp2p * fix: nil ref on size calc * fix: panic should induce shutdown from event_distributor * fix: friendlier initialization that requires less manual kickstarting for test/devnets * fix: fewer available shards than provers should choose shard length * fix: update stored worker registry, improve logging for debug mode * fix: shut the fuck up, peer log * qol: log value should be snake cased * fix:non-archive snap sync issues * fix: separate X448/Decaf448 signed keys, add onion key to registry * fix: overflow arithmetic on frame number comparison * fix: worker registration should be idempotent if inputs are same, otherwise permit updated records * fix: remove global prover state from size calculation * fix: divide by zero case * fix: eager prover * fix: broadcast listener default * qol: diagnostic data for peer authenticator * fix: master/worker connectivity issue in sparse networks tight coupling of peer and workers can sometimes interfere if mesh is sparse, so give workers a pseudoidentity but publish messages with the proper peer key * fix: reorder steps of join creation * fix: join verify frame source + ensure domain is properly padded (unnecessary but good for consistency) * fix: add delegate to protobuf <-> reified join conversion * fix: preempt prover from planning with no workers * fix: use the unallocated workers to generate a proof * qol: underflow causes join fail in first ten frames on test/devnets * qol: small logging tweaks for easier log correlation in debug mode * qol: use fisher-yates shuffle to ensure prover allocations are evenly distributed when scores are equal * qol: separate decisional logic on post-enrollment confirmation into consensus engine, proposer, and worker manager where relevant, refactor out scoring * reuse shard descriptors for both join planning and confirm/reject decisions * fix: add missing interface method and amend test blossomsub to use new peer id basis * fix: only check allocations if they 
exist * fix: pomw mint proof data needs to be hierarchically under global intrinsic domain * staging temporary state under diagnostics * fix: first phase of distributed lock refactoring * fix: compute intrinsic locking * fix: hypergraph intrinsic locking * fix: token intrinsic locking * fix: update execution engines to support new locking model * fix: adjust tests with new execution shape * fix: weave in lock/unlock semantics to liveness provider * fix lock fallthrough, add missing allocation update * qol: additional logging for diagnostics, also testnet/devnet handling for confirmations * fix: establish grace period on halt scenario to permit recovery * fix: support test/devnet defaults for coverage scenarios * fix: nil ref on consensus halts for non-archive nodes * fix: remove unnecessary prefix from prover ref * add test coverage for fork choice behaviors and replay – once passing, blocker (2) is resolved * fix: no fork replay on repeat for non-archive nodes, snap now behaves correctly * rollup of pre-liveness check lock interactions * ahead of tests, get the protobuf/metrics-related changes out so teams can prepare * add test coverage for distributed lock behaviors – once passing, blocker (3) is resolved * fix: blocker (3) * Dev docs improvements (#445) * Make install deps script more robust * Improve testing instructions * Worker node should stop upon OS SIGINT/SIGTERM signal (#447) * move pebble close to Stop() * move deferred Stop() to Start() * add core id to worker stop log message * create done os signal channel and stop worker upon message to it --------- Co-authored-by: Cassandra Heart <7929478+CassOnMars@users.noreply.github.com> --------- Co-authored-by: Daz <daz_the_corgi@proton.me> Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
214 lines
4.8 KiB
Go
214 lines
4.8 KiB
Go
package blossomsub
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p/core/discovery"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
)
|
|
|
|
// mockDiscoveryServer is an in-memory discovery registry shared by test
// clients. It maps a namespace string to the set of peers advertised under
// that namespace. All access is serialized by mx.
type mockDiscoveryServer struct {
	mx sync.Mutex // guards db
	// db: namespace -> peer ID -> that peer's latest registration.
	db map[string]map[peer.ID]*discoveryRegistration
}
|
|
|
|
// discoveryRegistration records a single Advertise call: the peer's address
// info and the TTL it requested. (Field order matters: it is constructed
// positionally in Advertise.)
type discoveryRegistration struct {
	info peer.AddrInfo
	ttl  time.Duration
}
|
|
|
|
func newDiscoveryServer() *mockDiscoveryServer {
|
|
return &mockDiscoveryServer{
|
|
db: make(map[string]map[peer.ID]*discoveryRegistration),
|
|
}
|
|
}
|
|
|
|
func (s *mockDiscoveryServer) Advertise(ns string, info peer.AddrInfo, ttl time.Duration) (time.Duration, error) {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
|
|
peers, ok := s.db[ns]
|
|
if !ok {
|
|
peers = make(map[peer.ID]*discoveryRegistration)
|
|
s.db[ns] = peers
|
|
}
|
|
peers[info.ID] = &discoveryRegistration{info, ttl}
|
|
return ttl, nil
|
|
}
|
|
|
|
func (s *mockDiscoveryServer) FindPeers(ns string, limit int) (<-chan peer.AddrInfo, error) {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
|
|
peers, ok := s.db[ns]
|
|
if !ok || len(peers) == 0 {
|
|
emptyCh := make(chan peer.AddrInfo)
|
|
close(emptyCh)
|
|
return emptyCh, nil
|
|
}
|
|
|
|
count := len(peers)
|
|
if count > limit {
|
|
count = limit
|
|
}
|
|
ch := make(chan peer.AddrInfo, count)
|
|
numSent := 0
|
|
for _, reg := range peers {
|
|
if numSent == count {
|
|
break
|
|
}
|
|
numSent++
|
|
ch <- reg.info
|
|
}
|
|
close(ch)
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
func (s *mockDiscoveryServer) hasPeerRecord(ns string, pid peer.ID) bool {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
|
|
if peers, ok := s.db[ns]; ok {
|
|
_, ok := peers[pid]
|
|
return ok
|
|
}
|
|
return false
|
|
}
|
|
|
|
// mockDiscoveryClient adapts one host to the shared mockDiscoveryServer,
// implementing the discovery.Discovery interface for tests. (Field order
// matters: it is constructed positionally in the test below.)
type mockDiscoveryClient struct {
	host   host.Host
	server *mockDiscoveryServer
}
|
|
|
|
func (d *mockDiscoveryClient) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
|
|
var options discovery.Options
|
|
err := options.Apply(opts...)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return d.server.Advertise(ns, *host.InfoFromHost(d.host), options.Ttl)
|
|
}
|
|
|
|
func (d *mockDiscoveryClient) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
|
|
var options discovery.Options
|
|
err := options.Apply(opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return d.server.FindPeers(ns, options.Limit)
|
|
}
|
|
|
|
// dummyDiscovery is a stateless discovery.Discovery stub: Advertise succeeds
// without recording anything, and FindPeers never yields a peer.
type dummyDiscovery struct{}
|
|
|
func (d *dummyDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
|
|
return time.Hour, nil
|
|
}
|
|
|
|
func (d *dummyDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
|
|
retCh := make(chan peer.AddrInfo)
|
|
go func() {
|
|
time.Sleep(time.Second)
|
|
close(retCh)
|
|
}()
|
|
return retCh, nil
|
|
}
|
|
|
|
func TestBlossomSubDiscoveryAfterBootstrap(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Setup Discovery server and pubsub clients
|
|
partitionSize := BlossomSubDlo - 1
|
|
numHosts := partitionSize * 2
|
|
const ttl = 10 * time.Minute
|
|
|
|
bitmask := []byte{0x00, 0x01}
|
|
|
|
server1, server2 := newDiscoveryServer(), newDiscoveryServer()
|
|
discOpts := []discovery.Option{discovery.Limit(numHosts), discovery.TTL(ttl)}
|
|
|
|
// Put the pubsub clients into two partitions
|
|
hosts := getDefaultHosts(t, numHosts)
|
|
psubs := make([]*PubSub, numHosts)
|
|
bitmaskHandlers := make([]*Bitmask, numHosts)
|
|
|
|
for i, h := range hosts {
|
|
s := server1
|
|
if i >= partitionSize {
|
|
s = server2
|
|
}
|
|
disc := &mockDiscoveryClient{h, s}
|
|
ps := getBlossomSub(ctx, h, WithDiscovery(disc, WithDiscoveryOpts(discOpts...)))
|
|
psubs[i] = ps
|
|
handler, _ := ps.Join(bitmask)
|
|
bitmaskHandlers[i] = handler[0]
|
|
}
|
|
|
|
msgs := make([]*Subscription, numHosts)
|
|
for i, th := range bitmaskHandlers {
|
|
subch, err := th.Subscribe()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
msgs[i] = subch
|
|
}
|
|
|
|
// Wait for network to finish forming then join the partitions via discovery
|
|
for _, ps := range psubs {
|
|
waitUntilBlossomSubMeshCount(ps, bitmask, partitionSize-1)
|
|
}
|
|
|
|
for i := 0; i < partitionSize; i++ {
|
|
if _, err := server1.Advertise("blossomsub:"+string(bitmask), *host.InfoFromHost(hosts[i+partitionSize]), ttl); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
// test the mesh
|
|
for i := 0; i < 100; i++ {
|
|
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
|
|
|
|
owner := rand.Intn(numHosts)
|
|
|
|
if err := bitmaskHandlers[owner].Publish(ctx, bitmaskHandlers[owner].bitmask, msg, WithReadiness(MinBitmaskSize(numHosts-4))); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
for _, sub := range msgs {
|
|
got, err := sub.Next(ctx)
|
|
if err != nil {
|
|
t.Fatal(sub.err)
|
|
}
|
|
if !bytes.Equal(msg, got.Data) {
|
|
t.Fatal("got wrong message!")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func waitUntilBlossomSubMeshCount(ps *PubSub, bitmask []byte, count int) {
|
|
done := false
|
|
doneCh := make(chan bool, 1)
|
|
rt := ps.rt.(*BlossomSubRouter)
|
|
for !done {
|
|
ps.eval <- func() {
|
|
doneCh <- len(rt.mesh[string(bitmask)]) == count
|
|
}
|
|
done = <-doneCh
|
|
if !done {
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}
|
|
}
|