Browse Source

Use fasthttp.PipeConns instead of io.Pipe in StreamReader

This improves StreamReader performance by more than 2x.
Aliaksandr Valialkin 5 years ago
parent
commit
ce9d1d2224
3 changed files with 83 additions and 8 deletions
  1. 5 1
      stream.go
  2. 8 7
      stream_test.go
  3. 70 0
      stream_timing_test.go

+ 5 - 1
stream.go

@@ -5,6 +5,8 @@ import (
 	"io"
 	"runtime/debug"
 	"sync"
+
+	"github.com/valyala/fasthttp/fasthttputil"
 )
 
 // StreamWriter must write data to w.
@@ -26,7 +28,9 @@ type StreamWriter func(w *bufio.Writer)
 //
 // See also Response.SetBodyStreamWriter.
 func NewStreamReader(sw StreamWriter) io.ReadCloser {
-	pr, pw := io.Pipe()
+	pc := fasthttputil.NewPipeConns()
+	pw := pc.Conn1()
+	pr := pc.Conn2()
 
 	var bw *bufio.Writer
 	v := streamWriterBufPool.Get()

+ 8 - 7
stream_test.go

@@ -39,18 +39,16 @@ func TestNewStreamReader(t *testing.T) {
 
 func TestStreamReaderClose(t *testing.T) {
 	firstLine := "the first line must pass"
-	ch := make(chan struct{})
+	ch := make(chan error, 1)
 	r := NewStreamReader(func(w *bufio.Writer) {
 		fmt.Fprintf(w, "%s", firstLine)
 		if err := w.Flush(); err != nil {
-			t.Fatalf("unexpected error: %s", err)
+			ch <- fmt.Errorf("unexpected error on first flush: %s", err)
+			return
 		}
 
 		fmt.Fprintf(w, "the second line must fail")
-		if err := w.Flush(); err == nil {
-			t.Fatalf("expecting error")
-		}
-		close(ch)
+		ch <- nil
 	})
 
 	result := firstLine + "the"
@@ -71,7 +69,10 @@ func TestStreamReaderClose(t *testing.T) {
 	}
 
 	select {
-	case <-ch:
+	case err := <-ch:
+		if err != nil {
+			t.Fatalf("error returned from stream reader: %s", err)
+		}
 	case <-time.After(time.Second):
 		t.Fatalf("timeout")
 	}

+ 70 - 0
stream_timing_test.go

@@ -0,0 +1,70 @@
+package fasthttp
+
+import (
+	"bufio"
+	"io"
+	"testing"
+	"time"
+)
+
+func BenchmarkStreamReader1(b *testing.B) {
+	benchmarkStreamReader(b, 1)
+}
+
+func BenchmarkStreamReader10(b *testing.B) {
+	benchmarkStreamReader(b, 10)
+}
+
+func BenchmarkStreamReader100(b *testing.B) {
+	benchmarkStreamReader(b, 100)
+}
+
+func BenchmarkStreamReader1K(b *testing.B) {
+	benchmarkStreamReader(b, 1000)
+}
+
+func BenchmarkStreamReader10K(b *testing.B) {
+	benchmarkStreamReader(b, 10000)
+}
+
+func benchmarkStreamReader(b *testing.B, size int) {
+	src := createFixedBody(size)
+	b.SetBytes(int64(size))
+
+	b.RunParallel(func(pb *testing.PB) {
+		dst := make([]byte, size)
+		ch := make(chan error, 1)
+		sr := NewStreamReader(func(w *bufio.Writer) {
+			for pb.Next() {
+				if _, err := w.Write(src); err != nil {
+					ch <- err
+					return
+				}
+				if err := w.Flush(); err != nil {
+					ch <- err
+					return
+				}
+			}
+			ch <- nil
+		})
+		for {
+			if _, err := sr.Read(dst); err != nil {
+				if err == io.EOF {
+					break
+				}
+				b.Fatalf("unexpected error when reading from stream reader: %s", err)
+			}
+		}
+		if err := sr.Close(); err != nil {
+			b.Fatalf("unexpected error when closing stream reader: %s", err)
+		}
+		select {
+		case err := <-ch:
+			if err != nil {
+				b.Fatalf("unexpected error from stream reader: %s", err)
+			}
+		case <-time.After(time.Second):
+			b.Fatalf("timeout")
+		}
+	})
+}