From 3b754ea4fb31fb75f112e6ba0ef068c30baa03ed Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Fri, 22 Nov 2024 19:36:38 -0600 Subject: [PATCH] extend test to verify mainnet bug is fixed --- .../blossomsub_matchfn_test.go | 26 +- go-libp2p-blossomsub/blossomsub_test.go | 280 ++++++++++-------- .../validation_builtin_test.go | 2 +- go-libp2p-blossomsub/validation_test.go | 4 +- 4 files changed, 158 insertions(+), 154 deletions(-) diff --git a/go-libp2p-blossomsub/blossomsub_matchfn_test.go b/go-libp2p-blossomsub/blossomsub_matchfn_test.go index 7d85695..239569b 100644 --- a/go-libp2p-blossomsub/blossomsub_matchfn_test.go +++ b/go-libp2p-blossomsub/blossomsub_matchfn_test.go @@ -38,48 +38,34 @@ func TestBlossomSubMatchingFn(t *testing.T) { } // build the mesh - var subs []*Subscription - var bitmasks []*Bitmask + var subs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs { b, err := ps.Join([]byte{0x00, 0x80, 0x00, 0x00}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) + bitmasks = append(bitmasks, b) sub, err := ps.Subscribe([]byte{0x00, 0x80, 0x00, 0x00}) if err != nil { t.Fatal(err) } - subs = append(subs, sub...) + subs = append(subs, sub) } time.Sleep(time.Second) // publish a message msg := []byte("message") - bitmasks[0].Publish(ctx, bitmasks[0].bitmask, msg) + bitmasks[0][0].Publish(ctx, bitmasks[0][0].bitmask, msg) assertReceive(t, subs[0], msg) assertReceive(t, subs[1], msg) // Should match via semver over CustomSub name, ignoring the version assertReceive(t, subs[2], msg) // Should match via BlossomSubID_v2 // No message should be received because customsubA and customsubB have different names - ctxTimeout, timeoutCancel := context.WithTimeout(context.Background(), 1*time.Second) - defer timeoutCancel() - received := false - for { - msg, err := subs[3].Next(ctxTimeout) - if err != nil { - break - } - if msg != nil { - received = true - } - } - if received { - t.Fatal("Should not have received a message") - } + assertNeverReceives(t, subs[2], 1*time.Second) } func protocolNameMatch(base protocol.ID) func(protocol.ID) bool { diff --git a/go-libp2p-blossomsub/blossomsub_test.go b/go-libp2p-blossomsub/blossomsub_test.go index deadcef..77f4911 100644 --- a/go-libp2p-blossomsub/blossomsub_test.go +++ b/go-libp2p-blossomsub/blossomsub_test.go @@ -38,7 +38,7 @@ func assertPeerLists(t *testing.T, bitmask []byte, hosts []host.Host, ps *PubSub } } -func checkMessageRouting(t *testing.T, ctx context.Context, bitmasks []*Bitmask, subs []*Subscription) { +func checkMessageRouting(t *testing.T, ctx context.Context, bitmasks []*Bitmask, subs [][]*Subscription) { for _, p := range bitmasks { data := make([]byte, 16) rand.Read(data) @@ -112,24 +112,48 @@ func connectAll(t *testing.T, hosts []host.Host) { } } -func assertReceive(t *testing.T, ch *Subscription, exp []byte) { - select { - case msg := <-ch.ch: - if !bytes.Equal(msg.GetData(), exp) { - t.Fatalf("got wrong message, expected %s but got %s", string(exp), string(msg.GetData())) - } - case <-time.After(time.Second * 5): - t.Logf("%#v\n", ch) +func assertReceive(t *testing.T, ch []*Subscription, exp []byte) { + received := false + var wrong *Message + wg := sync.WaitGroup{} + done, cancel := context.WithCancel(context.TODO()) + wg.Add(len(ch)) + for _, c := range ch { + c := c + go func() { + defer wg.Done() + select { + case msg := <-c.ch: + if !bytes.Equal(msg.GetData(), exp) { + wrong = msg + } else { + received = true + } + cancel() + case <-done.Done(): + case <-time.After(time.Second * 5): + t.Logf("%#v\n", ch) + } + }() + } + + wg.Wait() + if !received { t.Fatal("timed out waiting for message of: ", string(exp)) } + if wrong != nil { + t.Fatalf("got wrong message, expected %s but got %s", string(exp), string(wrong.Data)) + } } -func assertNeverReceives(t *testing.T, ch *Subscription, timeout time.Duration) { - select { - case msg := <-ch.ch: - t.Logf("%#v\n", ch) - t.Fatal("got unexpected message: ", string(msg.GetData())) - case <-time.After(timeout): +func assertNeverReceives(t *testing.T, ch []*Subscription, timeout time.Duration) { + for _, c := range ch { + select { + case msg := <-c.ch: + t.Logf("%#v\n", ch) + t.Fatal("got unexpected message: ", string(msg.GetData())) + case <-time.After(timeout): + } } } @@ -492,21 +516,21 @@ func TestBlossomSubGossip(t *testing.T) { psubs := getBlossomSubs(ctx, hosts) - var msgs []*Subscription - var bitmasks []*Bitmask + var msgs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x01}) + b, err := ps.Join([]byte{0x00, 0x81}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) - subch, err := ps.Subscribe([]byte{0x00, 0x01}) + bitmasks = append(bitmasks, b) + subch, err := ps.Subscribe([]byte{0x00, 0x81}) if err != nil { t.Fatal(err) } - msgs = append(msgs, subch...) + msgs = append(msgs, subch) } denseConnect(t, hosts) @@ -519,16 +543,10 @@ func TestBlossomSubGossip(t *testing.T) { owner := rand.Intn(len(psubs)) - bitmasks[owner].Publish(ctx, []byte{0x00, 0x01}, msg) + bitmasks[owner][0].Publish(ctx, []byte{0x00, 0x81}, msg) for _, sub := range msgs { - got, err := sub.Next(ctx) - if err != nil { - t.Fatal(sub.err) - } - if !bytes.Equal(msg, got.Data) { - t.Fatal("got wrong message!") - } + assertReceive(t, sub, msg) } // wait a bit to have some gossip interleaved @@ -1063,13 +1081,13 @@ func TestMixedBlossomSub(t *testing.T) { var msgs []*Subscription var bitmasks []*Bitmask for _, ps := range bsubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } bitmasks = append(bitmasks, b...) - subch, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + subch, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -1087,7 +1105,7 @@ func TestMixedBlossomSub(t *testing.T) { owner := rand.Intn(len(bsubs)) - bitmasks[owner].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[owner].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) for _, sub := range msgs { got, err := sub.Next(ctx) @@ -1118,13 +1136,13 @@ func TestBlossomSubMultihops(t *testing.T) { var subs []*Subscription var bitmasks []*Bitmask for i := 1; i < 6; i++ { - b, err := psubs[i].Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := psubs[i].Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } bitmasks = append(bitmasks, b...) - ch, err := psubs[i].Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + ch, err := psubs[i].Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -1135,7 +1153,7 @@ func TestBlossomSubMultihops(t *testing.T) { time.Sleep(time.Second * 2) msg := []byte("i like cats") - err := bitmasks[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + err := bitmasks[0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) if err != nil { t.Fatal(err) } @@ -1178,31 +1196,31 @@ func TestBlossomSubTreeTopology(t *testing.T) { [8] -> [9] */ - var chs []*Subscription - var bitmasks []*Bitmask + var chs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) - ch, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + bitmasks = append(bitmasks, b) + ch, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - chs = append(chs, ch...) + chs = append(chs, ch) } // wait for heartbeats to build mesh time.Sleep(time.Second * 2) - assertPeerLists(t, []byte{0x00, 0x00, 0x80, 0x00}, hosts, psubs[0], 1, 5) - assertPeerLists(t, []byte{0x00, 0x00, 0x80, 0x00}, hosts, psubs[1], 0, 2, 4) - assertPeerLists(t, []byte{0x00, 0x00, 0x80, 0x00}, hosts, psubs[2], 1, 3) + assertPeerLists(t, []byte{0x00, 0x00, 0x81, 0x00}, hosts, psubs[0], 1, 5) + assertPeerLists(t, []byte{0x00, 0x00, 0x81, 0x00}, hosts, psubs[1], 0, 2, 4) + assertPeerLists(t, []byte{0x00, 0x00, 0x81, 0x00}, hosts, psubs[2], 1, 3) - checkMessageRouting(t, ctx, []*Bitmask{bitmasks[9], bitmasks[3]}, chs) + checkMessageRouting(t, ctx, []*Bitmask{bitmasks[9][0], bitmasks[3][0]}, chs) } // this tests overlay bootstrapping through px in BlossomSub v1.2 @@ -1258,20 +1276,20 @@ func TestBlossomSubStarTopology(t *testing.T) { time.Sleep(time.Second) // build the mesh - var subs []*Subscription + var subs [][]*Subscription var bitmasks []*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } bitmasks = append(bitmasks, b...) - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - subs = append(subs, sub...) + subs = append(subs, sub) } // wait a bit for the mesh to build @@ -1287,7 +1305,7 @@ func TestBlossomSubStarTopology(t *testing.T) { // send a message from each peer and assert it was propagated for i := 0; i < 20; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - bitmasks[i].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[i].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) for _, sub := range subs { assertReceive(t, sub, msg) @@ -1364,20 +1382,20 @@ func TestBlossomSubStarTopologyWithSignedPeerRecords(t *testing.T) { time.Sleep(time.Second) // build the mesh - var subs []*Subscription - var bitmasks []*Bitmask + var subs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + bitmasks = append(bitmasks, b) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - subs = append(subs, sub...) + subs = append(subs, sub) } // wait a bit for the mesh to build @@ -1393,7 +1411,7 @@ func TestBlossomSubStarTopologyWithSignedPeerRecords(t *testing.T) { // send a message from each peer and assert it was propagated for i := 0; i < 20; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - bitmasks[i].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[i][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) for _, sub := range subs { assertReceive(t, sub, msg) @@ -1422,20 +1440,20 @@ func TestBlossomSubDirectPeers(t *testing.T) { } // build the mesh - var subs []*Subscription - var bitmasks []*Bitmask + var subs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + bitmasks = append(bitmasks, b) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - subs = append(subs, sub...) + subs = append(subs, sub) } time.Sleep(time.Second) @@ -1443,7 +1461,7 @@ func TestBlossomSubDirectPeers(t *testing.T) { // publish some messages for i := 0; i < 3; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - bitmasks[i].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[i][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) for _, sub := range subs { assertReceive(t, sub, msg) @@ -1464,7 +1482,7 @@ func TestBlossomSubDirectPeers(t *testing.T) { // publish some messages for i := 0; i < 3; i++ { msg := []byte(fmt.Sprintf("message %d", i+3)) - bitmasks[i].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[i][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) for _, sub := range subs { assertReceive(t, sub, msg) @@ -1491,34 +1509,34 @@ func TestBlossomSubPeerFilter(t *testing.T) { connect(t, h[0], h[2]) // Join all peers - var subs []*Subscription - var bitmasks []*Bitmask + var subs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + bitmasks = append(bitmasks, b) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - subs = append(subs, sub...) + subs = append(subs, sub) } time.Sleep(time.Second) msg := []byte("message") - bitmasks[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[0][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) assertReceive(t, subs[0], msg) assertReceive(t, subs[1], msg) assertNeverReceives(t, subs[2], time.Second) msg = []byte("message2") - bitmasks[1].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[1][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) assertReceive(t, subs[0], msg) assertReceive(t, subs[1], msg) assertNeverReceives(t, subs[2], time.Second) @@ -1540,25 +1558,25 @@ func TestBlossomSubDirectPeersFanout(t *testing.T) { connect(t, h[0], h[2]) // Join all peers except h2 - var subs []*Subscription - var bitmasks []*Bitmask + var subs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs[:2] { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + bitmasks = append(bitmasks, b) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - subs = append(subs, sub...) + subs = append(subs, sub) } time.Sleep(time.Second) - b, err := psubs[2].Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := psubs[2].Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -1566,7 +1584,7 @@ func TestBlossomSubDirectPeersFanout(t *testing.T) { // h2 publishes some messages to build a fanout for i := 0; i < 3; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - b[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + b[0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) for _, sub := range subs { assertReceive(t, sub, msg) @@ -1577,7 +1595,7 @@ func TestBlossomSubDirectPeersFanout(t *testing.T) { result := make(chan bool, 2) psubs[2].eval <- func() { rt := psubs[2].rt.(*BlossomSubRouter) - fanout := rt.fanout[string([]byte{0x00, 0x00, 0x80, 0x00})] + fanout := rt.fanout[string([]byte{0x00, 0x00, 0x81, 0x00})] _, ok := fanout[h[0].ID()] result <- ok _, ok = fanout[h[1].ID()] @@ -1595,7 +1613,7 @@ func TestBlossomSubDirectPeersFanout(t *testing.T) { } // now subscribe h2 too and verify tht h0 is in the mesh but not h1 - _, err = psubs[2].Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + _, err = psubs[2].Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -1604,7 +1622,7 @@ func TestBlossomSubDirectPeersFanout(t *testing.T) { psubs[2].eval <- func() { rt := psubs[2].rt.(*BlossomSubRouter) - mesh := rt.mesh[string([]byte{0x00, 0x00, 0x80, 0x00})] + mesh := rt.mesh[string([]byte{0x00, 0x00, 0x81, 0x00})] _, ok := mesh[h[0].ID()] result <- ok _, ok = mesh[h[1].ID()] @@ -1637,20 +1655,20 @@ func TestBlossomSubFloodPublish(t *testing.T) { } // build the (partial, unstable) mesh - var subs []*Subscription - var bitmasks []*Bitmask + var subs [][]*Subscription + var bitmasks [][]*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - bitmasks = append(bitmasks, b...) - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + bitmasks = append(bitmasks, b) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - subs = append(subs, sub...) + subs = append(subs, sub) } time.Sleep(time.Second) @@ -1658,7 +1676,7 @@ func TestBlossomSubFloodPublish(t *testing.T) { // send a message from the star and assert it was received for i := 0; i < 20; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - bitmasks[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[0][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) for _, sub := range subs { assertReceive(t, sub, msg) @@ -1674,7 +1692,7 @@ func TestBlossomSubEnoughPeers(t *testing.T) { psubs := getBlossomSubs(ctx, hosts) for _, ps := range psubs { - _, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + _, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -1683,7 +1701,7 @@ func TestBlossomSubEnoughPeers(t *testing.T) { // at this point we have no connections and no mesh, so EnoughPeers should return false res := make(chan bool, 1) psubs[0].eval <- func() { - res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x80, 0x00}, 0) + res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x81, 0x00}, 0) } enough := <-res if enough { @@ -1696,7 +1714,7 @@ func TestBlossomSubEnoughPeers(t *testing.T) { time.Sleep(3 * time.Second) psubs[0].eval <- func() { - res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x80, 0x00}, 0) + res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x81, 0x00}, 0) } enough = <-res if !enough { @@ -1768,13 +1786,13 @@ func TestBlossomSubNegativeScore(t *testing.T) { var subs []*Subscription var bitmasks []*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } bitmasks = append(bitmasks, b...) - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -1785,7 +1803,7 @@ func TestBlossomSubNegativeScore(t *testing.T) { for i := 0; i < 20; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - bitmasks[i%20].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[i%20].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) time.Sleep(20 * time.Millisecond) } @@ -1841,7 +1859,7 @@ func TestBlossomSubScoreValidatorEx(t *testing.T) { DecayInterval: time.Second, DecayToZero: 0.01, Bitmasks: map[string]*BitmaskScoreParams{ - string([]byte{0x00, 0x00, 0x80, 0x00}): { + string([]byte{0x00, 0x00, 0x81, 0x00}): { BitmaskWeight: 1, TimeInMeshQuantum: time.Second, InvalidMessageDeliveriesWeight: -1, @@ -1857,7 +1875,7 @@ func TestBlossomSubScoreValidatorEx(t *testing.T) { connectAll(t, hosts) - err := psubs[0].RegisterBitmaskValidator([]byte{0x00, 0x00, 0x80, 0x00}, func(ctx context.Context, p peer.ID, msg *Message) ValidationResult { + err := psubs[0].RegisterBitmaskValidator([]byte{0x00, 0x00, 0x81, 0x00}, func(ctx context.Context, p peer.ID, msg *Message) ValidationResult { // we ignore host1 and reject host2 if p == hosts[1].ID() { return ValidationIgnore @@ -1872,17 +1890,17 @@ func TestBlossomSubScoreValidatorEx(t *testing.T) { t.Fatal(err) } - sub, err := psubs[0].Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + sub, err := psubs[0].Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - b1, err := psubs[1].Join([]byte{0x00, 0x00, 0x80, 0x00}) + b1, err := psubs[1].Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - b2, err := psubs[2].Join([]byte{0x00, 0x00, 0x80, 0x00}) + b2, err := psubs[2].Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -1899,8 +1917,8 @@ func TestBlossomSubScoreValidatorEx(t *testing.T) { } } - b1[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, []byte("i am not a walrus")) - b2[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, []byte("i am not a walrus either")) + b1[0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, []byte("i am not a walrus")) + b2[0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, []byte("i am not a walrus either")) // assert no messages expectNoMessage(sub[0]) @@ -1939,7 +1957,7 @@ func TestBlossomSubPiggybackControl(t *testing.T) { res := make(chan *RPC, 1) ps.eval <- func() { gs := ps.rt.(*BlossomSubRouter) - test1 := []byte{0x00, 0x80, 0x00, 0x00} + test1 := []byte{0x00, 0x81, 0x00, 0x00} test2 := []byte{0x00, 0x20, 0x00, 0x00} test3 := []byte{0x00, 0x00, 0x02, 0x00} gs.mesh[string(test1)] = make(map[peer.ID]struct{}) @@ -1961,7 +1979,7 @@ func TestBlossomSubPiggybackControl(t *testing.T) { if len(rpc.Control.Graft) != 1 { t.Fatal("expected 1 GRAFT") } - if !bytes.Equal(rpc.Control.Graft[0].GetBitmask(), []byte{0x00, 0x80, 0x00, 0x00}) { + if !bytes.Equal(rpc.Control.Graft[0].GetBitmask(), []byte{0x00, 0x81, 0x00, 0x00}) { t.Fatal("expected test1 as graft bitmask ID") } if len(rpc.Control.Prune) != 2 { @@ -1985,7 +2003,7 @@ func TestBlossomSubMultipleGraftBitmasks(t *testing.T) { time.Sleep(time.Second * 1) - firstBitmask := []byte{0x00, 0x80, 0x00, 0x00} + firstBitmask := []byte{0x00, 0x81, 0x00, 0x00} secondBitmask := []byte{0x00, 0x20, 0x00, 0x00} thirdBitmask := []byte{0x00, 0x00, 0x02, 0x00} @@ -2058,7 +2076,7 @@ func TestBlossomSubOpportunisticGrafting(t *testing.T) { DecayInterval: time.Second, DecayToZero: 0.01, Bitmasks: map[string]*BitmaskScoreParams{ - string([]byte{0x00, 0x00, 0x80, 0x00}): { + string([]byte{0x00, 0x00, 0x81, 0x00}): { BitmaskWeight: 1, TimeInMeshWeight: 0.0002777, TimeInMeshQuantum: time.Second, @@ -2099,13 +2117,13 @@ func TestBlossomSubOpportunisticGrafting(t *testing.T) { // ask the real pubsus to join the bitmask var bitmasks []*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } bitmasks = append(bitmasks, b...) - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -2123,7 +2141,7 @@ func TestBlossomSubOpportunisticGrafting(t *testing.T) { // publish a bunch of messages from the real hosts for i := 0; i < 1000; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - bitmasks[i%10].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[i%10].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) time.Sleep(20 * time.Millisecond) } @@ -2137,7 +2155,7 @@ func TestBlossomSubOpportunisticGrafting(t *testing.T) { gs := ps.rt.(*BlossomSubRouter) count := 0 for _, h := range hosts[:10] { - _, ok := gs.mesh[string([]byte{0x00, 0x00, 0x80, 0x00})][h.ID()] + _, ok := gs.mesh[string([]byte{0x00, 0x00, 0x81, 0x00})][h.ID()] if ok { count++ } @@ -2167,7 +2185,7 @@ func TestBlossomSubLeaveBitmask(t *testing.T) { // Join all peers var subs []*Subscription for _, ps := range psubs { - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -2181,9 +2199,9 @@ func TestBlossomSubLeaveBitmask(t *testing.T) { psubs[0].rt.(*BlossomSubRouter).p.eval <- func() { defer close(done) - psubs[0].rt.Leave([]byte{0x00, 0x00, 0x80, 0x00}) + psubs[0].rt.Leave([]byte{0x00, 0x00, 0x81, 0x00}) time.Sleep(time.Second) - peerMap := psubs[0].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x80, 0x00})] + peerMap := psubs[0].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x81, 0x00})] if len(peerMap) != 1 { t.Fatalf("No peer is populated in the backoff map for peer 0") } @@ -2205,7 +2223,7 @@ func TestBlossomSubLeaveBitmask(t *testing.T) { // for peer 0. psubs[1].rt.(*BlossomSubRouter).p.eval <- func() { defer close(done) - peerMap2 := psubs[1].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x80, 0x00})] + peerMap2 := psubs[1].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x81, 0x00})] if len(peerMap2) != 1 { t.Fatalf("No peer is populated in the backoff map for peer 1") } @@ -2243,12 +2261,12 @@ func TestBlossomSubJoinBitmask(t *testing.T) { peerMap := make(map[peer.ID]time.Time) peerMap[h[1].ID()] = time.Now().Add(router0.params.UnsubscribeBackoff) - router0.backoff[string([]byte{0x00, 0x00, 0x80, 0x00})] = peerMap + router0.backoff[string([]byte{0x00, 0x00, 0x81, 0x00})] = peerMap // Join all peers var subs []*Subscription for _, ps := range psubs { - sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -2258,7 +2276,7 @@ func TestBlossomSubJoinBitmask(t *testing.T) { time.Sleep(time.Second) router0.meshMx.RLock() - meshMap := router0.mesh[string([]byte{0x00, 0x00, 0x80, 0x00})] + meshMap := router0.mesh[string([]byte{0x00, 0x00, 0x81, 0x00})] router0.meshMx.RUnlock() if len(meshMap) != 1 { t.Fatalf("Unexpect peer included in the mesh") @@ -2287,7 +2305,7 @@ func (sq *sybilSquatter) handleStream(s network.Stream) { r := msgio.NewVarintReaderSize(s, DefaultMaxMessageSize) w := msgio.NewVarintWriter(os) truth := true - bitmask := []byte{0x00, 0x00, 0x80, 0x00} + bitmask := []byte{0x00, 0x00, 0x81, 0x00} msg := &pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: truth, Bitmask: bitmask}}} out, err := proto.Marshal(msg) if err != nil { @@ -2326,7 +2344,7 @@ func TestBlossomSubPeerScoreInspect(t *testing.T) { WithPeerScore( &PeerScoreParams{ Bitmasks: map[string]*BitmaskScoreParams{ - string([]byte{0x00, 0x00, 0x80, 0x00}): { + string([]byte{0x00, 0x00, 0x81, 0x00}): { BitmaskWeight: 1, TimeInMeshQuantum: time.Second, FirstMessageDeliveriesWeight: 1, @@ -2352,13 +2370,13 @@ func TestBlossomSubPeerScoreInspect(t *testing.T) { connect(t, hosts[0], hosts[1]) var bitmasks []*Bitmask for _, ps := range psubs { - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } bitmasks = append(bitmasks, b...) - _, err = ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + _, err = ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -2368,7 +2386,7 @@ func TestBlossomSubPeerScoreInspect(t *testing.T) { for i := 0; i < 20; i++ { msg := []byte(fmt.Sprintf("message %d", i)) - bitmasks[i%2].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + bitmasks[i%2].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) time.Sleep(20 * time.Millisecond) } @@ -2391,7 +2409,7 @@ func TestBlossomSubPeerScoreResetBitmaskParams(t *testing.T) { WithPeerScore( &PeerScoreParams{ Bitmasks: map[string]*BitmaskScoreParams{ - string([]byte{0x00, 0x00, 0x80, 0x00}): { + string([]byte{0x00, 0x00, 0x81, 0x00}): { BitmaskWeight: 1, TimeInMeshQuantum: time.Second, FirstMessageDeliveriesWeight: 1, @@ -2411,7 +2429,7 @@ func TestBlossomSubPeerScoreResetBitmaskParams(t *testing.T) { GraylistThreshold: -1000, })) - bitmask, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + bitmask, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -2462,11 +2480,11 @@ func TestBlossomSubRPCFragmentation(t *testing.T) { connect(t, hosts[0], hosts[1]) // have the real pubsub join the test bitmask - b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00}) + b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } - _, err = ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00}) + _, err = ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00}) if err != nil { t.Fatal(err) } @@ -2480,7 +2498,7 @@ func TestBlossomSubRPCFragmentation(t *testing.T) { for i := 0; i < nMessages; i++ { msg := make([]byte, msgSize) rand.Read(msg) - b[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg) + b[0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg) time.Sleep(20 * time.Millisecond) } @@ -2535,7 +2553,7 @@ func (iwe *iwantEverything) handleStream(s network.Stream) { r := msgio.NewVarintReaderSize(s, DefaultMaxMessageSize) w := msgio.NewVarintWriter(os) truth := true - bitmask := []byte{0x00, 0x00, 0x80, 0x00} + bitmask := []byte{0x00, 0x00, 0x81, 0x00} msg := &pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: truth, Bitmask: bitmask}}} out, err := proto.Marshal(msg) @@ -2610,7 +2628,7 @@ func (iwe *iwantEverything) handleStream(s network.Stream) { func TestFragmentRPCFunction(t *testing.T) { p := peer.ID("some-peer") - bitmask := []byte{0x00, 0x00, 0x80, 0x00} + bitmask := []byte{0x00, 0x00, 0x81, 0x00} rpc := &RPC{RPC: new(pb.RPC), from: p} limit := 1024 diff --git a/go-libp2p-blossomsub/validation_builtin_test.go b/go-libp2p-blossomsub/validation_builtin_test.go index 6e6ef15..3ebe11f 100644 --- a/go-libp2p-blossomsub/validation_builtin_test.go +++ b/go-libp2p-blossomsub/validation_builtin_test.go @@ -154,7 +154,7 @@ func TestBasicSeqnoValidatorReplay(t *testing.T) { } for _, sub := range msgs { - assertNeverReceives(t, sub, time.Second) + assertNeverReceives(t, []*Subscription{sub}, time.Second) } } diff --git a/go-libp2p-blossomsub/validation_test.go b/go-libp2p-blossomsub/validation_test.go index 46feb70..178911c 100644 --- a/go-libp2p-blossomsub/validation_test.go +++ b/go-libp2p-blossomsub/validation_test.go @@ -349,13 +349,13 @@ func TestValidateAssortedOptions(t *testing.T) { bitmasks1[i].Publish(ctx, bitmasks1[i].bitmask, msg) for _, sub := range subs1 { - assertReceive(t, sub, msg) + assertReceive(t, []*Subscription{sub}, msg) } msg = []byte(fmt.Sprintf("message2 %d", i)) bitmasks2[i].Publish(ctx, bitmasks2[i].bitmask, msg) for _, sub := range subs2 { - assertReceive(t, sub, msg) + assertReceive(t, []*Subscription{sub}, msg) } } }