diff --git a/go-libp2p-blossomsub/blossomsub.go b/go-libp2p-blossomsub/blossomsub.go index 5fd4ba9..2ad0770 100644 --- a/go-libp2p-blossomsub/blossomsub.go +++ b/go-libp2p-blossomsub/blossomsub.go @@ -693,62 +693,31 @@ func (bs *BlossomSubRouter) RemovePeer(p peer.ID) { } func (bs *BlossomSubRouter) EnoughPeers(bitmask []byte, suggested int) bool { - sliced := SliceBitmask(bitmask) - // bloom: - if len(sliced) != 1 { - // check all peers in the bitmask - peers := bs.p.getPeersInBitmask(bitmask) - if len(peers) == 0 { - return false - } - - fsPeers, bsPeers := 0, 0 - - for _, p := range peers { - if !bs.feature(BlossomSubFeatureMesh, bs.peers[p]) { - fsPeers++ - } - } - - // BlossomSub peers - bsPeers = len(peers) - - if suggested == 0 { - suggested = bs.params.Dlo - } - - if fsPeers+bsPeers >= suggested || bsPeers >= bs.params.Dhi { - return true - } - - return false - } else { // classic gossip - tmap, ok := bs.p.bitmasks[string(bitmask)] - if !ok { - return false - } - - fsPeers, bsPeers := 0, 0 - // floodsub peers - for p := range tmap { - if !bs.feature(BlossomSubFeatureMesh, bs.peers[p]) { - fsPeers++ - } - } - - // BlossomSub peers - bsPeers = len(bs.mesh[string(bitmask)]) - - if suggested == 0 { - suggested = bs.params.Dlo - } - - if fsPeers+bsPeers >= suggested || bsPeers >= bs.params.Dhi { - return true - } - + tmap, ok := bs.p.bitmasks[string(bitmask)] + if !ok { return false } + + fsPeers, bsPeers := 0, 0 + // floodsub peers + for p := range tmap { + if !bs.feature(BlossomSubFeatureMesh, bs.peers[p]) { + fsPeers++ + } + } + + // BlossomSub peers + bsPeers = len(bs.mesh[string(bitmask)]) + + if suggested == 0 { + suggested = bs.params.Dlo + } + + if fsPeers+bsPeers >= suggested || bsPeers >= bs.params.Dhi { + return true + } + + return false } func (bs *BlossomSubRouter) PeerScore(p peer.ID) float64 { @@ -1052,31 +1021,15 @@ func (bs *BlossomSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) { for _, prune := range ctl.GetPrune() { bitmask := prune.GetBitmask() - sliced := SliceBitmask(bitmask) - // bloom publish: - if len(sliced) != 1 { - // any peers in all slices of the bitmask? - peers := bs.p.getPeersInBitmask(bitmask) - if len(peers) == 0 { - return - } + peers, ok := bs.mesh[string(bitmask)] + if !ok { + continue + } - for _, p := range peers { - log.Debugf("PRUNE: Remove mesh link to %s in %s", p, bitmask) - bs.tracer.Prune(p, bitmask) - delete(bs.p.bitmasks[string(bitmask)], p) - } - } else { // classic gossip mesh - peers, ok := bs.mesh[string(bitmask)] - if !ok { - continue - } - - if _, inMesh := peers[p]; inMesh { - log.Debugf("PRUNE: Remove mesh link to %s in %s", p, bitmask) - bs.tracer.Prune(p, bitmask) - delete(peers, p) - } + if _, inMesh := peers[p]; inMesh { + log.Debugf("PRUNE: Remove mesh link to %s in %s", p, bitmask) + bs.tracer.Prune(p, bitmask) + delete(peers, p) } // is there a backoff specified by the peer? if so obey it. @@ -1374,56 +1327,24 @@ func (bs *BlossomSubRouter) Join(bitmask []byte) { } func (bs *BlossomSubRouter) Leave(bitmask []byte) { - sliced := SliceBitmask(bitmask) - // bloom publish: - if len(sliced) != 1 { - // any peers in all slices of the bitmask? - peers := bs.p.getPeersInBitmask(bitmask) - if len(peers) == 0 { - return - } + gmap, ok := bs.mesh[string(bitmask)] + if !ok { + return + } - for _, s := range sliced { - _, ok := bs.mesh[string(s)] - if !ok { - continue - } + log.Debugf("LEAVE %s", bitmask) + bs.tracer.Leave(bitmask) - log.Debugf("LEAVE %s", bitmask) - bs.tracer.Leave(bitmask) + delete(bs.mesh, string(bitmask)) - delete(bs.mesh, string(bitmask)) - } - - for _, p := range peers { - log.Debugf("LEAVE: Remove mesh link to %s in %s", p, bitmask) - bs.tracer.Prune(p, bitmask) - bs.sendPrune(p, bitmask, true) - // Add a backoff to this peer to prevent us from eagerly - // re-grafting this peer into our mesh if we rejoin this - // bitmask before the backoff period ends. - bs.addBackoff(p, bitmask, true) - } - } else { // classic gossip mesh - gmap, ok := bs.mesh[string(bitmask)] - if !ok { - return - } - - log.Debugf("LEAVE %s", bitmask) - bs.tracer.Leave(bitmask) - - delete(bs.mesh, string(bitmask)) - - for p := range gmap { - log.Debugf("LEAVE: Remove mesh link to %s in %s", p, bitmask) - bs.tracer.Prune(p, bitmask) - bs.sendPrune(p, bitmask, true) - // Add a backoff to this peer to prevent us from eagerly - // re-grafting this peer into our mesh if we rejoin this - // bitmask before the backoff period ends. - bs.addBackoff(p, bitmask, true) - } + for p := range gmap { + log.Debugf("LEAVE: Remove mesh link to %s in %s", p, bitmask) + bs.tracer.Prune(p, bitmask) + bs.sendPrune(p, bitmask, true) + // Add a backoff to this peer to prevent us from eagerly + // re-grafting this peer into our mesh if we rejoin this + // bitmask before the backoff period ends. + bs.addBackoff(p, bitmask, true) } } diff --git a/go-libp2p-blossomsub/blossomsub_test.go b/go-libp2p-blossomsub/blossomsub_test.go index 10147e9..0df484d 100644 --- a/go-libp2p-blossomsub/blossomsub_test.go +++ b/go-libp2p-blossomsub/blossomsub_test.go @@ -1700,9 +1700,10 @@ func TestBlossomSubEnoughPeers(t *testing.T) { } // at this point we have no connections and no mesh, so EnoughPeers should return false + // NOTE: EnoughPeers operates with bloom filters, so we need to check for individual filters. res := make(chan bool, 1) psubs[0].eval <- func() { - res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x81, 0x00}, 0) + res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x80, 0x00}, 0) } enough := <-res if enough { @@ -1715,7 +1716,7 @@ func TestBlossomSubEnoughPeers(t *testing.T) { time.Sleep(5 * time.Second) psubs[0].eval <- func() { - res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x81, 0x00}, 0) + res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x80, 0x00}, 0) } enough = <-res if !enough { @@ -2199,11 +2200,12 @@ func TestBlossomSubLeaveBitmask(t *testing.T) { leaveTime := time.Now() done := make(chan struct{}) + // NOTE: Leave operates with bloom filters, so we need to check for individual filters. psubs[0].rt.(*BlossomSubRouter).p.eval <- func() { defer close(done) - psubs[0].rt.Leave([]byte{0x00, 0x00, 0x81, 0x00}) + psubs[0].rt.Leave([]byte{0x00, 0x00, 0x80, 0x00}) time.Sleep(time.Second) - peerMap := psubs[0].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x81, 0x00})] + peerMap := psubs[0].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x80, 0x00})] if len(peerMap) != 1 { t.Fatalf("No peer is populated in the backoff map for peer 0") } @@ -2225,7 +2227,7 @@ func TestBlossomSubLeaveBitmask(t *testing.T) { // for peer 0. psubs[1].rt.(*BlossomSubRouter).p.eval <- func() { defer close(done) - peerMap2 := psubs[1].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x81, 0x00})] + peerMap2 := psubs[1].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x80, 0x00})] if len(peerMap2) != 1 { t.Fatalf("No peer is populated in the backoff map for peer 1") }