diff --git a/hypergraph/hypergraph.go b/hypergraph/hypergraph.go index 702c79f..806be23 100644 --- a/hypergraph/hypergraph.go +++ b/hypergraph/hypergraph.go @@ -135,7 +135,7 @@ func (hg *HypergraphCRDT) SetShutdownContext(ctx context.Context) { go func() { select { case <-hg.shutdownCtx.Done(): - hg.snapshotMgr.publish(nil) + hg.snapshotMgr.close() } }() } diff --git a/hypergraph/snapshot_manager.go b/hypergraph/snapshot_manager.go index 4fd6d1f..3f0c2ac 100644 --- a/hypergraph/snapshot_manager.go +++ b/hypergraph/snapshot_manager.go @@ -170,6 +170,7 @@ type snapshotManager struct { logger *zap.Logger store tries.TreeBackingStore mu sync.Mutex + closed bool // generations holds snapshot generations ordered from newest to oldest. // generations[0] is the current/latest generation. generations []*snapshotGeneration @@ -190,6 +191,10 @@ func (m *snapshotManager) publish(root []byte) { m.mu.Lock() defer m.mu.Unlock() + if m.closed { + return + } + rootHex := "" if len(root) != 0 { rootHex = hex.EncodeToString(root) @@ -287,7 +292,7 @@ func (m *snapshotManager) acquire( m.mu.Lock() defer m.mu.Unlock() - if len(m.generations) == 0 { + if m.closed || len(m.generations) == 0 { m.logger.Warn("no snapshot generations available") return nil } @@ -425,6 +430,38 @@ func (m *snapshotManager) release(handle *snapshotHandle) { } } +// close releases all snapshot generations and their DB snapshots. After close, +// publish and acquire become no-ops. Shard snapshot handles held by active sync +// sessions remain valid (they are self-contained in-memory DBs) and will be +// released when the session ends. +func (m *snapshotManager) close() { + m.mu.Lock() + defer m.mu.Unlock() + + if m.closed { + return + } + m.closed = true + + for _, gen := range m.generations { + for key, handle := range gen.handles { + delete(gen.handles, key) + if handle != nil { + handle.releaseRef(m.logger) + } + } + if gen.dbSnapshot != nil { + if err := gen.dbSnapshot.Close(); err != nil { + m.logger.Warn("failed to close DB snapshot during shutdown", zap.Error(err)) + } + gen.dbSnapshot = nil + } + } + m.generations = nil + + m.logger.Debug("snapshot manager closed") +} + func shardKeyString(sk tries.ShardKey) string { buf := make([]byte, 0, len(sk.L1)+len(sk.L2)) buf = append(buf, sk.L1[:]...) diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index bb17944..866b0cf 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -155,7 +155,6 @@ type AppConsensusEngine struct { globalProverRootVerifiedFrame atomic.Uint64 globalProverRootSynced atomic.Bool globalProverSyncInProgress atomic.Bool - lastGlobalFrameHeader *protobufs.GlobalFrameHeader // previous frame for deferred root check // Genesis initialization genesisInitialized atomic.Bool @@ -986,22 +985,24 @@ func (e *AppConsensusEngine) handleGlobalProverRoot( return } - // Defer root check by one frame: when frame N arrives, check frame N-1's - // root. This matches the GlobalConsensusEngine which checks the parent - // frame's root during materialize(N-1), triggered when frame N certifies - // frame N-1. By the time frame N arrives, the master has had time to - // materialize N-2 (triggered when N-1 arrived), so the worker's tree - // should reflect post-materialize(N-2) state — exactly what frame N-1's - // ProverTreeCommitment was computed against. 
- prevHeader := e.lastGlobalFrameHeader - e.lastGlobalFrameHeader = frame.Header + frameNumber := frame.Header.FrameNumber + expectedProverRoot := frame.Header.ProverTreeCommitment - if prevHeader == nil { + if len(expectedProverRoot) == 0 { return } - frameNumber := prevHeader.FrameNumber - expectedProverRoot := prevHeader.ProverTreeCommitment + // Match the GlobalConsensusEngine's ordering: commit the tree first as a + // standalone step, then extract and verify the prover root. The global + // engine calls Commit(N) at the start of materialize(N) before checking + // the root. We mirror this by committing first, then extracting. + if _, err := e.hypergraph.Commit(frameNumber); err != nil { + e.logger.Warn( + "failed to commit hypergraph for global prover root check", + zap.Uint64("frame_number", frameNumber), + zap.Error(err), + ) + } localRoot, err := e.computeLocalGlobalProverRoot(frameNumber) if err != nil { @@ -1012,11 +1013,11 @@ func (e *AppConsensusEngine) handleGlobalProverRoot( ) e.globalProverRootSynced.Store(false) e.globalProverRootVerifiedFrame.Store(0) - e.performBlockingGlobalHypersync(prevHeader.Prover, expectedProverRoot) + e.performBlockingGlobalHypersync(frame.Header.Prover, expectedProverRoot) return } - if len(localRoot) == 0 || len(expectedProverRoot) == 0 { + if len(localRoot) == 0 { return } @@ -1029,7 +1030,35 @@ func (e *AppConsensusEngine) handleGlobalProverRoot( ) e.globalProverRootSynced.Store(false) e.globalProverRootVerifiedFrame.Store(0) - e.performBlockingGlobalHypersync(prevHeader.Prover, expectedProverRoot) + e.performBlockingGlobalHypersync(frame.Header.Prover, expectedProverRoot) + + // Re-compute local root after sync to verify convergence, matching + // the global engine's post-sync verification pattern. + newLocalRoot, newRootErr := e.computeLocalGlobalProverRoot(frameNumber) + if newRootErr != nil { + e.logger.Warn( + "failed to compute local global prover root after sync", + zap.Uint64("frame_number", frameNumber), + zap.Error(newRootErr), + ) + } else if bytes.Equal(newLocalRoot, expectedProverRoot) { + e.logger.Info( + "global prover root converged after sync", + zap.Uint64("frame_number", frameNumber), + ) + e.globalProverRootSynced.Store(true) + e.globalProverRootVerifiedFrame.Store(frameNumber) + if err := e.proverRegistry.Refresh(); err != nil { + e.logger.Warn("failed to refresh prover registry", zap.Error(err)) + } + } else { + e.logger.Warn( + "global prover root still mismatched after sync", + zap.Uint64("frame_number", frameNumber), + zap.String("expected_root", hex.EncodeToString(expectedProverRoot)), + zap.String("post_sync_root", hex.EncodeToString(newLocalRoot)), + ) + } return } diff --git a/node/consensus/global/coverage_events.go b/node/consensus/global/coverage_events.go index 5d0d1d8..14eba09 100644 --- a/node/consensus/global/coverage_events.go +++ b/node/consensus/global/coverage_events.go @@ -61,7 +61,9 @@ func (e *GlobalConsensusEngine) triggerCoverageCheckAsync( return } + e.coverageWg.Add(1) go func() { + defer e.coverageWg.Done() defer e.coverageCheckInProgress.Store(false) if err := e.checkShardCoverage(frameNumber, frameProver); err != nil { diff --git a/node/consensus/global/event_distributor.go b/node/consensus/global/event_distributor.go index baee81e..e9251b4 100644 --- a/node/consensus/global/event_distributor.go +++ b/node/consensus/global/event_distributor.go @@ -122,6 +122,7 @@ func (e *GlobalConsensusEngine) eventDistributorLoop( // prover allocations in the registry. 
e.reconcileWorkerAllocations(data.Frame.Header.FrameNumber, self) e.checkExcessPendingJoins(self, data.Frame.Header.FrameNumber) + e.checkAndSubmitSeniorityMerge(self, data.Frame.Header.FrameNumber) e.logAllocationStatusOnly(ctx, data, self, effectiveSeniority) } } @@ -668,47 +669,8 @@ func (e *GlobalConsensusEngine) evaluateForProposals( ) } - // Standalone seniority merge: when no join was proposed this cycle but the - // prover exists with incorrect seniority, submit a seniority merge to fix - // it. This covers the case where all worker slots are filled and no new - // joins are being proposed. - if !joinProposedThisCycle && self != nil { - frameNum := data.Frame.Header.FrameNumber - mergeSeniority := e.estimateSeniorityFromConfig() - - if mergeSeniority > self.Seniority { - lastJoin := e.lastJoinAttemptFrame.Load() - lastMerge := e.lastSeniorityMergeFrame.Load() - joinCooldownOk := lastJoin == 0 || frameNum-lastJoin >= 10 - mergeCooldownOk := lastMerge == 0 || frameNum-lastMerge >= 10 - - if joinCooldownOk && mergeCooldownOk { - frame := e.GetFrame() - if frame != nil { - helpers, peerIds := e.buildMergeHelpers() - err := e.submitSeniorityMerge( - frame, helpers, mergeSeniority, peerIds, - ) - if err != nil { - e.logger.Error( - "could not submit seniority merge", - zap.Error(err), - ) - } else { - e.lastSeniorityMergeFrame.Store(frameNum) - } - } - } else { - e.logger.Debug( - "seniority merge deferred due to cooldown", - zap.Uint64("merge_seniority", mergeSeniority), - zap.Uint64("existing_seniority", self.Seniority), - zap.Uint64("last_join_frame", lastJoin), - zap.Uint64("last_merge_frame", lastMerge), - zap.Uint64("current_frame", frameNum), - ) - } - } + if !joinProposedThisCycle { + e.checkAndSubmitSeniorityMerge(self, data.Frame.Header.FrameNumber) } if len(pendingFilters) != 0 { @@ -1082,12 +1044,23 @@ func (e *GlobalConsensusEngine) collectAllocationSnapshot( decideDescriptors := []provers.ShardDescriptor{} for _, shardInfo := range shards { - resp, err := e.getAppShardsFromProver( - client, - slices.Concat(shardInfo.L1, shardInfo.L2), - ) + shardKey := slices.Concat(shardInfo.L1, shardInfo.L2) + var resp *protobufs.GetAppShardsResponse + var err error + for attempt := 0; attempt < 3; attempt++ { + resp, err = e.getAppShardsFromProver(client, shardKey) + if err == nil { + break + } + e.logger.Debug( + "retrying app shard retrieval", + zap.Int("attempt", attempt+1), + zap.Error(err), + ) + time.Sleep(time.Duration(attempt+1) * 500 * time.Millisecond) + } if err != nil { - e.logger.Debug("could not get app shards from prover", zap.Error(err)) + e.logger.Debug("could not get app shards from prover after retries", zap.Error(err)) return nil, false } @@ -1262,6 +1235,56 @@ func (e *GlobalConsensusEngine) logAllocationStatusOnly( e.logAllocationStatus(snapshot) } +// checkAndSubmitSeniorityMerge submits a seniority merge if the prover exists +// with incorrect seniority and cooldowns have elapsed. This is called both from +// evaluateForProposals (when no join was proposed) and from the "all workers +// allocated" path, ensuring seniority is corrected regardless of allocation state. 
+func (e *GlobalConsensusEngine) checkAndSubmitSeniorityMerge( + self *typesconsensus.ProverInfo, + frameNumber uint64, +) { + if self == nil { + return + } + + mergeSeniority := e.estimateSeniorityFromConfig() + if mergeSeniority <= self.Seniority { + return + } + + lastJoin := e.lastJoinAttemptFrame.Load() + lastMerge := e.lastSeniorityMergeFrame.Load() + joinCooldownOk := lastJoin == 0 || frameNumber-lastJoin >= 10 + mergeCooldownOk := lastMerge == 0 || frameNumber-lastMerge >= 10 + + if joinCooldownOk && mergeCooldownOk { + frame := e.GetFrame() + if frame != nil { + helpers, peerIds := e.buildMergeHelpers() + err := e.submitSeniorityMerge( + frame, helpers, mergeSeniority, peerIds, + ) + if err != nil { + e.logger.Error( + "could not submit seniority merge", + zap.Error(err), + ) + } else { + e.lastSeniorityMergeFrame.Store(frameNumber) + } + } + } else { + e.logger.Debug( + "seniority merge deferred due to cooldown", + zap.Uint64("merge_seniority", mergeSeniority), + zap.Uint64("existing_seniority", self.Seniority), + zap.Uint64("last_join_frame", lastJoin), + zap.Uint64("last_merge_frame", lastMerge), + zap.Uint64("current_frame", frameNumber), + ) + } +} + func (e *GlobalConsensusEngine) allocationContext() ( *typesconsensus.ProverInfo, uint64, diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index 42ee511..c3b3949 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -206,6 +206,7 @@ type GlobalConsensusEngine struct { lastShardActionFrame map[string]uint64 lastShardActionFrameMu sync.Mutex coverageCheckInProgress atomic.Bool + coverageWg sync.WaitGroup peerInfoDigestCache map[string]struct{} peerInfoDigestCacheMu sync.Mutex keyRegistryDigestCache map[string]struct{} @@ -1206,6 +1207,10 @@ func (e *GlobalConsensusEngine) Stop(force bool) <-chan error { } } + // Wait for any in-flight coverage check goroutine to finish before + // returning, so callers can safely close the Pebble DB. + e.coverageWg.Wait() + close(errChan) return errChan } diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index c001385..20533f1 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -95,6 +95,7 @@ type BlossomSub struct { manualReachability atomic.Pointer[bool] p2pConfig config.P2PConfig dht *dht.IpfsDHT + routingDiscovery *routing.RoutingDiscovery coreId uint configDir ConfigDir } @@ -317,6 +318,7 @@ func NewBlossomSubWithHost( peerID := host.ID() bs.dht = kademliaDHT + bs.routingDiscovery = routingDiscovery bs.ps = pubsub bs.peerID = peerID bs.h = host @@ -767,6 +769,7 @@ func NewBlossomSub( peerID := h.ID() bs.dht = kademliaDHT + bs.routingDiscovery = routingDiscovery bs.ps = pubsub bs.peerID = peerID bs.h = h @@ -890,24 +893,60 @@ func (b *BlossomSub) background(ctx context.Context) { func (b *BlossomSub) checkAndReconnectPeers(ctx context.Context) { peerCount := len(b.h.Network().Peers()) - if peerCount > 1 { + if peerCount >= b.p2pConfig.MinBootstrapPeers { return } b.logger.Warn( - "no peers connected, attempting to re-bootstrap and discover", - zap.Duration("check_interval", b.p2pConfig.PeerReconnectCheckInterval), + "low peer count, attempting to re-bootstrap and discover", + zap.Int("current_peers", peerCount), + zap.Int("min_bootstrap_peers", b.p2pConfig.MinBootstrapPeers), ) + // Re-bootstrap the DHT to refresh the routing table. At startup, + // kademliaDHT.Bootstrap() populates the routing table by connecting to + // bootstrap peers. 
+	// Without calling it again here, the routing table can
+	// go empty after all peers disconnect, making FindPeers unable to
+	// discover anyone — leaving the node permanently stuck.
+	if b.dht != nil {
+		if err := b.dht.Bootstrap(ctx); err != nil {
+			b.logger.Error("DHT re-bootstrap failed", zap.Error(err))
+		}
+	}
+
+	// Re-advertise so other peers can find us through the DHT.
+	if b.routingDiscovery != nil {
+		util.Advertise(
+			ctx,
+			b.routingDiscovery,
+			getNetworkNamespace(b.p2pConfig.Network),
+		)
+	}
+
+	// Clear peerstore addresses for disconnected peers so we don't keep
+	// dialing stale/invalid addresses that were added in previous attempts.
+	for _, p := range b.h.Peerstore().Peers() {
+		if p == b.h.ID() {
+			continue
+		}
+		if b.h.Network().Connectedness(p) != network.Connected &&
+			b.h.Network().Connectedness(p) != network.Limited {
+			b.h.Peerstore().ClearAddrs(p)
+		}
+	}
+
 	if err := b.DiscoverPeers(ctx); err != nil {
 		b.logger.Error("peer reconnect failed", zap.Error(err))
 	}
 
 	newCount := len(b.h.Network().Peers())
-	if newCount > 1 {
+	if newCount >= b.p2pConfig.MinBootstrapPeers {
 		b.logger.Info("peer reconnect succeeded", zap.Int("peers", newCount))
 	} else {
-		b.logger.Warn("peer reconnect: still no peers found, will retry at next interval")
+		b.logger.Warn(
+			"peer reconnect: still low peer count, will retry at next interval",
+			zap.Int("peers", newCount),
+		)
 	}
 }
diff --git a/node/p2p/internal/peer_connector.go b/node/p2p/internal/peer_connector.go
index a5d32d7..b91b323 100644
--- a/node/p2p/internal/peer_connector.go
+++ b/node/p2p/internal/peer_connector.go
@@ -11,6 +11,8 @@ import (
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-libp2p/core/peerstore"
 	"github.com/libp2p/go-libp2p/p2p/protocol/identify"
+	ma "github.com/multiformats/go-multiaddr"
+	manet "github.com/multiformats/go-multiaddr/net"
 	"go.uber.org/zap"
 )
 
@@ -84,7 +86,16 @@ func (pc *peerConnector) connectToPeer(
 		return
 	}
 
-	pc.host.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.AddressTTL)
+	routable := ma.FilterAddrs(p.Addrs, func(a ma.Multiaddr) bool {
+		// Keep only publicly routable addresses.
+		return manet.IsPublicAddr(a)
+	})
+	if len(routable) == 0 {
+		atomic.AddUint32(failure, 1)
+		return
+	}
+
+	pc.host.Peerstore().AddAddrs(p.ID, routable, peerstore.AddressTTL)
 
 	conn, err := pc.host.Network().DialPeer(ctx, p.ID)
 	if err != nil {