mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-23 19:37:46 +08:00
net: sending backpressure test
This commit is contained in:
parent
ccc17d2740
commit
db7d7ae891
@ -1,6 +1,9 @@
|
||||
package backpressure_tests
|
||||
|
||||
import (
|
||||
crand "crypto/rand"
|
||||
"io"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -206,3 +209,174 @@ a problem.
|
||||
log.Info("handler backpressure works!")
|
||||
}
|
||||
}
|
||||
|
||||
// TestStBackpressureStreamWrite tests whether streams see proper
|
||||
// backpressure when writing data over the network streams.
|
||||
func TestStBackpressureStreamWrite(t *testing.T) {
|
||||
|
||||
// senderWrote signals that the sender wrote bytes to remote.
|
||||
// the value is the count of bytes written.
|
||||
senderWrote := make(chan int, 10000)
|
||||
|
||||
// sender signals it's done (errored out)
|
||||
senderDone := make(chan struct{})
|
||||
|
||||
// writeStats lets us listen to all the writes and return
|
||||
// how many happened and how much was written
|
||||
writeStats := func() (int, int) {
|
||||
writes := 0
|
||||
bytes := 0
|
||||
for {
|
||||
select {
|
||||
case n := <-senderWrote:
|
||||
writes++
|
||||
bytes = bytes + n
|
||||
default:
|
||||
log.Debugf("stats: sender wrote %d bytes, %d writes", bytes, writes)
|
||||
return bytes, writes
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sender attempts to write as fast as possible, signaling on the
|
||||
// completion of every write. This makes it possible to see how
|
||||
// fast it's actually writing. We pair this with a receiver
|
||||
// that waits for a signal to read.
|
||||
sender := func(s inet.Stream) {
|
||||
defer func() {
|
||||
s.Close()
|
||||
senderDone <- struct{}{}
|
||||
}()
|
||||
|
||||
// ready a buffer of random data
|
||||
buf := make([]byte, 65536)
|
||||
crand.Read(buf)
|
||||
|
||||
for {
|
||||
// send a randomly sized subchunk
|
||||
from := rand.Intn(len(buf) / 2)
|
||||
to := rand.Intn(len(buf) / 2)
|
||||
sendbuf := buf[from : from+to]
|
||||
|
||||
n, err := s.Write(sendbuf)
|
||||
if err != nil {
|
||||
log.Debug("sender error. exiting:", err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("sender wrote %d bytes", n)
|
||||
senderWrote <- n
|
||||
}
|
||||
}
|
||||
|
||||
// receive a number of bytes from a stream.
|
||||
// returns the number of bytes written.
|
||||
receive := func(s inet.Stream, expect int) {
|
||||
log.Debugf("receiver to read %d bytes", expect)
|
||||
rbuf := make([]byte, expect)
|
||||
n, err := io.ReadFull(s, rbuf)
|
||||
if err != nil {
|
||||
t.Error("read failed:", err)
|
||||
}
|
||||
if expect != n {
|
||||
t.Error("read len differs: %d != %d", expect, n)
|
||||
}
|
||||
}
|
||||
|
||||
// ok let's do it!
|
||||
|
||||
// setup the networks
|
||||
ctx := context.Background()
|
||||
n1, err := GenNetwork(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
n2, err := GenNetwork(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// setup sender handler on 1
|
||||
n1.SetHandler(inet.ProtocolTesting, sender)
|
||||
|
||||
log.Debugf("dialing %s", n2.ListenAddresses())
|
||||
if err := n1.DialPeer(ctx, n2.LocalPeer()); err != nil {
|
||||
t.Fatalf("Failed to dial:", err)
|
||||
}
|
||||
|
||||
// open a stream, from 2->1, this is our reader
|
||||
s, err := n2.NewStream(inet.ProtocolTesting, n1.LocalPeer())
|
||||
|
||||
// let's make sure r/w works.
|
||||
testSenderWrote := func(bytesE int) {
|
||||
bytesA, writesA := writeStats()
|
||||
if bytesA != bytesE {
|
||||
t.Errorf("numbers failed: %d =?= %d bytes, via %d writes", bytesA, bytesE, writesA)
|
||||
}
|
||||
}
|
||||
|
||||
// 500ms rounds of lockstep write + drain
|
||||
roundsStart := time.Now()
|
||||
roundsTotal := 0
|
||||
for roundsTotal < (2 << 20) {
|
||||
// let the sender fill its buffers, it will stop sending.
|
||||
<-time.After(300 * time.Millisecond)
|
||||
b, _ := writeStats()
|
||||
testSenderWrote(0)
|
||||
testSenderWrote(0)
|
||||
|
||||
// drain it all, wait again
|
||||
receive(s, b)
|
||||
roundsTotal = roundsTotal + b
|
||||
}
|
||||
roundsTime := time.Now().Sub(roundsStart)
|
||||
|
||||
// now read continously, while we measure stats.
|
||||
stop := make(chan struct{})
|
||||
contStart := time.Now()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
default:
|
||||
receive(s, 2<<15)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
contTotal := 0
|
||||
for contTotal < (2 << 20) {
|
||||
n := <-senderWrote
|
||||
contTotal += n
|
||||
}
|
||||
stop <- struct{}{}
|
||||
contTime := time.Now().Sub(contStart)
|
||||
|
||||
if roundsTime < contTime {
|
||||
t.Error("continuous should have been faster")
|
||||
}
|
||||
|
||||
if roundsTotal < contTotal {
|
||||
t.Error("continuous should have been larger, too!")
|
||||
}
|
||||
|
||||
<-time.After(300 * time.Millisecond)
|
||||
writeStats()
|
||||
testSenderWrote(0)
|
||||
testSenderWrote(0)
|
||||
|
||||
// this doesn't work :(:
|
||||
// // now for the sugar on top: let's tear down the receiver. it should
|
||||
// // exit the sender.
|
||||
// n1.Close()
|
||||
// testSenderWrote(0)
|
||||
// testSenderWrote(0)
|
||||
// select {
|
||||
// case <-time.After(2 * time.Second):
|
||||
// t.Error("receiver shutdown failed to exit sender")
|
||||
// case <-senderDone:
|
||||
// log.Info("handler backpressure works!")
|
||||
// }
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user