diff --git a/net/backpressure/backpressure_test.go b/net/backpressure/backpressure_test.go index 448038909..7a92101cd 100644 --- a/net/backpressure/backpressure_test.go +++ b/net/backpressure/backpressure_test.go @@ -1,6 +1,9 @@ package backpressure_tests import ( + crand "crypto/rand" + "io" + "math/rand" "testing" "time" @@ -206,3 +209,174 @@ a problem. log.Info("handler backpressure works!") } } + +// TestStBackpressureStreamWrite tests whether streams see proper +// backpressure when writing data over the network streams. +func TestStBackpressureStreamWrite(t *testing.T) { + + // senderWrote signals that the sender wrote bytes to remote. + // the value is the count of bytes written. + senderWrote := make(chan int, 10000) + + // sender signals it's done (errored out) + senderDone := make(chan struct{}) + + // writeStats lets us listen to all the writes and return + // how many happened and how much was written + writeStats := func() (int, int) { + writes := 0 + bytes := 0 + for { + select { + case n := <-senderWrote: + writes++ + bytes = bytes + n + default: + log.Debugf("stats: sender wrote %d bytes, %d writes", bytes, writes) + return bytes, writes + } + } + } + + // sender attempts to write as fast as possible, signaling on the + // completion of every write. This makes it possible to see how + // fast it's actually writing. We pair this with a receiver + // that waits for a signal to read. + sender := func(s inet.Stream) { + defer func() { + s.Close() + senderDone <- struct{}{} + }() + + // ready a buffer of random data + buf := make([]byte, 65536) + crand.Read(buf) + + for { + // send a randomly sized subchunk + from := rand.Intn(len(buf) / 2) + to := rand.Intn(len(buf) / 2) + sendbuf := buf[from : from+to] + + n, err := s.Write(sendbuf) + if err != nil { + log.Debug("sender error. exiting:", err) + return + } + + log.Debugf("sender wrote %d bytes", n) + senderWrote <- n + } + } + + // receive a number of bytes from a stream. + // returns the number of bytes written. + receive := func(s inet.Stream, expect int) { + log.Debugf("receiver to read %d bytes", expect) + rbuf := make([]byte, expect) + n, err := io.ReadFull(s, rbuf) + if err != nil { + t.Error("read failed:", err) + } + if expect != n { + t.Error("read len differs: %d != %d", expect, n) + } + } + + // ok let's do it! + + // setup the networks + ctx := context.Background() + n1, err := GenNetwork(ctx) + if err != nil { + t.Fatal(err) + } + n2, err := GenNetwork(ctx) + if err != nil { + t.Fatal(err) + } + + // setup sender handler on 1 + n1.SetHandler(inet.ProtocolTesting, sender) + + log.Debugf("dialing %s", n2.ListenAddresses()) + if err := n1.DialPeer(ctx, n2.LocalPeer()); err != nil { + t.Fatalf("Failed to dial:", err) + } + + // open a stream, from 2->1, this is our reader + s, err := n2.NewStream(inet.ProtocolTesting, n1.LocalPeer()) + + // let's make sure r/w works. + testSenderWrote := func(bytesE int) { + bytesA, writesA := writeStats() + if bytesA != bytesE { + t.Errorf("numbers failed: %d =?= %d bytes, via %d writes", bytesA, bytesE, writesA) + } + } + + // 500ms rounds of lockstep write + drain + roundsStart := time.Now() + roundsTotal := 0 + for roundsTotal < (2 << 20) { + // let the sender fill its buffers, it will stop sending. + <-time.After(300 * time.Millisecond) + b, _ := writeStats() + testSenderWrote(0) + testSenderWrote(0) + + // drain it all, wait again + receive(s, b) + roundsTotal = roundsTotal + b + } + roundsTime := time.Now().Sub(roundsStart) + + // now read continously, while we measure stats. + stop := make(chan struct{}) + contStart := time.Now() + + go func() { + for { + select { + case <-stop: + return + default: + receive(s, 2<<15) + } + } + }() + + contTotal := 0 + for contTotal < (2 << 20) { + n := <-senderWrote + contTotal += n + } + stop <- struct{}{} + contTime := time.Now().Sub(contStart) + + if roundsTime < contTime { + t.Error("continuous should have been faster") + } + + if roundsTotal < contTotal { + t.Error("continuous should have been larger, too!") + } + + <-time.After(300 * time.Millisecond) + writeStats() + testSenderWrote(0) + testSenderWrote(0) + + // this doesn't work :(: + // // now for the sugar on top: let's tear down the receiver. it should + // // exit the sender. + // n1.Close() + // testSenderWrote(0) + // testSenderWrote(0) + // select { + // case <-time.After(2 * time.Second): + // t.Error("receiver shutdown failed to exit sender") + // case <-senderDone: + // log.Info("handler backpressure works!") + // } +}