From db66b3f1184e7c62b5eee8905b17ee59fee1e51e Mon Sep 17 00:00:00 2001
From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>
Date: Fri, 22 Nov 2024 01:00:29 +0100
Subject: [PATCH 1/9] Optimise token request duplicate handling (#375)

* Optimise token request duplicate handling

* Do not hold lock while applying transitions
---
 node/consensus/data/consensus_frames.go       |  21 +--
 .../data/data_clock_consensus_engine.go       |   1 +
 node/consensus/data/message_handler.go        | 151 ++++++------
 3 files changed, 62 insertions(+), 111 deletions(-)

diff --git a/node/consensus/data/consensus_frames.go b/node/consensus/data/consensus_frames.go
index 81b081b..1ec2a6b 100644
--- a/node/consensus/data/consensus_frames.go
+++ b/node/consensus/data/consensus_frames.go
@@ -61,7 +61,6 @@ func (e *DataClockConsensusEngine) prove(
 	if e.lastProven >= previousFrame.FrameNumber && e.lastProven != 0 {
 		return previousFrame, nil
 	}
-	e.stagedTransactionsMx.Lock()
 	executionOutput := &protobufs.IntrinsicExecutionOutput{}
 	_, tries, err := e.clockStore.GetDataClockFrame(
 		e.filter,
@@ -78,29 +77,33 @@ func (e *DataClockConsensusEngine) prove(
 		e.logger,
 	)
 	if err != nil {
-		e.stagedTransactionsMx.Unlock()
 		return nil, errors.Wrap(err, "prove")
 	}
 
-	if e.stagedTransactions == nil {
-		e.stagedTransactions = &protobufs.TokenRequests{}
+	e.stagedTransactionsMx.Lock()
+	stagedTransactions := e.stagedTransactions
+	if stagedTransactions == nil {
+		stagedTransactions = &protobufs.TokenRequests{}
 	}
+	e.stagedTransactions = &protobufs.TokenRequests{
+		Requests: make([]*protobufs.TokenRequest, 0, len(stagedTransactions.Requests)),
+	}
+	e.stagedTransactionsSet = make(map[string]struct{}, len(e.stagedTransactionsSet))
+	e.stagedTransactionsMx.Unlock()
 
 	e.logger.Info(
 		"proving new frame",
-		zap.Int("transactions", len(e.stagedTransactions.Requests)),
+		zap.Int("transactions", len(stagedTransactions.Requests)),
 	)
 
 	var validTransactions *protobufs.TokenRequests
 	var invalidTransactions *protobufs.TokenRequests
 	app, validTransactions, invalidTransactions, err = app.ApplyTransitions(
 		previousFrame.FrameNumber+1,
-		e.stagedTransactions,
+		stagedTransactions,
 		true,
 	)
 	if err != nil {
-		e.stagedTransactions = &protobufs.TokenRequests{}
-		e.stagedTransactionsMx.Unlock()
 		return nil, errors.Wrap(err, "prove")
 	}
 
@@ -109,8 +112,6 @@ func (e *DataClockConsensusEngine) prove(
 		zap.Int("successful", len(validTransactions.Requests)),
 		zap.Int("failed", len(invalidTransactions.Requests)),
 	)
-	e.stagedTransactions = &protobufs.TokenRequests{}
-	e.stagedTransactionsMx.Unlock()
 
 	outputState, err := app.MaterializeStateFromApplication()
 	if err != nil {
diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go
index 620fdf0..3c2a5fd 100644
--- a/node/consensus/data/data_clock_consensus_engine.go
+++ b/node/consensus/data/data_clock_consensus_engine.go
@@ -116,6 +116,7 @@ type DataClockConsensusEngine struct {
 	engineMx              sync.Mutex
 	dependencyMapMx       sync.Mutex
 	stagedTransactions    *protobufs.TokenRequests
+	stagedTransactionsSet map[string]struct{}
 	stagedTransactionsMx  sync.Mutex
 	peerMapMx             sync.RWMutex
 	peerAnnounceMapMx     sync.Mutex
diff --git a/node/consensus/data/message_handler.go b/node/consensus/data/message_handler.go
index 542210f..f77e9da 100644
--- a/node/consensus/data/message_handler.go
+++ b/node/consensus/data/message_handler.go
@@ -2,12 +2,14 @@ package data
 
 import (
 	"bytes"
+	"fmt"
 	"time"
 
 	"github.com/iden3/go-iden3-crypto/poseidon"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/pkg/errors"
 	"go.uber.org/zap"
+	"golang.org/x/crypto/sha3"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/anypb"
 	"source.quilibrium.com/quilibrium/monorepo/node/config"
@@ -349,116 +351,60 @@ func (e *DataClockConsensusEngine) handleDataPeerListAnnounce(
 	return nil
 }
 
+func TokenRequestIdentifiers(transition *protobufs.TokenRequest) []string {
+	switch t := transition.Request.(type) {
+	case *protobufs.TokenRequest_Transfer:
+		return []string{fmt.Sprintf("transfer-%x", t.Transfer.OfCoin.Address)}
+	case *protobufs.TokenRequest_Split:
+		return []string{fmt.Sprintf("split-%x", t.Split.OfCoin.Address)}
+	case *protobufs.TokenRequest_Merge:
+		identifiers := make([]string, len(t.Merge.Coins))
+		for i, coin := range t.Merge.Coins {
+			identifiers[i] = fmt.Sprintf("merge-%x", coin.Address)
+		}
+		return identifiers
+	case *protobufs.TokenRequest_Mint:
+		if len(t.Mint.Proofs) == 1 {
+			return []string{fmt.Sprintf("mint-%x", sha3.Sum512(t.Mint.Proofs[0]))}
+		}
+		// Large proofs are currently not deduplicated.
+		return nil
+	case *protobufs.TokenRequest_Announce:
+		identifiers := make([]string, len(t.Announce.GetPublicKeySignaturesEd448()))
+		for i, sig := range t.Announce.GetPublicKeySignaturesEd448() {
+			identifiers[i] = fmt.Sprintf("announce-%x", sig.PublicKey.KeyValue)
+		}
+		return identifiers
+	case *protobufs.TokenRequest_Join:
+		return []string{fmt.Sprintf("join-%x", t.Join.GetPublicKeySignatureEd448().PublicKey.KeyValue)}
+	case *protobufs.TokenRequest_Leave:
+		return []string{fmt.Sprintf("leave-%x", t.Leave.GetPublicKeySignatureEd448().PublicKey.KeyValue)}
+	case *protobufs.TokenRequest_Pause:
+		return []string{fmt.Sprintf("pause-%x", t.Pause.GetPublicKeySignatureEd448().PublicKey.KeyValue)}
+	case *protobufs.TokenRequest_Resume:
+		return []string{fmt.Sprintf("resume-%x", t.Resume.GetPublicKeySignatureEd448().PublicKey.KeyValue)}
+	default:
+		panic("unhandled transition type")
+	}
+}
+
 func (e *DataClockConsensusEngine) handleTokenRequest(
 	transition *protobufs.TokenRequest,
 ) error {
 	if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
+		identifiers := TokenRequestIdentifiers(transition)
+
 		e.stagedTransactionsMx.Lock()
 		if e.stagedTransactions == nil {
 			e.stagedTransactions = &protobufs.TokenRequests{}
+			e.stagedTransactionsSet = make(map[string]struct{})
 		}
 
-		found := false
-		for _, ti := range e.stagedTransactions.Requests {
-			switch t := ti.Request.(type) {
-			case *protobufs.TokenRequest_Transfer:
-				switch r := transition.Request.(type) {
-				case *protobufs.TokenRequest_Transfer:
-					if bytes.Equal(r.Transfer.OfCoin.Address, t.Transfer.OfCoin.Address) {
-						found = true
-					}
-				}
-			case *protobufs.TokenRequest_Split:
-				switch r := transition.Request.(type) {
-				case *protobufs.TokenRequest_Split:
-					if bytes.Equal(r.Split.OfCoin.Address, r.Split.OfCoin.Address) {
-						found = true
-					}
-				}
-			case *protobufs.TokenRequest_Merge:
-				switch r := transition.Request.(type) {
-				case *protobufs.TokenRequest_Merge:
-				checkmerge:
-					for i := range t.Merge.Coins {
-						for j := range r.Merge.Coins {
-							if bytes.Equal(t.Merge.Coins[i].Address, r.Merge.Coins[j].Address) {
-								found = true
-								break checkmerge
-							}
-						}
-					}
-				}
-			case *protobufs.TokenRequest_Mint:
-				switch r := transition.Request.(type) {
-				case *protobufs.TokenRequest_Mint:
-				checkmint:
-					for i := range t.Mint.Proofs {
-						if len(r.Mint.Proofs) < 2 {
-							for j := range r.Mint.Proofs {
-								if bytes.Equal(t.Mint.Proofs[i], r.Mint.Proofs[j]) {
-									found = true
-									break checkmint
-								}
-							}
-						}
-					}
-				}
-			case *protobufs.TokenRequest_Announce:
-				switch r := transition.Request.(type) {
-				case *protobufs.TokenRequest_Announce:
-				checkannounce:
-					for i := range t.Announce.GetPublicKeySignaturesEd448() {
-						for j := range r.Announce.GetPublicKeySignaturesEd448() {
-							if bytes.Equal(
-								t.Announce.GetPublicKeySignaturesEd448()[i].PublicKey.KeyValue,
-								r.Announce.GetPublicKeySignaturesEd448()[j].PublicKey.KeyValue,
-							) {
-								found = true
-								break checkannounce
-							}
-						}
-					}
-				}
-			case *protobufs.TokenRequest_Join:
-				switch r := transition.Request.(type) {
-				case *protobufs.TokenRequest_Join:
-					if bytes.Equal(
-						t.Join.GetPublicKeySignatureEd448().PublicKey.KeyValue,
-						r.Join.GetPublicKeySignatureEd448().PublicKey.KeyValue,
-					) {
-						found = true
-					}
-				}
-			case *protobufs.TokenRequest_Leave:
-				switch r := transition.Request.(type) {
-				case *protobufs.TokenRequest_Leave:
-					if bytes.Equal(
-						t.Leave.GetPublicKeySignatureEd448().PublicKey.KeyValue,
-						r.Leave.GetPublicKeySignatureEd448().PublicKey.KeyValue,
-					) {
-						found = true
-					}
-				}
-			case *protobufs.TokenRequest_Pause:
-				switch r := transition.Request.(type) {
-				case *protobufs.TokenRequest_Pause:
-					if bytes.Equal(
-						t.Pause.GetPublicKeySignatureEd448().PublicKey.KeyValue,
-						r.Pause.GetPublicKeySignatureEd448().PublicKey.KeyValue,
-					) {
-						found = true
-					}
-				}
-			case *protobufs.TokenRequest_Resume:
-				switch r := transition.Request.(type) {
-				case *protobufs.TokenRequest_Resume:
-					if bytes.Equal(
-						t.Resume.GetPublicKeySignatureEd448().PublicKey.KeyValue,
-						r.Resume.GetPublicKeySignatureEd448().PublicKey.KeyValue,
-					) {
-						found = true
-					}
-				}
+		var found bool
+		for _, identifier := range identifiers {
+			if _, ok := e.stagedTransactionsSet[identifier]; ok {
+				found = true
+				break
 			}
 		}
 
@@ -467,6 +413,9 @@ func (e *DataClockConsensusEngine) handleTokenRequest(
 			e.stagedTransactions.Requests,
 			transition,
 		)
+		for _, identifier := range identifiers {
+			e.stagedTransactionsSet[identifier] = struct{}{}
+		}
 		}
 		e.stagedTransactionsMx.Unlock()
 	}

From af0eded2310ebb3009ab314535cf523069ac5890 Mon Sep 17 00:00:00 2001
From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>
Date: Fri, 22 Nov 2024 01:01:16 +0100
Subject: [PATCH 2/9] Release buffers in edge cases (#374)

---
 go-libp2p-blossomsub/comm.go | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/go-libp2p-blossomsub/comm.go b/go-libp2p-blossomsub/comm.go
index b5d8c81..9bad2a5 100644
--- a/go-libp2p-blossomsub/comm.go
+++ b/go-libp2p-blossomsub/comm.go
@@ -77,6 +77,7 @@ func (p *PubSub) handleNewStream(s network.Stream) {
 			return
 		}
 		if len(msgbytes) == 0 {
+			r.ReleaseMsg(msgbytes)
 			continue
 		}
 
@@ -186,18 +187,18 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, ou
 				s.Reset()
 				log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)
 				s.Close()
+				pool.Put(buf)
 				return
 			}
 
 			_, err = s.Write(buf)
+			pool.Put(buf)
 			if err != nil {
 				s.Reset()
 				log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)
 				s.Close()
 				return
 			}
-
-			pool.Put(buf)
 		case <-ctx.Done():
 			s.Close()
 			return
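A note on the techniques in the two patches above, with minimal sketches. First, the core of PATCH 1/9 is replacing the pairwise comparison loop in handleTokenRequest with set membership over stable string identifiers, and snapshotting the staged queue under the mutex so ApplyTransitions runs with the lock released. A self-contained sketch of that pattern; the types and names here are illustrative stand-ins, not the engine's real protobuf types:

```go
package main

import (
	"fmt"
	"sync"
)

// request stands in for protobufs.TokenRequest; only the identifier scheme
// matters here: each request maps to one or more stable string keys.
type request struct {
	kind    string
	address []byte
}

type stagedQueue struct {
	mu       sync.Mutex
	requests []request
	seen     map[string]struct{} // mirrors stagedTransactionsSet
}

// stage rejects duplicates with one map lookup per identifier, O(1) each,
// instead of comparing against every staged request, O(n) per arrival.
func (s *stagedQueue) stage(r request) bool {
	id := fmt.Sprintf("%s-%x", r.kind, r.address)

	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen == nil {
		s.seen = make(map[string]struct{})
	}
	if _, ok := s.seen[id]; ok {
		return false // duplicate
	}
	s.requests = append(s.requests, r)
	s.seen[id] = struct{}{}
	return true
}

// drain swaps out the staged state under the lock and hands the snapshot to
// the caller, so expensive processing happens with the mutex released, the
// same shape prove() now has.
func (s *stagedQueue) drain() []request {
	s.mu.Lock()
	defer s.mu.Unlock()
	staged := s.requests
	s.requests = make([]request, 0, len(staged))
	s.seen = make(map[string]struct{}, len(s.seen))
	return staged
}

func main() {
	q := &stagedQueue{}
	fmt.Println(q.stage(request{"transfer", []byte{0xab}})) // true
	fmt.Println(q.stage(request{"transfer", []byte{0xab}})) // false
	fmt.Println(len(q.drain()))                             // 1
}
```

Second, PATCH 2/9 is about releasing pooled buffers on every exit path, not only the happy one: the zero-length message now returns its buffer, and the write path returns the buffer before bailing out on error. The same discipline sketched under an assumed plain sync.Pool (the real code uses libp2p's message reader and buffer pool):

```go
package main

import (
	"fmt"
	"sync"
)

var bufPool = sync.Pool{
	New: func() any { return make([]byte, 0, 4096) },
}

// send mirrors the corrected handleSendingMessages shape: the buffer is
// returned to the pool exactly once, immediately after its last use, so an
// early error return can no longer leak it.
func send(w func([]byte) error, msg []byte) error {
	buf := bufPool.Get().([]byte)[:0]
	buf = append(buf, msg...)

	err := w(buf)
	bufPool.Put(buf) // release on success and failure alike
	return err
}

func main() {
	err := send(func(b []byte) error {
		fmt.Printf("wrote %d bytes\n", len(b))
		return nil
	}, []byte("hello"))
	fmt.Println(err)
}
```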
From 44ccd1487117be6ee5903490bc388e863ed923cf Mon Sep 17 00:00:00 2001
From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>
Date: Fri, 22 Nov 2024 02:32:04 +0100
Subject: [PATCH 3/9] Use buffered channels when applicable (#373)

* Use buffered channels when applicable

* Do not start additional goroutines for processing

* Use context to stop ongoing loops
---
 node/consensus/data/broadcast_messaging.go    | 43 ++++++----
 node/consensus/data/consensus_frames.go       |  6 +-
 .../data/data_clock_consensus_engine.go       | 43 ++++++----
 node/consensus/data/main_data_loop.go         | 10 ++-
 node/consensus/data/message_handler.go        | 67 ++++++--------
 node/consensus/master/broadcast_messaging.go  |  3 +-
 .../master/master_clock_consensus_engine.go   |  3 +-
 node/consensus/time/data_time_reel.go         | 78 ++++++++---------
 node/consensus/time/data_time_reel_test.go    | 12 +--
 node/consensus/time/master_time_reel.go       |  2 +
 node/consensus/time/master_time_reel_test.go  |  6 +-
 node/consensus/time/time_reel.go              |  4 +-
 .../token/token_execution_engine.go           | 27 +++++--
 13 files changed, 183 insertions(+), 121 deletions(-)

diff --git a/node/consensus/data/broadcast_messaging.go b/node/consensus/data/broadcast_messaging.go
index 78b1b0a..b48d314 100644
--- a/node/consensus/data/broadcast_messaging.go
+++ b/node/consensus/data/broadcast_messaging.go
@@ -17,30 +17,39 @@
 func (e *DataClockConsensusEngine) handleFrameMessage(
 	message *pb.Message,
 ) error {
-	go func() {
-		e.frameMessageProcessorCh <- message
-	}()
-
+	select {
+	case <-e.ctx.Done():
+		return e.ctx.Err()
+	case e.frameMessageProcessorCh <- message:
+	default:
+		e.logger.Warn("dropping frame message")
+	}
 	return nil
 }
 
 func (e *DataClockConsensusEngine) handleTxMessage(
 	message *pb.Message,
 ) error {
-	go func() {
-		e.txMessageProcessorCh <- message
-	}()
-
+	select {
+	case <-e.ctx.Done():
+		return e.ctx.Err()
+	case e.txMessageProcessorCh <- message:
+	default:
+		e.logger.Warn("dropping tx message")
+	}
 	return nil
 }
 
 func (e *DataClockConsensusEngine) handleInfoMessage(
 	message *pb.Message,
 ) error {
-	go func() {
-		e.infoMessageProcessorCh <- message
-	}()
-
+	select {
+	case <-e.ctx.Done():
+		return e.ctx.Err()
+	case e.infoMessageProcessorCh <- message:
+	default:
+		e.logger.Warn("dropping info message")
+	}
 	return nil
 }
 
@@ -130,9 +139,13 @@ func (e *DataClockConsensusEngine) insertTxMessage(
 		Seqno:     nil,
 	}
 
-	go func() {
-		e.txMessageProcessorCh <- m
-	}()
+	select {
+	case <-e.ctx.Done():
+		return e.ctx.Err()
+	case e.txMessageProcessorCh <- m:
+	default:
+		e.logger.Warn("dropping tx message")
+	}
 
 	return nil
 }
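The handler rewrite above is the crux of PATCH 3/9: a bounded, buffered channel plus a non-blocking send replaces one goroutine per inbound message, so a saturated processor sheds load by dropping instead of accumulating goroutines, and a cancelled context rejects new work outright. A compact sketch of that enqueue shape (the names here are illustrative):

```go
package main

import (
	"context"
	"fmt"
)

type ingress struct {
	ctx context.Context
	ch  chan []byte // buffered; the engine sizes its channels at 65536
}

// enqueue never blocks: the default case fires when the buffer is full,
// which is exactly the drop-and-warn behaviour the handlers adopt.
func (in *ingress) enqueue(msg []byte) error {
	select {
	case <-in.ctx.Done():
		return in.ctx.Err() // engine is stopping; reject new work
	case in.ch <- msg:
		return nil
	default:
		return fmt.Errorf("dropping message: queue full (cap %d)", cap(in.ch))
	}
}

func main() {
	in := &ingress{ctx: context.Background(), ch: make(chan []byte, 2)}
	for i := 0; i < 3; i++ {
		fmt.Println(in.enqueue([]byte{byte(i)}))
	}
}
```

The trade here is load shedding over backpressure: the warn log makes drops observable, and the remaining diffs of this patch size the real channels accordingly.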
diff --git a/node/consensus/data/consensus_frames.go b/node/consensus/data/consensus_frames.go
index 1ec2a6b..71e6869 100644
--- a/node/consensus/data/consensus_frames.go
+++ b/node/consensus/data/consensus_frames.go
@@ -7,7 +7,6 @@ import (
 
 	"golang.org/x/crypto/sha3"
 	"source.quilibrium.com/quilibrium/monorepo/node/config"
-	"source.quilibrium.com/quilibrium/monorepo/node/consensus"
 	"source.quilibrium.com/quilibrium/monorepo/node/consensus/data/internal"
 	"source.quilibrium.com/quilibrium/monorepo/node/internal/frametime"
@@ -318,7 +317,7 @@ func (e *DataClockConsensusEngine) sync(
 		syncTimeout = defaultSyncTimeout
 	}
 
-	for e.GetState() < consensus.EngineStateStopping {
+	for {
 		ctx, cancel := context.WithTimeout(e.ctx, syncTimeout)
 		response, err := client.GetDataFrame(
 			ctx,
@@ -364,11 +363,10 @@ func (e *DataClockConsensusEngine) sync(
 		); err != nil {
 			return nil, errors.Wrap(err, "sync")
 		}
-		e.dataTimeReel.Insert(response.ClockFrame, true)
+		e.dataTimeReel.Insert(e.ctx, response.ClockFrame, true)
 		latest = response.ClockFrame
 		if latest.FrameNumber >= maxFrame {
 			return latest, nil
 		}
 	}
-	return latest, nil
 }
diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go
index 3c2a5fd..4ffa4c7 100644
--- a/node/consensus/data/data_clock_consensus_engine.go
+++ b/node/consensus/data/data_clock_consensus_engine.go
@@ -257,9 +257,9 @@ func NewDataClockConsensusEngine(
 		masterTimeReel:          masterTimeReel,
 		dataTimeReel:            dataTimeReel,
 		peerInfoManager:         peerInfoManager,
-		frameMessageProcessorCh: make(chan *pb.Message),
-		txMessageProcessorCh:    make(chan *pb.Message),
-		infoMessageProcessorCh:  make(chan *pb.Message),
+		frameMessageProcessorCh: make(chan *pb.Message, 65536),
+		txMessageProcessorCh:    make(chan *pb.Message, 65536),
+		infoMessageProcessorCh:  make(chan *pb.Message, 65536),
 		config:                  cfg,
 		preMidnightMint:         map[string]struct{}{},
 		grpcRateLimiter: NewRateLimiter(
@@ -368,16 +368,19 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 			panic(err)
 		}
 		source := rand.New(rand.NewSource(rand.Int63()))
-		for e.GetState() < consensus.EngineStateStopping {
+		for {
 			// Use exponential backoff with jitter in order to avoid hammering the bootstrappers.
-			time.Sleep(
-				backoff.FullJitter(
-					baseDuration<= nextFrame.FrameNumber || nextFrame.FrameNumber == 0 {
-				time.Sleep(120 * time.Second)
+				select {
+				case <-e.ctx.Done():
+					return
+				case <-time.After(2 * time.Minute):
+				}
 				continue
 			}
@@ -485,7 +492,11 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 			thresholdBeforeConfirming--
 		}
 
-		time.Sleep(120 * time.Second)
+		select {
+		case <-e.ctx.Done():
+			return
+		case <-time.After(2 * time.Minute):
+		}
 	}
 }()
@@ -494,7 +505,11 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 	go e.runFramePruning()
 
 	go func() {
-		time.Sleep(30 * time.Second)
+		select {
+		case <-e.ctx.Done():
+			return
+		case <-time.After(30 * time.Second):
+		}
 		e.logger.Info("checking for snapshots to play forward")
 		if err := e.downloadSnapshot(e.config.DB.Path, e.config.P2P.Network); err != nil {
 			e.logger.Debug("error downloading snapshot", zap.Error(err))
diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go
index 2c15b18..195c9fa 100644
--- a/node/consensus/data/main_data_loop.go
+++ b/node/consensus/data/main_data_loop.go
@@ -113,14 +113,18 @@ func (e *DataClockConsensusEngine) runSync() {
 func (e *DataClockConsensusEngine) runLoop() {
 	dataFrameCh := e.dataTimeReel.NewFrameCh()
 	runOnce := true
-	for e.GetState() < consensus.EngineStateStopping {
+	for {
 		peerCount := e.pubSub.GetNetworkPeersCount()
 		if peerCount < e.minimumPeersRequired {
 			e.logger.Info(
 				"waiting for minimum peers",
 				zap.Int("peer_count", peerCount),
 			)
-			time.Sleep(1 * time.Second)
+			select {
+			case <-e.ctx.Done():
+				return
+			case <-time.After(1 * time.Second):
+			}
 		} else {
 			latestFrame, err := e.dataTimeReel.Head()
 			if err != nil {
@@ -205,7 +209,7 @@ func (e *DataClockConsensusEngine) processFrame(
 			return dataFrame
 		}
 
-		e.dataTimeReel.Insert(nextFrame, true)
+		e.dataTimeReel.Insert(e.ctx, nextFrame, true)
 
 		return nextFrame
 	} else {
diff --git a/node/consensus/data/message_handler.go b/node/consensus/data/message_handler.go
index f77e9da..f3fe4fc 100644
--- a/node/consensus/data/message_handler.go
+++ b/node/consensus/data/message_handler.go
@@ -3,6 +3,7 @@ package data
 import (
 	"bytes"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/iden3/go-iden3-crypto/poseidon"
@@ -13,14 +14,15 @@ import (
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/anypb"
 	"source.quilibrium.com/quilibrium/monorepo/node/config"
-	"source.quilibrium.com/quilibrium/monorepo/node/consensus"
 	"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application"
 	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
 )
 
 func (e *DataClockConsensusEngine) runFrameMessageHandler() {
-	for e.GetState() < consensus.EngineStateStopping {
+	for {
 		select {
+		case <-e.ctx.Done():
+			return
 		case 
message := <-e.frameMessageProcessorCh: e.logger.Debug("handling frame message") msg := &protobufs.Message{} @@ -49,26 +51,26 @@ func (e *DataClockConsensusEngine) runFrameMessageHandler() { continue } - go func() { - switch any.TypeUrl { - case protobufs.ClockFrameType: - if err := e.handleClockFrameData( - message.From, - msg.Address, - any, - false, - ); err != nil { - return - } + switch any.TypeUrl { + case protobufs.ClockFrameType: + if err := e.handleClockFrameData( + message.From, + msg.Address, + any, + false, + ); err != nil { + e.logger.Debug("could not handle clock frame data", zap.Error(err)) } - }() + } } } } func (e *DataClockConsensusEngine) runTxMessageHandler() { - for e.GetState() < consensus.EngineStateStopping { + for { select { + case <-e.ctx.Done(): + return case message := <-e.txMessageProcessorCh: e.logger.Debug("handling tx message") msg := &protobufs.Message{} @@ -97,9 +99,12 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() { } if e.frameProverTries[0].Contains(e.provingKeyAddress) { + wg := &sync.WaitGroup{} for name := range e.executionEngines { name := name + wg.Add(1) go func() error { + defer wg.Done() messages, err := e.executionEngines[name].ProcessMessage( application.TOKEN_ADDRESS, msg, @@ -125,18 +130,17 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() { continue } - e.logger.Debug(appMsg.TypeUrl) - switch appMsg.TypeUrl { case protobufs.TokenRequestType: t := &protobufs.TokenRequest{} err := proto.Unmarshal(appMsg.Value, t) if err != nil { + e.logger.Debug("could not unmarshal token request", zap.Error(err)) continue } if err := e.handleTokenRequest(t); err != nil { - continue + e.logger.Debug("could not handle token request", zap.Error(err)) } } } @@ -144,14 +148,17 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() { return nil }() } + wg.Wait() } } } } func (e *DataClockConsensusEngine) runInfoMessageHandler() { - for e.GetState() < consensus.EngineStateStopping { + for { select { + case <-e.ctx.Done(): + return case message := <-e.infoMessageProcessorCh: e.logger.Debug("handling info message") msg := &protobufs.Message{} @@ -180,18 +187,16 @@ func (e *DataClockConsensusEngine) runInfoMessageHandler() { continue } - go func() { - switch any.TypeUrl { - case protobufs.DataPeerListAnnounceType: - if err := e.handleDataPeerListAnnounce( - message.From, - msg.Address, - any, - ); err != nil { - return - } + switch any.TypeUrl { + case protobufs.DataPeerListAnnounceType: + if err := e.handleDataPeerListAnnounce( + message.From, + msg.Address, + any, + ); err != nil { + e.logger.Debug("could not handle data peer list announce", zap.Error(err)) } - }() + } } } } @@ -249,7 +254,7 @@ func (e *DataClockConsensusEngine) handleClockFrame( } if frame.FrameNumber > head.FrameNumber { - e.dataTimeReel.Insert(frame, false) + e.dataTimeReel.Insert(e.ctx, frame, false) } return nil diff --git a/node/consensus/master/broadcast_messaging.go b/node/consensus/master/broadcast_messaging.go index e2fe211..cde9b3f 100644 --- a/node/consensus/master/broadcast_messaging.go +++ b/node/consensus/master/broadcast_messaging.go @@ -2,6 +2,7 @@ package master import ( "bytes" + "context" "encoding/binary" "strings" "time" @@ -154,7 +155,7 @@ func (e *MasterClockConsensusEngine) publishProof( zap.Uint64("frame_number", frame.FrameNumber), ) - e.masterTimeReel.Insert(frame, false) + e.masterTimeReel.Insert(context.TODO(), frame, false) } e.state = consensus.EngineStateCollecting diff --git a/node/consensus/master/master_clock_consensus_engine.go 
b/node/consensus/master/master_clock_consensus_engine.go index b17acdb..54fa8f7 100644 --- a/node/consensus/master/master_clock_consensus_engine.go +++ b/node/consensus/master/master_clock_consensus_engine.go @@ -2,6 +2,7 @@ package master import ( "bytes" + "context" gcrypto "crypto" "encoding/hex" "math/big" @@ -207,7 +208,7 @@ func (e *MasterClockConsensusEngine) Start() <-chan error { continue } - e.masterTimeReel.Insert(newFrame, false) + e.masterTimeReel.Insert(context.TODO(), newFrame, false) } } }() diff --git a/node/consensus/time/data_time_reel.go b/node/consensus/time/data_time_reel.go index d0ee3f2..f6002ad 100644 --- a/node/consensus/time/data_time_reel.go +++ b/node/consensus/time/data_time_reel.go @@ -2,6 +2,7 @@ package time import ( "bytes" + "context" "encoding/hex" "math/big" "os" @@ -32,7 +33,9 @@ type pendingFrame struct { type DataTimeReel struct { rwMutex sync.RWMutex - running bool + + ctx context.Context + cancel context.CancelFunc filter []byte engineConfig *config.EngineConfig @@ -61,7 +64,6 @@ type DataTimeReel struct { frames chan *pendingFrame newFrameCh chan *protobufs.ClockFrame badFrameCh chan *protobufs.ClockFrame - done chan bool alwaysSend bool restore func() []*tries.RollingFrecencyCritbitTrie } @@ -115,8 +117,10 @@ func NewDataTimeReel( panic(err) } + ctx, cancel := context.WithCancel(context.Background()) return &DataTimeReel{ - running: false, + ctx: ctx, + cancel: cancel, logger: logger, filter: filter, engineConfig: engineConfig, @@ -129,10 +133,9 @@ func NewDataTimeReel( lruFrames: cache, // pending: make(map[uint64][]*pendingFrame), incompleteForks: make(map[uint64][]*pendingFrame), - frames: make(chan *pendingFrame), + frames: make(chan *pendingFrame, 65536), newFrameCh: make(chan *protobufs.ClockFrame), badFrameCh: make(chan *protobufs.ClockFrame), - done: make(chan bool), alwaysSend: alwaysSend, restore: restore, } @@ -172,17 +175,12 @@ func (d *DataTimeReel) Start() error { d.headDistance, err = d.GetDistance(frame) } - d.running = true go d.runLoop() return nil } func (d *DataTimeReel) SetHead(frame *protobufs.ClockFrame) { - if d.running == true { - panic("internal test function should never be called outside of tests") - } - d.head = frame } @@ -193,9 +191,9 @@ func (d *DataTimeReel) Head() (*protobufs.ClockFrame, error) { // Insert enqueues a structurally valid frame into the time reel. If the frame // is the next one in sequence, it advances the reel head forward and emits a // new frame on the new frame channel. 
-func (d *DataTimeReel) Insert(frame *protobufs.ClockFrame, isSync bool) error { - if !d.running { - return nil +func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error { + if err := d.ctx.Err(); err != nil { + return err } d.logger.Debug( @@ -222,13 +220,17 @@ func (d *DataTimeReel) Insert(frame *protobufs.ClockFrame, isSync bool) error { d.storePending(selector, parent, distance, frame) if d.head.FrameNumber+1 == frame.FrameNumber { - go func() { - d.frames <- &pendingFrame{ - selector: selector, - parentSelector: parent, - frameNumber: frame.FrameNumber, - } - }() + select { + case <-ctx.Done(): + return ctx.Err() + case <-d.ctx.Done(): + return d.ctx.Err() + case d.frames <- &pendingFrame{ + selector: selector, + parentSelector: parent, + frameNumber: frame.FrameNumber, + }: + } } } @@ -250,7 +252,7 @@ func (d *DataTimeReel) BadFrameCh() <-chan *protobufs.ClockFrame { } func (d *DataTimeReel) Stop() { - d.done <- true + d.cancel() } func (d *DataTimeReel) createGenesisFrame() ( @@ -336,8 +338,10 @@ func (d *DataTimeReel) createGenesisFrame() ( // Main data consensus loop func (d *DataTimeReel) runLoop() { - for d.running { + for { select { + case <-d.ctx.Done(): + return case frame := <-d.frames: rawFrame, err := d.clockStore.GetStagedDataClockFrame( d.filter, @@ -459,9 +463,6 @@ func (d *DataTimeReel) runLoop() { // } // } } - case <-d.done: - d.running = false - return } } } @@ -563,8 +564,7 @@ func (d *DataTimeReel) processPending( for { select { - case <-d.done: - d.running = false + case <-d.ctx.Done(): return default: } @@ -686,14 +686,19 @@ func (d *DataTimeReel) setHead(frame *protobufs.ClockFrame, distance *big.Int) e d.headDistance = distance if d.alwaysSend { - d.newFrameCh <- frame - } - go func() { select { + case <-d.ctx.Done(): + return d.ctx.Err() + case d.newFrameCh <- frame: + } + } else { + select { + case <-d.ctx.Done(): + return d.ctx.Err() case d.newFrameCh <- frame: default: } - }() + } return nil } @@ -992,12 +997,11 @@ func (d *DataTimeReel) forkChoice( d.totalDistance, ) - go func() { - select { - case d.newFrameCh <- frame: - default: - } - }() + select { + case <-d.ctx.Done(): + case d.newFrameCh <- frame: + default: + } } func (d *DataTimeReel) GetTotalDistance() *big.Int { diff --git a/node/consensus/time/data_time_reel_test.go b/node/consensus/time/data_time_reel_test.go index 263e9dc..43cc07d 100644 --- a/node/consensus/time/data_time_reel_test.go +++ b/node/consensus/time/data_time_reel_test.go @@ -2,6 +2,7 @@ package time_test import ( "bytes" + "context" "fmt" "math/rand" "strings" @@ -108,6 +109,7 @@ func generateTestProvers() ( } func TestDataTimeReel(t *testing.T) { + ctx := context.Background() logger, _ := zap.NewDevelopment() db := store.NewInMemKVDB() clockStore := store.NewPebbleClockStore(db, logger) @@ -231,7 +233,7 @@ func TestDataTimeReel(t *testing.T) { i+1, 10, ) - d.Insert(frame, false) + d.Insert(ctx, frame, false) prevBI, _ := frame.GetSelector() prev = prevBI.FillBytes(make([]byte, 32)) } @@ -262,7 +264,7 @@ func TestDataTimeReel(t *testing.T) { } for i := 99; i >= 0; i-- { - err := d.Insert(insertFrames[i], false) + err := d.Insert(ctx, insertFrames[i], false) assert.NoError(t, err) } @@ -284,7 +286,7 @@ func TestDataTimeReel(t *testing.T) { i+1, 10, ) - d.Insert(frame, false) + d.Insert(ctx, frame, false) prevBI, _ := frame.GetSelector() prev = prevBI.FillBytes(make([]byte, 32)) @@ -332,7 +334,7 @@ func TestDataTimeReel(t *testing.T) { } for i := 99; i >= 0; i-- { - err := 
d.Insert(insertFrames[i], false) + err := d.Insert(ctx, insertFrames[i], false) assert.NoError(t, err) } @@ -395,7 +397,7 @@ func TestDataTimeReel(t *testing.T) { // Someone is honest, but running backwards: for i := 99; i >= 0; i-- { - err := d.Insert(insertFrames[i], false) + err := d.Insert(ctx, insertFrames[i], false) gotime.Sleep(1 * gotime.Second) assert.NoError(t, err) } diff --git a/node/consensus/time/master_time_reel.go b/node/consensus/time/master_time_reel.go index 4b67cad..70ef4d3 100644 --- a/node/consensus/time/master_time_reel.go +++ b/node/consensus/time/master_time_reel.go @@ -1,6 +1,7 @@ package time import ( + "context" "encoding/hex" "errors" "math/big" @@ -120,6 +121,7 @@ func (m *MasterTimeReel) Head() (*protobufs.ClockFrame, error) { // is the next one in sequence, it advances the reel head forward and emits a // new frame on the new frame channel. func (m *MasterTimeReel) Insert( + ctx context.Context, frame *protobufs.ClockFrame, isSync bool, ) error { diff --git a/node/consensus/time/master_time_reel_test.go b/node/consensus/time/master_time_reel_test.go index 62e2d44..6332d3f 100644 --- a/node/consensus/time/master_time_reel_test.go +++ b/node/consensus/time/master_time_reel_test.go @@ -1,6 +1,7 @@ package time_test import ( + "context" "strings" "sync" "testing" @@ -15,6 +16,7 @@ import ( ) func TestMasterTimeReel(t *testing.T) { + ctx := context.Background() logger, _ := zap.NewProduction() db := store.NewInMemKVDB() clockStore := store.NewPebbleClockStore(db, logger) @@ -59,7 +61,7 @@ func TestMasterTimeReel(t *testing.T) { ) assert.NoError(t, err) - err := m.Insert(frame, false) + err := m.Insert(ctx, frame, false) assert.NoError(t, err) } @@ -79,7 +81,7 @@ func TestMasterTimeReel(t *testing.T) { } for i := 99; i >= 0; i-- { - err := m.Insert(insertFrames[i], false) + err := m.Insert(ctx, insertFrames[i], false) assert.NoError(t, err) } diff --git a/node/consensus/time/time_reel.go b/node/consensus/time/time_reel.go index 3f60338..489b892 100644 --- a/node/consensus/time/time_reel.go +++ b/node/consensus/time/time_reel.go @@ -1,13 +1,15 @@ package time import ( + "context" + "source.quilibrium.com/quilibrium/monorepo/node/protobufs" ) type TimeReel interface { Start() error Stop() - Insert(frame *protobufs.ClockFrame, isSync bool) error + Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error Head() (*protobufs.ClockFrame, error) NewFrameCh() <-chan *protobufs.ClockFrame BadFrameCh() <-chan *protobufs.ClockFrame diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go index 2ae742c..fee6f6e 100644 --- a/node/execution/intrinsics/token/token_execution_engine.go +++ b/node/execution/intrinsics/token/token_execution_engine.go @@ -2,6 +2,7 @@ package token import ( "bytes" + "context" "crypto" "encoding/binary" "encoding/hex" @@ -80,6 +81,8 @@ func (p PeerSeniorityItem) Priority() uint64 { } type TokenExecutionEngine struct { + ctx context.Context + cancel context.CancelFunc logger *zap.Logger clock *data.DataClockConsensusEngine clockStore store.ClockStore @@ -205,7 +208,10 @@ func NewTokenExecutionEngine( LoadAggregatedSeniorityMap(uint(cfg.P2P.Network)) } + ctx, cancel := context.WithCancel(context.Background()) e := &TokenExecutionEngine{ + ctx: ctx, + cancel: cancel, logger: logger, engineConfig: cfg.Engine, keyManager: keyManager, @@ -364,14 +370,19 @@ func NewTokenExecutionEngine( } // need to wait for peering + waitPeers: for { - gotime.Sleep(30 * 
gotime.Second) - peerMap := e.pubSub.GetBitmaskPeers() - if peers, ok := peerMap[string( - append([]byte{0x00}, e.intrinsicFilter...), - )]; ok { - if len(peers) >= 3 { - break + select { + case <-e.ctx.Done(): + return + case <-gotime.After(30 * gotime.Second): + peerMap := e.pubSub.GetBitmaskPeers() + if peers, ok := peerMap[string( + append([]byte{0x00}, e.intrinsicFilter...), + )]; ok { + if len(peers) >= 3 { + break waitPeers + } } } } @@ -441,6 +452,8 @@ func (e *TokenExecutionEngine) Start() <-chan error { // Stop implements ExecutionEngine func (e *TokenExecutionEngine) Stop(force bool) <-chan error { + e.cancel() + errChan := make(chan error) go func() { From ca5d0579ea8e9faa6b6e8c81f6c30d91ffa582b6 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Thu, 21 Nov 2024 19:38:21 -0600 Subject: [PATCH 4/9] bump version --- node/config/version.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/node/config/version.go b/node/config/version.go index 391dccd..5abb21b 100644 --- a/node/config/version.go +++ b/node/config/version.go @@ -10,7 +10,7 @@ func GetMinimumVersionCutoff() time.Time { } func GetMinimumVersion() []byte { - return []byte{0x02, 0x00, 0x03} + return []byte{0x02, 0x00, 0x04} } func GetVersion() []byte { @@ -36,9 +36,9 @@ func FormatVersion(version []byte) string { } func GetPatchNumber() byte { - return 0x00 + return 0x01 } func GetRCNumber() byte { - return 0x00 + return 0x01 } From f2a3bd2d8ecde102e52d94375449bf62a618901a Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Fri, 22 Nov 2024 01:52:36 -0600 Subject: [PATCH 5/9] don't try to make workers run if they aren't in ready state --- node/consensus/data/data_clock_consensus_engine.go | 3 +++ node/consensus/data/main_data_loop.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index 4ffa4c7..426efea 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -563,6 +563,9 @@ func (e *DataClockConsensusEngine) PerformTimeProof( }) } } + if len(actives) < 3 { + return []mt.DataBlock{} + } output := make([]mt.DataBlock, len(actives)) e.logger.Info( "creating data shard ring proof", diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go index 195c9fa..8285e45 100644 --- a/node/consensus/data/main_data_loop.go +++ b/node/consensus/data/main_data_loop.go @@ -292,7 +292,7 @@ func (e *DataClockConsensusEngine) processFrame( outputs := e.PerformTimeProof(latestFrame, latestFrame.Difficulty, ring) if outputs == nil || len(outputs) < 3 { - e.logger.Error("could not successfully build proof, reattempting") + e.logger.Info("workers not yet available for proving") return latestFrame } modulo := len(outputs) From 292383be750c93ba1093b86f31d3fc4afd073e72 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Fri, 22 Nov 2024 01:59:04 -0600 Subject: [PATCH 6/9] don't prune what is already pruned --- node/store/clock.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/store/clock.go b/node/store/clock.go index c9a70f3..035d6cc 100644 --- a/node/store/clock.go +++ b/node/store/clock.go @@ -1058,7 +1058,7 @@ func (p *PebbleClockStore) DeleteDataClockFrameRange( for i := fromFrameNumber; i < toFrameNumber; i++ { frames, err := p.GetStagedDataClockFramesForFrameNumber(filter, i) if err != nil { - return errors.Wrap(err, "delete data clock frame range") + continue } for _, 
frame := range frames { From 849feddf846808383575d49eb947f2564379b3cc Mon Sep 17 00:00:00 2001 From: Black Swan Date: Fri, 22 Nov 2024 18:29:48 +0200 Subject: [PATCH 7/9] enhance testnet frame pruning (#377) * reduce testnet frame pruning delay to 1 minute * set seniority repair cutoff frame for testnet to 25745 * add log message to confirm frame pruning is enabled --- node/consensus/data/main_data_loop.go | 4 +++- .../intrinsics/token/application/token_handle_mint.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go index 8285e45..b4473fc 100644 --- a/node/consensus/data/main_data_loop.go +++ b/node/consensus/data/main_data_loop.go @@ -58,11 +58,13 @@ func (e *DataClockConsensusEngine) runFramePruning() { return } + e.logger.Info("frame pruning enabled, waiting for delay timeout expiry") + for { select { case <-e.ctx.Done(): return - case <-time.After(1 * time.Hour): + case <-time.After(1 * time.Minute): head, err := e.dataTimeReel.Head() if err != nil { panic(err) diff --git a/node/execution/intrinsics/token/application/token_handle_mint.go b/node/execution/intrinsics/token/application/token_handle_mint.go index 4caf103..c2f7a93 100644 --- a/node/execution/intrinsics/token/application/token_handle_mint.go +++ b/node/execution/intrinsics/token/application/token_handle_mint.go @@ -23,7 +23,7 @@ const PROOF_FRAME_CUTOFF = 1 const PROOF_FRAME_RING_RESET = 5750 const PROOF_FRAME_RING_RESET_2 = 7650 const PROOF_FRAME_RING_RESET_3 = 13369 -const PROOF_FRAME_SENIORITY_REPAIR = 59029 +const PROOF_FRAME_SENIORITY_REPAIR = 25745 func (a *TokenApplication) handleMint( currentFrameNumber uint64, From eaf565c15a1fe241083812fb2db7f0c4add020b4 Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Fri, 22 Nov 2024 17:30:33 +0100 Subject: [PATCH 8/9] Fix gossip (#378) * Observe control messages message count * Fix copyRPC * Fix message cache --- dashboards/grafana/BlossomSub.json | 925 +++++++++++++++++++++- go-libp2p-blossomsub/comm.go | 4 +- go-libp2p-blossomsub/mcache.go | 6 +- node/internal/observability/blossomsub.go | 39 + 4 files changed, 964 insertions(+), 10 deletions(-) diff --git a/dashboards/grafana/BlossomSub.json b/dashboards/grafana/BlossomSub.json index 119c283..9bb4532 100644 --- a/dashboards/grafana/BlossomSub.json +++ b/dashboards/grafana/BlossomSub.json @@ -597,6 +597,919 @@ "x": 0, "y": 19 }, + "id": 12, + "panels": [], + "title": "Gossip", + "type": "row" + }, + { + "datasource": { + "default": false, + "type": "prometheus", + "uid": "${datasource}" + }, + "description": "The number of message IDs provided in IWANT control messages sent to remote peers.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": 
"green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 0, + "y": 20 + }, + "id": 17, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "min", + "max", + "mean" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Name", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.99, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"send\"}[$__rate_interval]))", + "hide": false, + "instant": false, + "legendFormat": "P99", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"send\"}[$__rate_interval]))", + "instant": false, + "legendFormat": "P95", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.5, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"send\"}[$__rate_interval]))", + "hide": false, + "instant": false, + "legendFormat": "P50", + "range": true, + "refId": "C" + } + ], + "title": "Sent IWANT message count histogram", + "type": "timeseries" + }, + { + "datasource": { + "default": false, + "type": "prometheus", + "uid": "${datasource}" + }, + "description": "The number of message IDs in IHAVE control messages which have been successfully sent to a remote peer.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 20 + }, + "id": 14, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "min", + "max", + "mean" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Name", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum by(bitmask, le) (rate(blossomsub_ihave_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"send\"}[$__rate_interval])))", + "instant": false, + "legendFormat": "{{bitmask}} - P95", + "range": true, + "refId": "A" + } + ], + "title": "Sent IHAVE message count histogram", + "transformations": [ + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)", + "renamePattern": 
"Data Frames Shard 1$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIAAAAAAAAAAA(.*)", + "renamePattern": "Data Frames Shard 2$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQA(.*)", + "renamePattern": "Data Frames Shard 3$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AACAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)", + "renamePattern": "Data Token Requests Shard 1$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAA(.*)", + "renamePattern": "Data Token Requests Shard 2$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEA(.*)", + "renamePattern": "Data Token Requests Shard 3$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)", + "renamePattern": "Data Peer Announcements Shard 1$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAgAAAAAAAAAAA(.*)", + "renamePattern": "Data Peer Announcements Shard 2$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAA(.*)", + "renamePattern": "Data Peer Announcements Shard 3$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)", + "renamePattern": "Master Frames$1" + } + } + ], + "type": "timeseries" + }, + { + "datasource": { + "default": false, + "type": "prometheus", + "uid": "${datasource}" + }, + "description": "The number of message IDs provided in IWANT control messages received from remote peers.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 0, + "y": 29 + }, + "id": 13, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "min", + "max", + "mean" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Name", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.99, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"recv\"}[$__rate_interval]))", + "hide": false, + "instant": false, + "legendFormat": "P99", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", 
direction=\"recv\"}[$__rate_interval]))", + "instant": false, + "legendFormat": "P95", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.5, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"recv\"}[$__rate_interval]))", + "hide": false, + "instant": false, + "legendFormat": "P50", + "range": true, + "refId": "C" + } + ], + "title": "Received IWANT message count histogram", + "type": "timeseries" + }, + { + "datasource": { + "default": false, + "type": "prometheus", + "uid": "${datasource}" + }, + "description": "The number of message IDs in IHAVE control messages which have received from a remote peer.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 29 + }, + "id": 16, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "min", + "max", + "mean" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Name", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum by(bitmask, le) (rate(blossomsub_ihave_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"recv\"}[$__rate_interval])))", + "instant": false, + "legendFormat": "{{bitmask}} - P95", + "range": true, + "refId": "A" + } + ], + "title": "Received IHAVE message count histogram", + "transformations": [ + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)", + "renamePattern": "Data Frames Shard 1$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIAAAAAAAAAAA(.*)", + "renamePattern": "Data Frames Shard 2$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQA(.*)", + "renamePattern": "Data Frames Shard 3$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AACAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)", + "renamePattern": "Data Token Requests Shard 1$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAA(.*)", + "renamePattern": "Data Token Requests Shard 2$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEA(.*)", + "renamePattern": "Data Token Requests Shard 3$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)", + "renamePattern": 
"Data Peer Announcements Shard 1$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAgAAAAAAAAAAA(.*)", + "renamePattern": "Data Peer Announcements Shard 2$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAA(.*)", + "renamePattern": "Data Peer Announcements Shard 3$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)", + "renamePattern": "Master Frames$1" + } + } + ], + "type": "timeseries" + }, + { + "datasource": { + "default": false, + "type": "prometheus", + "uid": "${datasource}" + }, + "description": "The number of message IDs provided in IWANT control messages which have not been sent to a remote peer due to an error (usually a full queue).", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 0, + "y": 38 + }, + "id": 18, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "min", + "max", + "mean" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Name", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.99, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"drop\"}[$__rate_interval]))", + "hide": false, + "instant": false, + "legendFormat": "P99", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"drop\"}[$__rate_interval]))", + "instant": false, + "legendFormat": "P95", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.5, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"drop\"}[$__rate_interval]))", + "hide": false, + "instant": false, + "legendFormat": "P50", + "range": true, + "refId": "C" + } + ], + "title": "Dropped IWANT message count histogram", + "type": "timeseries" + }, + { + "datasource": { + "default": false, + "type": "prometheus", + "uid": "${datasource}" + }, + "description": "The number of message IDs in IHAVE control messages which have not been sent to a remote peer due to an error (usually a full queue).", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + 
"axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 38 + }, + "id": 15, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "min", + "max", + "mean" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Name", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum by(bitmask, le) (rate(blossomsub_ihave_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"drop\"}[$__rate_interval])))", + "instant": false, + "legendFormat": "{{bitmask}} - P95", + "range": true, + "refId": "A" + } + ], + "title": "Dropped IHAVE message count histogram", + "transformations": [ + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)", + "renamePattern": "Data Frames Shard 1$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIAAAAAAAAAAA(.*)", + "renamePattern": "Data Frames Shard 2$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQA(.*)", + "renamePattern": "Data Frames Shard 3$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AACAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)", + "renamePattern": "Data Token Requests Shard 1$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAA(.*)", + "renamePattern": "Data Token Requests Shard 2$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEA(.*)", + "renamePattern": "Data Token Requests Shard 3$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)", + "renamePattern": "Data Peer Announcements Shard 1$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAgAAAAAAAAAAA(.*)", + "renamePattern": "Data Peer Announcements Shard 2$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAA(.*)", + "renamePattern": "Data Peer Announcements Shard 3$1" + } + }, + { + "id": "renameByRegex", + "options": { + "regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)", + "renamePattern": "Master Frames$1" + } + } + ], + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 47 + }, "id": 8, "panels": [], "title": "Meshes", @@ -663,7 +1576,7 @@ "h": 9, "w": 24, "x": 0, - "y": 20 + "y": 48 }, "id": 11, "options": { @@ -836,7 +1749,7 @@ "h": 9, "w": 12, "x": 0, - "y": 29 + "y": 57 }, "id": 9, "options": { @@ -1009,7 
+1922,7 @@ "h": 9, "w": 12, "x": 12, - "y": 29 + "y": 57 }, "id": 10, "options": { @@ -1126,7 +2039,7 @@ "h": 1, "w": 24, "x": 0, - "y": 38 + "y": 66 }, "id": 6, "panels": [], @@ -1195,7 +2108,7 @@ "h": 9, "w": 24, "x": 0, - "y": 39 + "y": 67 }, "id": 7, "options": { @@ -1338,6 +2251,6 @@ "timezone": "browser", "title": "BlossomSub", "uid": "ee47pcfax962ob", - "version": 29, + "version": 39, "weekStart": "" } \ No newline at end of file diff --git a/go-libp2p-blossomsub/comm.go b/go-libp2p-blossomsub/comm.go index 9bad2a5..8c847d7 100644 --- a/go-libp2p-blossomsub/comm.go +++ b/go-libp2p-blossomsub/comm.go @@ -238,7 +238,7 @@ func rpcWithControl(msgs []*pb.Message, func copyRPC(rpc *RPC) *RPC { res := new(RPC) - copiedRPC := (proto.Clone(rpc.RPC)).(*pb.RPC) - res.RPC = copiedRPC + *res = *rpc + res.RPC = (proto.Clone(rpc.RPC)).(*pb.RPC) return res } diff --git a/go-libp2p-blossomsub/mcache.go b/go-libp2p-blossomsub/mcache.go index 03b3c1a..247d4da 100644 --- a/go-libp2p-blossomsub/mcache.go +++ b/go-libp2p-blossomsub/mcache.go @@ -56,7 +56,9 @@ type CacheEntry struct { func (mc *MessageCache) Put(msg *Message) { mid := mc.msgID(msg) mc.msgs[string(mid)] = msg - mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, bitmask: msg.GetBitmask()}) + for _, bitmask := range SliceBitmask(msg.GetBitmask()) { + mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, bitmask: bitmask}) + } } func (mc *MessageCache) Get(mid []byte) (*Message, bool) { @@ -101,5 +103,5 @@ func (mc *MessageCache) Shift() { for i := len(mc.history) - 2; i >= 0; i-- { mc.history[i+1] = mc.history[i] } - mc.history[0] = nil + mc.history[0] = last[:0] } diff --git a/node/internal/observability/blossomsub.go b/node/internal/observability/blossomsub.go index ee417b3..5404670 100644 --- a/node/internal/observability/blossomsub.go +++ b/node/internal/observability/blossomsub.go @@ -7,6 +7,7 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/prometheus/client_golang/prometheus" blossomsub "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub" + "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" ) const blossomSubNamespace = "blossomsub" @@ -29,6 +30,19 @@ type blossomSubRawTracer struct { sendRPCTotal prometheus.Counter dropRPCTotal prometheus.Counter undeliverableMessageTotal *prometheus.CounterVec + iHaveMessageHistogram *prometheus.HistogramVec + iWantMessageHistogram *prometheus.HistogramVec +} + +func (b *blossomSubRawTracer) observeControl(control *pb.ControlMessage, direction string) { + labels := []string{direction} + for _, iHave := range control.GetIhave() { + labels := append(labels, binaryEncoding.EncodeToString(iHave.GetBitmask())) + b.iHaveMessageHistogram.WithLabelValues(labels...).Observe(float64(len(iHave.GetMessageIDs()))) + } + for _, iWant := range control.GetIwant() { + b.iWantMessageHistogram.WithLabelValues(labels...).Observe(float64(len(iWant.GetMessageIDs()))) + } } var _ blossomsub.RawTracer = (*blossomSubRawTracer)(nil) @@ -91,16 +105,19 @@ func (b *blossomSubRawTracer) ThrottlePeer(p peer.ID) { // RecvRPC implements blossomsub.RawTracer. func (b *blossomSubRawTracer) RecvRPC(rpc *blossomsub.RPC) { b.recvRPCTotal.Inc() + b.observeControl(rpc.GetControl(), "recv") } // SendRPC implements blossomsub.RawTracer. func (b *blossomSubRawTracer) SendRPC(rpc *blossomsub.RPC, p peer.ID) { b.sendRPCTotal.Inc() + b.observeControl(rpc.GetControl(), "send") } // DropRPC implements blossomsub.RawTracer. 
func (b *blossomSubRawTracer) DropRPC(rpc *blossomsub.RPC, p peer.ID) { b.dropRPCTotal.Inc() + b.observeControl(rpc.GetControl(), "drop") } // UndeliverableMessage implements blossomsub.RawTracer. @@ -127,6 +144,8 @@ func (b *blossomSubRawTracer) Describe(ch chan<- *prometheus.Desc) { b.sendRPCTotal.Describe(ch) b.dropRPCTotal.Describe(ch) b.undeliverableMessageTotal.Describe(ch) + b.iHaveMessageHistogram.Describe(ch) + b.iWantMessageHistogram.Describe(ch) } // Collect implements prometheus.Collector. @@ -146,6 +165,8 @@ func (b *blossomSubRawTracer) Collect(ch chan<- prometheus.Metric) { b.sendRPCTotal.Collect(ch) b.dropRPCTotal.Collect(ch) b.undeliverableMessageTotal.Collect(ch) + b.iHaveMessageHistogram.Collect(ch) + b.iWantMessageHistogram.Collect(ch) } type BlossomSubRawTracer interface { @@ -270,6 +291,24 @@ func NewBlossomSubRawTracer() BlossomSubRawTracer { }, []string{"bitmask"}, ), + iHaveMessageHistogram: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: blossomSubNamespace, + Name: "ihave_messages", + Help: "Histogram of the number of messages in an IHave message.", + Buckets: prometheus.ExponentialBuckets(1, 2, 14), + }, + []string{"direction", "bitmask"}, + ), + iWantMessageHistogram: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: blossomSubNamespace, + Name: "iwant_messages", + Help: "Histogram of the number of messages in an IWant message.", + Buckets: prometheus.ExponentialBuckets(1, 2, 14), + }, + []string{"direction"}, + ), } return b } From 3b754ea4fb31fb75f112e6ba0ef068c30baa03ed Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Fri, 22 Nov 2024 19:36:38 -0600 Subject: [PATCH 9/9] extend test to verify mainnet bug is fixed --- .../blossomsub_matchfn_test.go | 26 +- go-libp2p-blossomsub/blossomsub_test.go | 280 ++++++++++-------- .../validation_builtin_test.go | 2 +- go-libp2p-blossomsub/validation_test.go | 4 +- 4 files changed, 158 insertions(+), 154 deletions(-) diff --git a/go-libp2p-blossomsub/blossomsub_matchfn_test.go b/go-libp2p-blossomsub/blossomsub_matchfn_test.go index 7d85695..239569b 100644 --- a/go-libp2p-blossomsub/blossomsub_matchfn_test.go +++ b/go-libp2p-blossomsub/blossomsub_matchfn_test.go @@ -38,48 +38,34 @@ func TestBlossomSubMatchingFn(t *testing.T) { } // build the mesh - var subs []*Subscription - var bitmasks []*Bitmask + var subs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs { b, err := ps.Join([]byte{0x00, 0x80, 0x00, 0x00}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) + bitmasks = append(bitmasks, b) sub, err := ps.Subscribe([]byte{0x00, 0x80, 0x00, 0x00}) if err != nil { t.Fatal(err) } - subs = append(subs, sub...) 
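The iHave/iWant histograms registered above use prometheus.ExponentialBuckets(1, 2, 14), i.e. upper bounds 1, 2, 4, ..., 8192, which is what the dashboard's histogram_quantile(0.95, ...) panels aggregate per direction and bitmask. A compact, runnable sketch of registering and feeding such a histogram; the metric name here is illustrative, not the one added by this patch:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// ExponentialBuckets(1, 2, 14) yields upper bounds 1, 2, 4, ..., 8192.
	hist := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "blossomsub",
			Name:      "example_ihave_messages", // illustrative name only
			Help:      "Messages advertised per IHAVE control entry.",
			Buckets:   prometheus.ExponentialBuckets(1, 2, 14),
		},
		[]string{"direction", "bitmask"},
	)
	prometheus.MustRegister(hist)

	// One observation per IHAVE entry: the count of advertised message IDs.
	hist.WithLabelValues("recv", "AAAA...").Observe(42)
	fmt.Println("observed")
}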
+ subs = append(subs, sub) } time.Sleep(time.Second) // publish a message msg := []byte("message") - bitmasks[0].Publish(ctx, bitmasks[0].bitmask, msg) + bitmasks[0][0].Publish(ctx, bitmasks[0][0].bitmask, msg) assertReceive(t, subs[0], msg) assertReceive(t, subs[1], msg) // Should match via semver over CustomSub name, ignoring the version assertReceive(t, subs[2], msg) // Should match via BlossomSubID_v2 // No message should be received because customsubA and customsubB have different names - ctxTimeout, timeoutCancel := context.WithTimeout(context.Background(), 1*time.Second) - defer timeoutCancel() - received := false - for { - msg, err := subs[3].Next(ctxTimeout) - if err != nil { - break - } - if msg != nil { - received = true - } - } - if received { - t.Fatal("Should not have received a message") - } + assertNeverReceives(t, subs[2], 1*time.Second) } func protocolNameMatch(base protocol.ID) func(protocol.ID) bool { diff --git a/go-libp2p-blossomsub/blossomsub_test.go b/go-libp2p-blossomsub/blossomsub_test.go index deadcef..77f4911 100644 --- a/go-libp2p-blossomsub/blossomsub_test.go +++ b/go-libp2p-blossomsub/blossomsub_test.go @@ -38,7 +38,7 @@ func assertPeerLists(t *testing.T, bitmask []byte, hosts []host.Host, ps *PubSub } } -func checkMessageRouting(t *testing.T, ctx context.Context, bitmasks []*Bitmask, subs []*Subscription) { +func checkMessageRouting(t *testing.T, ctx context.Context, bitmasks []*Bitmask, subs [][]*Subscription) { for _, p := range bitmasks { data := make([]byte, 16) rand.Read(data) @@ -112,24 +112,48 @@ func connectAll(t *testing.T, hosts []host.Host) { } } -func assertReceive(t *testing.T, ch *Subscription, exp []byte) { - select { - case msg := <-ch.ch: - if !bytes.Equal(msg.GetData(), exp) { - t.Fatalf("got wrong message, expected %s but got %s", string(exp), string(msg.GetData())) - } - case <-time.After(time.Second * 5): - t.Logf("%#v\n", ch) +func assertReceive(t *testing.T, ch []*Subscription, exp []byte) { + received := false + var wrong *Message + wg := sync.WaitGroup{} + done, cancel := context.WithCancel(context.TODO()) + wg.Add(len(ch)) + for _, c := range ch { + c := c + go func() { + defer wg.Done() + select { + case msg := <-c.ch: + if !bytes.Equal(msg.GetData(), exp) { + wrong = msg + } else { + received = true + } + cancel() + case <-done.Done(): + case <-time.After(time.Second * 5): + t.Logf("%#v\n", ch) + } + }() + } + + wg.Wait() + if !received { t.Fatal("timed out waiting for message of: ", string(exp)) } + if wrong != nil { + t.Fatalf("got wrong message, expected %s but got %s", string(exp), string(wrong.Data)) + } } -func assertNeverReceives(t *testing.T, ch *Subscription, timeout time.Duration) { - select { - case msg := <-ch.ch: - t.Logf("%#v\n", ch) - t.Fatal("got unexpected message: ", string(msg.GetData())) - case <-time.After(timeout): +func assertNeverReceives(t *testing.T, ch []*Subscription, timeout time.Duration) { + for _, c := range ch { + select { + case msg := <-c.ch: + t.Logf("%#v\n", ch) + t.Fatal("got unexpected message: ", string(msg.GetData())) + case <-time.After(timeout): + } } } @@ -492,21 +516,21 @@ func TestBlossomSubGossip(t *testing.T) { psubs := getBlossomSubs(ctx, hosts) - var msgs []*Subscription - var bitmasks []*Bitmask + var msgs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x01}) + b, err := ps.Join([]byte{0x00, 0x81}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) 
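The reworked assertReceive above waits on every per-bit subscription and passes if any of them yields the expected payload; note it writes received/wrong from several goroutines guarded only by the WaitGroup. A channel-based fan-in is one way to keep the outcome single-writer. A hedged alternative sketch, under the assumption that each subscription can be exposed as a plain receive channel (awaitAny and its types are illustrative, not part of the patch):

package main

import (
	"fmt"
	"time"
)

// awaitAny returns the first message seen on any channel, or false on
// timeout; results funnel through one buffered channel, so only the
// caller's goroutine touches the outcome.
func awaitAny(chs []<-chan []byte, timeout time.Duration) ([]byte, bool) {
	merged := make(chan []byte, len(chs))
	for _, c := range chs {
		c := c
		go func() {
			select {
			case m := <-c:
				merged <- m
			case <-time.After(timeout):
			}
		}()
	}
	select {
	case m := <-merged:
		return m, true
	case <-time.After(timeout):
		return nil, false
	}
}

func main() {
	ch := make(chan []byte, 1)
	ch <- []byte("message")
	msg, ok := awaitAny([]<-chan []byte{ch}, time.Second)
	fmt.Println(string(msg), ok) // message true
}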
- subch, err := ps.Subscribe([]byte{0x00, 0x01}) + bitmasks = append(bitmasks, b) + subch, err := ps.Subscribe([]byte{0x00, 0x81}) if err != nil { t.Fatal(err) } - msgs = append(msgs, subch...) + msgs = append(msgs, subch) } denseConnect(t, hosts) @@ -519,16 +543,10 @@ func TestBlossomSubGossip(t *testing.T) { owner := rand.Intn(len(psubs)) - bitmasks[owner].Publish(ctx, []byte{0x00, 0x01}, msg) + bitmasks[owner][0].Publish(ctx, []byte{0x00, 0x81}, msg) for _, sub := range msgs { - got, err := sub.Next(ctx) - if err != nil { - t.Fatal(sub.err) - } - if !bytes.Equal(msg, got.Data) { - t.Fatal("got wrong message!") - } + assertReceive(t, sub, msg) } // wait a bit to have some gossip interleaved @@ -1063,13 +1081,13 @@ func TestMixedBlossomSub(t *testing.T) { var msgs []*Subscription var bitmasks []*Bitmask for _, ps := range bsubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } bitmasks = append(bitmasks, b...) - subch, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + subch, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -1087,7 +1105,7 @@ func TestMixedBlossomSub(t *testing.T) { owner := rand.Intn(len(bsubs)) - bitmasks[owner].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[owner].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) for _, sub := range msgs { got, err := sub.Next(ctx) @@ -1118,13 +1136,13 @@ func TestBlossomSubMultihops(t *testing.T) { var subs []*Subscription var bitmasks []*Bitmask for i := 1; i < 6; i++ { - b, err := psubs[i].Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := psubs[i].Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } bitmasks = append(bitmasks, b...) - ch, err := psubs[i].Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + ch, err := psubs[i].Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -1135,7 +1153,7 @@ func TestBlossomSubMultihops(t *testing.T) { time.Sleep(time.Second * 2) msg := []byte("i like cats") - err := bitmasks[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + err := bitmasks[0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) if err != nil { t.Fatal(err) } @@ -1178,31 +1196,31 @@ func TestBlossomSubTreeTopology(t *testing.T) { [8] -> [9] */ - var chs []*Subscription - var bitmasks []*Bitmask + var chs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) - ch, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + bitmasks = append(bitmasks, b) + ch, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - chs = append(chs, ch...) 
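Throughout these tests the bitmask moves from a single-bit value (0x80 or 0x01) to {0x00, 0x81}, which sets two bits and therefore spans two single-bit meshes — the shape of the mainnet bug this patch verifies. The mcache.Put change earlier in the series caches one history entry per single-bit slice for exactly this case. A sketch of that decomposition, assuming SliceBitmask splits a mask into one mask per set bit (the real implementation lives elsewhere in the repo):

package main

import "fmt"

// sliceBitmask returns one mask per set bit, preserving the byte layout:
// {0x00, 0x81} -> {0x00, 0x80} and {0x00, 0x01}.
func sliceBitmask(bm []byte) [][]byte {
	var out [][]byte
	for i, b := range bm {
		for bit := 7; bit >= 0; bit-- {
			if b&(1<<uint(bit)) != 0 {
				single := make([]byte, len(bm))
				single[i] = 1 << uint(bit)
				out = append(out, single)
			}
		}
	}
	return out
}

func main() {
	for _, s := range sliceBitmask([]byte{0x00, 0x81}) {
		fmt.Printf("%x\n", s) // 0080, then 0001
	}
}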
+ chs = append(chs, ch) } // wait for heartbeats to build mesh time.Sleep(time.Second * 2) - assertPeerLists(t, []byte{0x00, 0x00, 0x80, 0x00}, hosts, psubs[0], 1, 5) - assertPeerLists(t, []byte{0x00, 0x00, 0x80, 0x00}, hosts, psubs[1], 0, 2, 4) - assertPeerLists(t, []byte{0x00, 0x00, 0x80, 0x00}, hosts, psubs[2], 1, 3) + assertPeerLists(t, []byte{0x00, 0x00, 0x81, 0x00}, hosts, psubs[0], 1, 5) + assertPeerLists(t, []byte{0x00, 0x00, 0x81, 0x00}, hosts, psubs[1], 0, 2, 4) + assertPeerLists(t, []byte{0x00, 0x00, 0x81, 0x00}, hosts, psubs[2], 1, 3) - checkMessageRouting(t, ctx, []*Bitmask{bitmasks[9], bitmasks[3]}, chs) + checkMessageRouting(t, ctx, []*Bitmask{bitmasks[9][0], bitmasks[3][0]}, chs) } // this tests overlay bootstrapping through px in BlossomSub v1.2 @@ -1258,20 +1276,20 @@ func TestBlossomSubStarTopology(t *testing.T) { time.Sleep(time.Second) // build the mesh - var subs []*Subscription + var subs [][]*Subscription var bitmasks []*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } bitmasks = append(bitmasks, b...) - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - subs = append(subs, sub...) + subs = append(subs, sub) } // wait a bit for the mesh to build @@ -1287,7 +1305,7 @@ func TestBlossomSubStarTopology(t *testing.T) { // send a message from each peer and assert it was propagated for i := 0; i < 20; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - bitmasks[i].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[i].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) for _, sub := range subs { assertReceive(t, sub, msg) @@ -1364,20 +1382,20 @@ func TestBlossomSubStarTopologyWithSignedPeerRecords(t *testing.T) { time.Sleep(time.Second) // build the mesh - var subs []*Subscription - var bitmasks []*Bitmask + var subs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + bitmasks = append(bitmasks, b) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - subs = append(subs, sub...) + subs = append(subs, sub) } // wait a bit for the mesh to build @@ -1393,7 +1411,7 @@ func TestBlossomSubStarTopologyWithSignedPeerRecords(t *testing.T) { // send a message from each peer and assert it was propagated for i := 0; i < 20; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - bitmasks[i].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[i][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) for _, sub := range subs { assertReceive(t, sub, msg) @@ -1422,20 +1440,20 @@ func TestBlossomSubDirectPeers(t *testing.T) { } // build the mesh - var subs []*Subscription - var bitmasks []*Bitmask + var subs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + bitmasks = append(bitmasks, b) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - subs = append(subs, sub...) 
+ subs = append(subs, sub) } time.Sleep(time.Second) @@ -1443,7 +1461,7 @@ func TestBlossomSubDirectPeers(t *testing.T) { // publish some messages for i := 0; i < 3; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - bitmasks[i].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[i][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) for _, sub := range subs { assertReceive(t, sub, msg) @@ -1464,7 +1482,7 @@ func TestBlossomSubDirectPeers(t *testing.T) { // publish some messages for i := 0; i < 3; i++ { msg := []byte(fmt.Sprintf("message %d", i+3)) - bitmasks[i].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[i][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) for _, sub := range subs { assertReceive(t, sub, msg) @@ -1491,34 +1509,34 @@ func TestBlossomSubPeerFilter(t *testing.T) { connect(t, h[0], h[2]) // Join all peers - var subs []*Subscription - var bitmasks []*Bitmask + var subs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + bitmasks = append(bitmasks, b) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - subs = append(subs, sub...) + subs = append(subs, sub) } time.Sleep(time.Second) msg := []byte("message") - bitmasks[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[0][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) assertReceive(t, subs[0], msg) assertReceive(t, subs[1], msg) assertNeverReceives(t, subs[2], time.Second) msg = []byte("message2") - bitmasks[1].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[1][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) assertReceive(t, subs[0], msg) assertReceive(t, subs[1], msg) assertNeverReceives(t, subs[2], time.Second) @@ -1540,25 +1558,25 @@ func TestBlossomSubDirectPeersFanout(t *testing.T) { connect(t, h[0], h[2]) // Join all peers except h2 - var subs []*Subscription - var bitmasks []*Bitmask + var subs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs[:2] { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + bitmasks = append(bitmasks, b) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - subs = append(subs, sub...) 
+ subs = append(subs, sub) } time.Sleep(time.Second) - b, err := psubs[2].Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := psubs[2].Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -1566,7 +1584,7 @@ func TestBlossomSubDirectPeersFanout(t *testing.T) { // h2 publishes some messages to build a fanout for i := 0; i < 3; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - b[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + b[0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) for _, sub := range subs { assertReceive(t, sub, msg) @@ -1577,7 +1595,7 @@ func TestBlossomSubDirectPeersFanout(t *testing.T) { result := make(chan bool, 2) psubs[2].eval <- func() { rt := psubs[2].rt.(*BlossomSubRouter) - fanout := rt.fanout[string([]byte{0x00, 0x00, 0x80, 0x00})] + fanout := rt.fanout[string([]byte{0x00, 0x00, 0x81, 0x00})] _, ok := fanout[h[0].ID()] result <- ok _, ok = fanout[h[1].ID()] @@ -1595,7 +1613,7 @@ func TestBlossomSubDirectPeersFanout(t *testing.T) { } // now subscribe h2 too and verify tht h0 is in the mesh but not h1 - _, err = psubs[2].Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + _, err = psubs[2].Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -1604,7 +1622,7 @@ func TestBlossomSubDirectPeersFanout(t *testing.T) { psubs[2].eval <- func() { rt := psubs[2].rt.(*BlossomSubRouter) - mesh := rt.mesh[string([]byte{0x00, 0x00, 0x80, 0x00})] + mesh := rt.mesh[string([]byte{0x00, 0x00, 0x81, 0x00})] _, ok := mesh[h[0].ID()] result <- ok _, ok = mesh[h[1].ID()] @@ -1637,20 +1655,20 @@ func TestBlossomSubFloodPublish(t *testing.T) { } // build the (partial, unstable) mesh - var subs []*Subscription - var bitmasks []*Bitmask + var subs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + bitmasks = append(bitmasks, b) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - subs = append(subs, sub...) 
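The fanout and mesh checks above use the pubsub eval channel: state owned by the event loop is inspected by sending a closure into that loop and reading the answer back on a channel, so no locking is needed. A stripped-down, runnable sketch of the pattern (the loop type is illustrative):

package main

import "fmt"

// loop owns its state; only the run goroutine ever touches it.
type loop struct {
	eval chan func()
	mesh map[string]struct{}
}

func (l *loop) run() {
	for f := range l.eval {
		f() // executed on the owning goroutine
	}
}

func main() {
	l := &loop{eval: make(chan func()), mesh: map[string]struct{}{"h0": {}}}
	go l.run()

	res := make(chan bool, 1)
	l.eval <- func() {
		_, ok := l.mesh["h0"]
		res <- ok
	}
	fmt.Println(<-res) // true, read without racing the loop
}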
+ subs = append(subs, sub) } time.Sleep(time.Second) @@ -1658,7 +1676,7 @@ func TestBlossomSubFloodPublish(t *testing.T) { // send a message from the star and assert it was received for i := 0; i < 20; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - bitmasks[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[0][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) for _, sub := range subs { assertReceive(t, sub, msg) @@ -1674,7 +1692,7 @@ func TestBlossomSubEnoughPeers(t *testing.T) { psubs := getBlossomSubs(ctx, hosts) for _, ps := range psubs { - _, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + _, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -1683,7 +1701,7 @@ func TestBlossomSubEnoughPeers(t *testing.T) { // at this point we have no connections and no mesh, so EnoughPeers should return false res := make(chan bool, 1) psubs[0].eval <- func() { - res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x80, 0x00}, 0) + res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x81, 0x00}, 0) } enough := <-res if enough { @@ -1696,7 +1714,7 @@ func TestBlossomSubEnoughPeers(t *testing.T) { time.Sleep(3 * time.Second) psubs[0].eval <- func() { - res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x80, 0x00}, 0) + res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x81, 0x00}, 0) } enough = <-res if !enough { @@ -1768,13 +1786,13 @@ func TestBlossomSubNegativeScore(t *testing.T) { var subs []*Subscription var bitmasks []*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } bitmasks = append(bitmasks, b...) - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -1785,7 +1803,7 @@ func TestBlossomSubNegativeScore(t *testing.T) { for i := 0; i < 20; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - bitmasks[i%20].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[i%20].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) time.Sleep(20 * time.Millisecond) } @@ -1841,7 +1859,7 @@ func TestBlossomSubScoreValidatorEx(t *testing.T) { DecayInterval: time.Second, DecayToZero: 0.01, Bitmasks: map[string]*BitmaskScoreParams{ - string([]byte{0x00, 0x00, 0x80, 0x00}): { + string([]byte{0x00, 0x00, 0x81, 0x00}): { BitmaskWeight: 1, TimeInMeshQuantum: time.Second, InvalidMessageDeliveriesWeight: -1, @@ -1857,7 +1875,7 @@ func TestBlossomSubScoreValidatorEx(t *testing.T) { connectAll(t, hosts) - err := psubs[0].RegisterBitmaskValidator([]byte{0x00, 0x00, 0x80, 0x00}, func(ctx context.Context, p peer.ID, msg *Message) ValidationResult { + err := psubs[0].RegisterBitmaskValidator([]byte{0x00, 0x00, 0x81, 0x00}, func(ctx context.Context, p peer.ID, msg *Message) ValidationResult { // we ignore host1 and reject host2 if p == hosts[1].ID() { return ValidationIgnore @@ -1872,17 +1890,17 @@ func TestBlossomSubScoreValidatorEx(t *testing.T) { t.Fatal(err) } - sub, err := psubs[0].Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + sub, err := psubs[0].Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - b1, err := psubs[1].Join([]byte{0x00, 0x00, 0x80, 0x00}) + b1, err := psubs[1].Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - b2, err := psubs[2].Join([]byte{0x00, 0x00, 0x80, 0x00}) + b2, err := psubs[2].Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -1899,8 +1917,8 @@ func 
TestBlossomSubScoreValidatorEx(t *testing.T) { } } - b1[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, []byte("i am not a walrus")) - b2[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, []byte("i am not a walrus either")) + b1[0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, []byte("i am not a walrus")) + b2[0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, []byte("i am not a walrus either")) // assert no messages expectNoMessage(sub[0]) @@ -1939,7 +1957,7 @@ func TestBlossomSubPiggybackControl(t *testing.T) { res := make(chan *RPC, 1) ps.eval <- func() { gs := ps.rt.(*BlossomSubRouter) - test1 := []byte{0x00, 0x80, 0x00, 0x00} + test1 := []byte{0x00, 0x81, 0x00, 0x00} test2 := []byte{0x00, 0x20, 0x00, 0x00} test3 := []byte{0x00, 0x00, 0x02, 0x00} gs.mesh[string(test1)] = make(map[peer.ID]struct{}) @@ -1961,7 +1979,7 @@ func TestBlossomSubPiggybackControl(t *testing.T) { if len(rpc.Control.Graft) != 1 { t.Fatal("expected 1 GRAFT") } - if !bytes.Equal(rpc.Control.Graft[0].GetBitmask(), []byte{0x00, 0x80, 0x00, 0x00}) { + if !bytes.Equal(rpc.Control.Graft[0].GetBitmask(), []byte{0x00, 0x81, 0x00, 0x00}) { t.Fatal("expected test1 as graft bitmask ID") } if len(rpc.Control.Prune) != 2 { @@ -1985,7 +2003,7 @@ func TestBlossomSubMultipleGraftBitmasks(t *testing.T) { time.Sleep(time.Second * 1) - firstBitmask := []byte{0x00, 0x80, 0x00, 0x00} + firstBitmask := []byte{0x00, 0x81, 0x00, 0x00} secondBitmask := []byte{0x00, 0x20, 0x00, 0x00} thirdBitmask := []byte{0x00, 0x00, 0x02, 0x00} @@ -2058,7 +2076,7 @@ func TestBlossomSubOpportunisticGrafting(t *testing.T) { DecayInterval: time.Second, DecayToZero: 0.01, Bitmasks: map[string]*BitmaskScoreParams{ - string([]byte{0x00, 0x00, 0x80, 0x00}): { + string([]byte{0x00, 0x00, 0x81, 0x00}): { BitmaskWeight: 1, TimeInMeshWeight: 0.0002777, TimeInMeshQuantum: time.Second, @@ -2099,13 +2117,13 @@ func TestBlossomSubOpportunisticGrafting(t *testing.T) { // ask the real pubsus to join the bitmask var bitmasks []*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } bitmasks = append(bitmasks, b...) 
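In TestBlossomSubScoreValidatorEx above, the validator ignores one publisher and rejects the other; under the score params only the rejection should count against InvalidMessageDeliveriesWeight (-1). A condensed in-package sketch of such a validator, reusing the ValidatorEx signature shown in the diff; the peer variables are placeholders, and ValidationAccept is assumed to exist as the accept result, as in go-libp2p-pubsub:

// ignored peers are dropped silently; rejected peers are dropped and
// penalised through InvalidMessageDeliveriesWeight.
validator := func(ctx context.Context, p peer.ID, msg *Message) ValidationResult {
	switch p {
	case ignoredPeer:
		return ValidationIgnore
	case rejectedPeer:
		return ValidationReject
	default:
		return ValidationAccept
	}
}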
- sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -2123,7 +2141,7 @@ func TestBlossomSubOpportunisticGrafting(t *testing.T) { // publish a bunch of messages from the real hosts for i := 0; i < 1000; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - bitmasks[i%10].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[i%10].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) time.Sleep(20 * time.Millisecond) } @@ -2137,7 +2155,7 @@ func TestBlossomSubOpportunisticGrafting(t *testing.T) { gs := ps.rt.(*BlossomSubRouter) count := 0 for _, h := range hosts[:10] { - _, ok := gs.mesh[string([]byte{0x00, 0x00, 0x80, 0x00})][h.ID()] + _, ok := gs.mesh[string([]byte{0x00, 0x00, 0x81, 0x00})][h.ID()] if ok { count++ } @@ -2167,7 +2185,7 @@ func TestBlossomSubLeaveBitmask(t *testing.T) { // Join all peers var subs []*Subscription for _, ps := range psubs { - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -2181,9 +2199,9 @@ func TestBlossomSubLeaveBitmask(t *testing.T) { psubs[0].rt.(*BlossomSubRouter).p.eval <- func() { defer close(done) - psubs[0].rt.Leave([]byte{0x00, 0x00, 0x80, 0x00}) + psubs[0].rt.Leave([]byte{0x00, 0x00, 0x81, 0x00}) time.Sleep(time.Second) - peerMap := psubs[0].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x80, 0x00})] + peerMap := psubs[0].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x81, 0x00})] if len(peerMap) != 1 { t.Fatalf("No peer is populated in the backoff map for peer 0") } @@ -2205,7 +2223,7 @@ func TestBlossomSubLeaveBitmask(t *testing.T) { // for peer 0. psubs[1].rt.(*BlossomSubRouter).p.eval <- func() { defer close(done) - peerMap2 := psubs[1].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x80, 0x00})] + peerMap2 := psubs[1].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x81, 0x00})] if len(peerMap2) != 1 { t.Fatalf("No peer is populated in the backoff map for peer 1") } @@ -2243,12 +2261,12 @@ func TestBlossomSubJoinBitmask(t *testing.T) { peerMap := make(map[peer.ID]time.Time) peerMap[h[1].ID()] = time.Now().Add(router0.params.UnsubscribeBackoff) - router0.backoff[string([]byte{0x00, 0x00, 0x80, 0x00})] = peerMap + router0.backoff[string([]byte{0x00, 0x00, 0x81, 0x00})] = peerMap // Join all peers var subs []*Subscription for _, ps := range psubs { - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -2258,7 +2276,7 @@ func TestBlossomSubJoinBitmask(t *testing.T) { time.Sleep(time.Second) router0.meshMx.RLock() - meshMap := router0.mesh[string([]byte{0x00, 0x00, 0x80, 0x00})] + meshMap := router0.mesh[string([]byte{0x00, 0x00, 0x81, 0x00})] router0.meshMx.RUnlock() if len(meshMap) != 1 { t.Fatalf("Unexpect peer included in the mesh") @@ -2287,7 +2305,7 @@ func (sq *sybilSquatter) handleStream(s network.Stream) { r := msgio.NewVarintReaderSize(s, DefaultMaxMessageSize) w := msgio.NewVarintWriter(os) truth := true - bitmask := []byte{0x00, 0x00, 0x80, 0x00} + bitmask := []byte{0x00, 0x00, 0x81, 0x00} msg := &pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: truth, Bitmask: bitmask}}} out, err := proto.Marshal(msg) if err != nil { @@ -2326,7 +2344,7 @@ func TestBlossomSubPeerScoreInspect(t *testing.T) { WithPeerScore( &PeerScoreParams{ Bitmasks: map[string]*BitmaskScoreParams{ - string([]byte{0x00, 0x00, 0x80, 
0x00}): { + string([]byte{0x00, 0x00, 0x81, 0x00}): { BitmaskWeight: 1, TimeInMeshQuantum: time.Second, FirstMessageDeliveriesWeight: 1, @@ -2352,13 +2370,13 @@ func TestBlossomSubPeerScoreInspect(t *testing.T) { connect(t, hosts[0], hosts[1]) var bitmasks []*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } bitmasks = append(bitmasks, b...) - _, err = ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + _, err = ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -2368,7 +2386,7 @@ func TestBlossomSubPeerScoreInspect(t *testing.T) { for i := 0; i < 20; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - bitmasks[i%2].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[i%2].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) time.Sleep(20 * time.Millisecond) } @@ -2391,7 +2409,7 @@ func TestBlossomSubPeerScoreResetBitmaskParams(t *testing.T) { WithPeerScore( &PeerScoreParams{ Bitmasks: map[string]*BitmaskScoreParams{ - string([]byte{0x00, 0x00, 0x80, 0x00}): { + string([]byte{0x00, 0x00, 0x81, 0x00}): { BitmaskWeight: 1, TimeInMeshQuantum: time.Second, FirstMessageDeliveriesWeight: 1, @@ -2411,7 +2429,7 @@ func TestBlossomSubPeerScoreResetBitmaskParams(t *testing.T) { GraylistThreshold: -1000, })) - bitmask, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + bitmask, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -2462,11 +2480,11 @@ func TestBlossomSubRPCFragmentation(t *testing.T) { connect(t, hosts[0], hosts[1]) // have the real pubsub join the test bitmask - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - _, err = ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + _, err = ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -2480,7 +2498,7 @@ func TestBlossomSubRPCFragmentation(t *testing.T) { for i := 0; i < nMessages; i++ { msg := make([]byte, msgSize) rand.Read(msg) - b[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + b[0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) time.Sleep(20 * time.Millisecond) } @@ -2535,7 +2553,7 @@ func (iwe *iwantEverything) handleStream(s network.Stream) { r := msgio.NewVarintReaderSize(s, DefaultMaxMessageSize) w := msgio.NewVarintWriter(os) truth := true - bitmask := []byte{0x00, 0x00, 0x80, 0x00} + bitmask := []byte{0x00, 0x00, 0x81, 0x00} msg := &pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: truth, Bitmask: bitmask}}} out, err := proto.Marshal(msg) @@ -2610,7 +2628,7 @@ func (iwe *iwantEverything) handleStream(s network.Stream) { func TestFragmentRPCFunction(t *testing.T) { p := peer.ID("some-peer") - bitmask := []byte{0x00, 0x00, 0x80, 0x00} + bitmask := []byte{0x00, 0x00, 0x81, 0x00} rpc := &RPC{RPC: new(pb.RPC), from: p} limit := 1024 diff --git a/go-libp2p-blossomsub/validation_builtin_test.go b/go-libp2p-blossomsub/validation_builtin_test.go index 6e6ef15..3ebe11f 100644 --- a/go-libp2p-blossomsub/validation_builtin_test.go +++ b/go-libp2p-blossomsub/validation_builtin_test.go @@ -154,7 +154,7 @@ func TestBasicSeqnoValidatorReplay(t *testing.T) { } for _, sub := range msgs { - assertNeverReceives(t, sub, time.Second) + assertNeverReceives(t, []*Subscription{sub}, time.Second) } } diff --git a/go-libp2p-blossomsub/validation_test.go b/go-libp2p-blossomsub/validation_test.go index 46feb70..178911c 100644 --- 
a/go-libp2p-blossomsub/validation_test.go +++ b/go-libp2p-blossomsub/validation_test.go @@ -349,13 +349,13 @@ func TestValidateAssortedOptions(t *testing.T) { bitmasks1[i].Publish(ctx, bitmasks1[i].bitmask, msg) for _, sub := range subs1 { - assertReceive(t, sub, msg) + assertReceive(t, []*Subscription{sub}, msg) } msg = []byte(fmt.Sprintf("message2 %d", i)) bitmasks2[i].Publish(ctx, bitmasks2[i].bitmask, msg) for _, sub := range subs2 { - assertReceive(t, sub, msg) + assertReceive(t, []*Subscription{sub}, msg) } } }
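Where a call site still holds a single subscription, the slice-based helpers are satisfied by wrapping it, as the last two hunks do. If that wrapping recurs, a one-line adapter keeps call sites terse; this helper is purely illustrative and not part of the patch:

// one adapts a single subscription to the slice-based assert helpers.
func one(sub *Subscription) []*Subscription { return []*Subscription{sub} }

// usage: assertReceive(t, one(sub), msg)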