diff --git a/go-libp2p-blossomsub/pubsub.go b/go-libp2p-blossomsub/pubsub.go
index aa2de5f..f968b5f 100644
--- a/go-libp2p-blossomsub/pubsub.go
+++ b/go-libp2p-blossomsub/pubsub.go
@@ -26,13 +26,13 @@ import (
 	logging "github.com/ipfs/go-log/v2"
 )
 
-// DefaultMaximumMessageSize is 1 MB.
-const DefaultMaxMessageSize = 1 << 20
+// DefaultMaxMessageSize is 10 MB.
+const DefaultMaxMessageSize = 10 << 20
 
 var (
 	// TimeCacheDuration specifies how long a message ID will be remembered as seen.
 	// Use WithSeenMessagesTTL to configure this per pubsub instance, instead of overriding the global default.
-	TimeCacheDuration = 120 * time.Second
+	TimeCacheDuration = 1 * time.Hour
 
 	// TimeCacheStrategy specifies which type of lookup/cleanup strategy is used by the seen messages cache.
 	// Use WithSeenMessagesStrategy to configure this per pubsub instance, instead of overriding the global default.
diff --git a/node/config/version.go b/node/config/version.go
index 2ba2001..003cdb4 100644
--- a/node/config/version.go
+++ b/node/config/version.go
@@ -40,5 +40,5 @@ func GetPatchNumber() byte {
 }
 
 func GetRCNumber() byte {
-	return 0x03
+	return 0x04
 }
diff --git a/node/consensus/data/consensus_frames.go b/node/consensus/data/consensus_frames.go
index dc380a6..579a855 100644
--- a/node/consensus/data/consensus_frames.go
+++ b/node/consensus/data/consensus_frames.go
@@ -3,7 +3,6 @@ package data
 import (
 	"bytes"
 	"context"
-	"encoding/binary"
 	"time"
 
 	"golang.org/x/crypto/sha3"
@@ -108,41 +107,6 @@ func (e *DataClockConsensusEngine) prove(
 		zap.Int("failed", len(invalidTransactions.Requests)),
 	)
 	e.stagedTransactions = &protobufs.TokenRequests{}
-	// reapply failed mints caused by two sends in the same frame
-	minters := map[string]uint64{}
-	for _, suc := range validTransactions.Requests {
-		switch t := suc.Request.(type) {
-		case *protobufs.TokenRequest_Mint:
-			if len(t.Mint.Proofs) >= 3 {
-				minters[string(t.Mint.Signature.PublicKey.KeyValue)] =
-					binary.BigEndian.Uint64(t.Mint.Proofs[2])
-			}
-		}
-	}
-	next := map[string]*protobufs.TokenRequest{}
-	for _, inv := range invalidTransactions.Requests {
-		switch t := inv.Request.(type) {
-		case *protobufs.TokenRequest_Mint:
-			if t.Mint != nil && t.Mint.Signature != nil &&
-				t.Mint.Signature.PublicKey != nil &&
-				t.Mint.Signature.PublicKey.KeyValue != nil && len(t.Mint.Proofs) >= 3 {
-				frameNumber := binary.BigEndian.Uint64(t.Mint.Proofs[2])
-				if priorFrame, ok := minters[string(
-					t.Mint.Signature.PublicKey.KeyValue,
-				)]; ok && priorFrame < frameNumber {
-					next[string(
-						t.Mint.Signature.PublicKey.KeyValue,
-					)] = inv
-				}
-			}
-		}
-	}
-	for _, v := range next {
-		e.stagedTransactions.Requests = append(
-			e.stagedTransactions.Requests,
-			v,
-		)
-	}
 	e.stagedTransactionsMx.Unlock()
 
 	outputState, err := app.MaterializeStateFromApplication()
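
A note on the pubsub.go hunks above: raising TimeCacheDuration changes the global default for every consumer of the package, even though the adjacent comment recommends configuring the TTL per instance. A minimal sketch of that per-instance route, assuming the fork keeps upstream go-libp2p-pubsub's option names (WithSeenMessagesTTL, WithMaxMessageSize) and its NewBlossomSub constructor; the import path here is likewise an assumption:

package node

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p/core/host"
	// Import path and option names assumed to match upstream go-libp2p-pubsub.
	blossomsub "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub"
)

// newPubSub sets the seen-message TTL and max message size for this
// instance only, leaving the package-level defaults untouched.
func newPubSub(ctx context.Context, h host.Host) (*blossomsub.PubSub, error) {
	return blossomsub.NewBlossomSub(
		ctx, h,
		blossomsub.WithSeenMessagesTTL(1*time.Hour), // matches the new global default above
		blossomsub.WithMaxMessageSize(10<<20),       // matches the new DefaultMaxMessageSize
	)
}
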
diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go
index 34e3d95..52c1f77 100644
--- a/node/consensus/data/data_clock_consensus_engine.go
+++ b/node/consensus/data/data_clock_consensus_engine.go
@@ -6,7 +6,6 @@ import (
 	"crypto"
 	"encoding/binary"
 	"fmt"
-	"math/big"
 	"math/rand"
 	"sync"
 	"time"
@@ -71,6 +70,7 @@ type DataClockConsensusEngine struct {
 	config *config.Config
 	logger *zap.Logger
 	state consensus.EngineState
+	stateMx sync.RWMutex
 	clockStore store.ClockStore
 	coinStore store.CoinStore
 	dataProofStore store.DataProofStore
@@ -119,7 +119,6 @@ type DataClockConsensusEngine struct {
 	peerMapMx sync.RWMutex
 	peerAnnounceMapMx sync.Mutex
 	lastKeyBundleAnnouncementFrame uint64
-	peerSeniority *peerSeniority
 	peerMap map[string]*peerInfo
 	uncooperativePeersMap map[string]*peerInfo
 	frameMessageProcessorCh chan *pb.Message
@@ -128,28 +127,6 @@ type DataClockConsensusEngine struct {
 	report *protobufs.SelfTestReport
 }
 
-type peerSeniorityItem struct {
-	seniority uint64
-	addr string
-}
-
-type peerSeniority map[string]peerSeniorityItem
-
-func newFromMap(m map[string]uint64) *peerSeniority {
-	s := &peerSeniority{}
-	for k, v := range m {
-		(*s)[k] = peerSeniorityItem{
-			seniority: v,
-			addr: k,
-		}
-	}
-	return s
-}
-
-func (p peerSeniorityItem) Priority() *big.Int {
-	return big.NewInt(int64(p.seniority))
-}
-
 var _ consensus.DataConsensusEngine = (*DataClockConsensusEngine)(nil)
 
 func NewDataClockConsensusEngine(
@@ -169,7 +146,6 @@ func NewDataClockConsensusEngine(
 	report *protobufs.SelfTestReport,
 	filter []byte,
 	seed []byte,
-	peerSeniority map[string]uint64,
 ) *DataClockConsensusEngine {
 	if logger == nil {
 		panic(errors.New("logger is nil"))
@@ -276,7 +252,6 @@ func NewDataClockConsensusEngine(
 		masterTimeReel: masterTimeReel,
 		dataTimeReel: dataTimeReel,
 		peerInfoManager: peerInfoManager,
-		peerSeniority: newFromMap(peerSeniority),
 		frameMessageProcessorCh: make(chan *pb.Message),
 		txMessageProcessorCh: make(chan *pb.Message),
 		infoMessageProcessorCh: make(chan *pb.Message),
@@ -305,9 +280,13 @@
 
 func (e *DataClockConsensusEngine) Start() <-chan error {
 	e.logger.Info("starting data consensus engine")
+	e.stateMx.Lock()
 	e.state = consensus.EngineStateStarting
+	e.stateMx.Unlock()
 	errChan := make(chan error)
 
+	e.stateMx.Lock()
 	e.state = consensus.EngineStateLoading
+	e.stateMx.Unlock()
 	e.logger.Info("loading last seen state")
 	err := e.dataTimeReel.Start()
@@ -363,7 +342,9 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 		}
 	}()
 
+	e.stateMx.Lock()
 	e.state = consensus.EngineStateCollecting
+	e.stateMx.Unlock()
 
 	go func() {
 		const baseDuration = 2 * time.Minute
@@ -374,7 +355,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 			panic(err)
 		}
 		source := rand.New(rand.NewSource(rand.Int63()))
-		for e.state < consensus.EngineStateStopping {
+		for e.GetState() < consensus.EngineStateStopping {
 			// Use exponential backoff with jitter in order to avoid hammering the bootstrappers.
 			time.Sleep(
 				backoff.FullJitter(
@@ -574,7 +555,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 
 	var previousTree *mt.MerkleTree
 
-	for e.state < consensus.EngineStateStopping {
+	for e.GetState() < consensus.EngineStateStopping {
 		nextFrame, err := e.dataTimeReel.Head()
 		if err != nil {
 			panic(err)
@@ -728,7 +709,9 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
 
 func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
 	e.logger.Info("stopping ceremony consensus engine")
+	e.stateMx.Lock()
 	e.state = consensus.EngineStateStopping
+	e.stateMx.Unlock()
 	errChan := make(chan error)
 
 	msg := []byte("pause")
@@ -777,7 +760,9 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
 	e.logger.Info("execution engines stopped")
 	e.dataTimeReel.Stop()
+	e.stateMx.Lock()
 	e.state = consensus.EngineStateStopped
+	e.stateMx.Unlock()
 
 	e.engineMx.Lock()
 	defer e.engineMx.Unlock()
@@ -801,6 +786,8 @@ func (e *DataClockConsensusEngine) GetFrame() *protobufs.ClockFrame {
 }
 
 func (e *DataClockConsensusEngine) GetState() consensus.EngineState {
+	e.stateMx.RLock()
+	defer e.stateMx.RUnlock()
 	return e.state
 }
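
The hunks above are the heart of this change: e.state is written by Start, Stop, and processFrame while several long-lived goroutines poll it, so the previous unguarded reads were data races under the Go memory model. The diff adds stateMx and funnels every cross-goroutine read through GetState(). A minimal sketch of the pattern; the setState helper is hypothetical, as the diff instead inlines the Lock/assign/Unlock triple at each write site:

package engine

import "sync"

// EngineState stands in for consensus.EngineState in this sketch.
type EngineState int

type engine struct {
	stateMx sync.RWMutex // guards state
	state   EngineState
}

// setState is a hypothetical helper; the diff inlines
// Lock/assign/Unlock at every write site instead.
func (e *engine) setState(s EngineState) {
	e.stateMx.Lock()
	e.state = s
	e.stateMx.Unlock()
}

// GetState mirrors the accessor added in the diff: an RLock so the
// hot polling loops contend only with writers, not with each other.
func (e *engine) GetState() EngineState {
	e.stateMx.RLock()
	defer e.stateMx.RUnlock()
	return e.state
}

The loops remain check-then-act: the state can change right after GetState() returns. That is acceptable for a shutdown flag, since the next iteration observes the change; the mutex only eliminates the racy read itself.
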
diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go
index 514c4a7..82ec7b8 100644
--- a/node/consensus/data/main_data_loop.go
+++ b/node/consensus/data/main_data_loop.go
@@ -40,7 +40,7 @@ func (
 func (e *DataClockConsensusEngine) runLoop() {
 	dataFrameCh := e.dataTimeReel.NewFrameCh()
 
-	for e.state < consensus.EngineStateStopping {
+	for e.GetState() < consensus.EngineStateStopping {
 		peerCount := e.pubSub.GetNetworkPeersCount()
 		if peerCount < e.minimumPeersRequired {
 			e.logger.Info(
@@ -108,7 +108,9 @@ func (e *DataClockConsensusEngine) processFrame(
 	var nextFrame *protobufs.ClockFrame
 	if nextFrame, err = e.prove(latestFrame); err != nil {
 		e.logger.Error("could not prove", zap.Error(err))
+		e.stateMx.Lock()
 		e.state = consensus.EngineStateCollecting
+		e.stateMx.Unlock()
 		return latestFrame
 	}
 
@@ -116,7 +118,9 @@ func (e *DataClockConsensusEngine) processFrame(
 
 	if err = e.publishProof(nextFrame); err != nil {
 		e.logger.Error("could not publish", zap.Error(err))
+		e.stateMx.Lock()
 		e.state = consensus.EngineStateCollecting
+		e.stateMx.Unlock()
 	}
 
 	return nextFrame
diff --git a/node/consensus/data/message_handler.go b/node/consensus/data/message_handler.go
index e7d8848..e7862f5 100644
--- a/node/consensus/data/message_handler.go
+++ b/node/consensus/data/message_handler.go
@@ -19,7 +19,7 @@ import (
 )
 
 func (e *DataClockConsensusEngine) runFrameMessageHandler() {
-	for e.state < consensus.EngineStateStopping {
+	for e.GetState() < consensus.EngineStateStopping {
		select {
		case message := <-e.frameMessageProcessorCh:
			e.logger.Debug("handling frame message")
@@ -55,7 +55,7 @@ func (e *DataClockConsensusEngine) runFrameMessageHandler() {
 }
 
 func (e *DataClockConsensusEngine) runTxMessageHandler() {
-	for e.state < consensus.EngineStateStopping {
+	for e.GetState() < consensus.EngineStateStopping {
		select {
		case message := <-e.txMessageProcessorCh:
			e.logger.Debug("handling tx message")
@@ -127,7 +127,7 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() {
 }
 
 func (e *DataClockConsensusEngine) runInfoMessageHandler() {
-	for e.state < consensus.EngineStateStopping {
+	for e.GetState() < consensus.EngineStateStopping {
		select {
		case message := <-e.infoMessageProcessorCh:
			e.logger.Debug("handling info message")
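
The handler loops above, like runLoop, treat any state below EngineStateStopping as running, while the pre-midnight worker in the next file additionally tests for EngineStateStopped. That extra test suggests EngineStateStopped sorts below EngineStateStopping in the enum, plausibly as the zero value. A hypothetical ordering consistent with those comparisons; the real constants live in node/consensus:

package consensus

// Hypothetical ordering, inferred from the comparisons in this diff.
type EngineState int

const (
	EngineStateStopped EngineState = iota // zero value, below Stopping
	EngineStateStarting
	EngineStateLoading
	EngineStateCollecting
	EngineStateProving
	EngineStatePublishing
	EngineStateStopping
)

// stopRequested mirrors the pre-midnight worker's combined check:
// ">= EngineStateStopping" alone would treat a Stopped engine as running.
func stopRequested(s EngineState) bool {
	return s >= EngineStateStopping || s == EngineStateStopped
}
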
--- a/node/consensus/data/pre_midnight_proof_worker.go
+++ b/node/consensus/data/pre_midnight_proof_worker.go
@@ -36,7 +36,7 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() {
 	}
 
 	for {
-		if e.state < consensus.EngineStateCollecting {
+		if e.GetState() < consensus.EngineStateCollecting {
 			e.logger.Info("waiting for node to finish starting")
 			time.Sleep(10 * time.Second)
 			continue
@@ -95,7 +95,8 @@ func (e *DataClockConsensusEngine) runPreMidnightProofWorker() {
 		time.Sleep(10 * time.Second)
 	}
 	for {
-		if e.state >= consensus.EngineStateStopping || e.state == consensus.EngineStateStopped {
+		state := e.GetState()
+		if state >= consensus.EngineStateStopping || state == consensus.EngineStateStopped {
 			break
 		}
 		_, prfs, err := e.coinStore.GetPreCoinProofsForOwner(addr)
diff --git a/node/consensus/data/token_handle_mint_test.go b/node/consensus/data/token_handle_mint_test.go
index 25d82c9..269c389 100644
--- a/node/consensus/data/token_handle_mint_test.go
+++ b/node/consensus/data/token_handle_mint_test.go
@@ -130,7 +130,6 @@ func TestHandlePreMidnightMint(t *testing.T) {
 		masterTimeReel: nil,
 		dataTimeReel: &qtime.DataTimeReel{},
 		peerInfoManager: nil,
-		peerSeniority: newFromMap(map[string]uint64{}),
 		frameMessageProcessorCh: make(chan *pb.Message),
 		txMessageProcessorCh: make(chan *pb.Message),
 		infoMessageProcessorCh: make(chan *pb.Message),
diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go
index 05faa06..6726d45 100644
--- a/node/execution/intrinsics/token/token_execution_engine.go
+++ b/node/execution/intrinsics/token/token_execution_engine.go
@@ -212,7 +212,6 @@ func NewTokenExecutionEngine(
 		report,
 		intrinsicFilter,
 		seed,
-		peerSeniority,
 	)
 
 	peerId := e.pubSub.GetPeerID()
diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go
index b29069f..2abfb19 100644
--- a/node/p2p/blossomsub.go
+++ b/node/p2p/blossomsub.go
@@ -398,7 +398,7 @@ func NewBlossomSub(
 		BitmaskScoreCap: 0,
 		IPColocationFactorWeight: 0,
 		IPColocationFactorThreshold: 6,
-		BehaviourPenaltyWeight: 0,
+		BehaviourPenaltyWeight: -10,
 		BehaviourPenaltyThreshold: 100,
 		BehaviourPenaltyDecay: .5,
 		DecayInterval: 10 * time.Second,
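
On the final hunk: in go-libp2p-pubsub's v1.1 peer scoring, which this fork inherits, the behaviour penalty counter decays every DecayInterval and only affects a peer's score once it exceeds the threshold, quadratically in the excess. Moving BehaviourPenaltyWeight from 0 to -10 therefore enables a previously inert penalty. A sketch of that score term under the new parameters, assuming the fork computes it the same way as upstream gossipsub:

package p2p

// behaviourPenaltyScore sketches the behaviour-penalty term of the
// peer score, following upstream gossipsub v1.1 scoring.
func behaviourPenaltyScore(counter float64) float64 {
	const (
		weight    = -10.0 // BehaviourPenaltyWeight (was 0: penalty disabled)
		threshold = 100.0 // BehaviourPenaltyThreshold
	)
	excess := counter - threshold
	if excess <= 0 {
		return 0 // below the threshold the penalty contributes nothing
	}
	return weight * excess * excess // quadratic in the excess
}

With BehaviourPenaltyDecay of .5 and a 10-second DecayInterval, the counter halves every 10 seconds, so only sustained misbehaviour (for example, repeatedly failing to follow through on IWANT promises) accumulates past 100 and begins to cost score.
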