diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index 0fee9ad..620fdf0 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -383,7 +383,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error { } if currentHead.FrameNumber == lastHead.FrameNumber { currentBackoff = min(maxBackoff, currentBackoff+1) - _ = e.pubSub.DiscoverPeers() + _ = e.pubSub.DiscoverPeers(e.ctx) } else { currentBackoff = max(0, currentBackoff-1) lastHead = currentHead diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go index 6e41286..dfba7b5 100644 --- a/node/consensus/data/main_data_loop.go +++ b/node/consensus/data/main_data_loop.go @@ -93,20 +93,19 @@ func (e *DataClockConsensusEngine) runSync() { case <-e.ctx.Done(): return case enqueuedFrame := <-e.requestSyncCh: + if enqueuedFrame == nil { + var err error + enqueuedFrame, err = e.dataTimeReel.Head() + if err != nil { + panic(err) + } + } + if err := e.pubSub.Bootstrap(e.ctx); err != nil { + e.logger.Error("could not bootstrap", zap.Error(err)) + } if _, err := e.collect(enqueuedFrame); err != nil { e.logger.Error("could not collect", zap.Error(err)) } - case <-time.After(20 * time.Second): - if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) { - continue - } - head, err := e.dataTimeReel.Head() - if err != nil { - panic(err) - } - if _, err := e.collect(head); err != nil { - e.logger.Error("could not collect", zap.Error(err)) - } } } } diff --git a/node/consensus/data/message_handler.go b/node/consensus/data/message_handler.go index 93775cb..542210f 100644 --- a/node/consensus/data/message_handler.go +++ b/node/consensus/data/message_handler.go @@ -339,6 +339,13 @@ func (e *DataClockConsensusEngine) handleDataPeerListAnnounce( } e.peerMapMx.Unlock() + select { + case <-e.ctx.Done(): + return nil + case e.requestSyncCh <- nil: + default: + } + return nil } diff --git a/node/consensus/data/token_handle_mint_test.go b/node/consensus/data/token_handle_mint_test.go index 60af3e2..caa3e27 100644 --- a/node/consensus/data/token_handle_mint_test.go +++ b/node/consensus/data/token_handle_mint_test.go @@ -78,7 +78,8 @@ func (pubsub) GetPeerScore(peerId []byte) int64 { return 0 } func (pubsub) SetPeerScore(peerId []byte, score int64) {} func (pubsub) AddPeerScore(peerId []byte, scoreDelta int64) {} func (pubsub) Reconnect(peerId []byte) error { return nil } -func (pubsub) DiscoverPeers() error { return nil } +func (pubsub) Bootstrap(context.Context) error { return nil } +func (pubsub) DiscoverPeers(context.Context) error { return nil } type outputs struct { difficulty uint32 diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index 818260e..16145db 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -73,6 +73,7 @@ type BlossomSub struct { peerScore map[string]int64 peerScoreMx sync.Mutex network uint8 + bootstrap internal.PeerConnector discovery internal.PeerConnector } @@ -377,6 +378,7 @@ func NewBlossomSub( ), bootstrap, ) + bs.bootstrap = bootstrap discovery := internal.NewPeerConnector( ctx, @@ -756,8 +758,12 @@ func (b *BlossomSub) Reconnect(peerId []byte) error { return nil } -func (b *BlossomSub) DiscoverPeers() error { - return b.discovery.Connect(b.ctx) +func (b *BlossomSub) Bootstrap(ctx context.Context) error { + return b.bootstrap.Connect(ctx) +} + +func (b *BlossomSub) DiscoverPeers(ctx context.Context) error { + return b.discovery.Connect(ctx) } func (b *BlossomSub) GetPeerScore(peerId []byte) int64 { diff --git a/node/p2p/pubsub.go b/node/p2p/pubsub.go index 10278cc..d02acc4 100644 --- a/node/p2p/pubsub.go +++ b/node/p2p/pubsub.go @@ -52,6 +52,7 @@ type PubSub interface { SetPeerScore(peerId []byte, score int64) AddPeerScore(peerId []byte, scoreDelta int64) Reconnect(peerId []byte) error - DiscoverPeers() error + Bootstrap(ctx context.Context) error + DiscoverPeers(ctx context.Context) error GetNetwork() uint }