client_test.go 60 KB


  1. package fasthttp
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/tls"
  6. "fmt"
  7. "io"
  8. "net"
  9. "net/url"
  10. "os"
  11. "regexp"
  12. "runtime"
  13. "strings"
  14. "sync"
  15. "sync/atomic"
  16. "testing"
  17. "time"
  18. "github.com/valyala/fasthttp/fasthttputil"
  19. )
  20. func TestCloseIdleConnections(t *testing.T) {
  21. t.Parallel()
  22. ln := fasthttputil.NewInmemoryListener()
  23. s := &Server{
  24. Handler: func(ctx *RequestCtx) {
  25. },
  26. }
  27. go func() {
  28. if err := s.Serve(ln); err != nil {
  29. t.Error(err)
  30. }
  31. }()
  32. c := &Client{
  33. Dial: func(addr string) (net.Conn, error) {
  34. return ln.Dial()
  35. },
  36. }
  37. if _, _, err := c.Get(nil, "http://google.com"); err != nil {
  38. t.Fatal(err)
  39. }
  40. connsLen := func() int {
  41. c.mLock.Lock()
  42. defer c.mLock.Unlock()
  43. if _, ok := c.m["google.com"]; !ok {
  44. return 0
  45. }
  46. c.m["google.com"].connsLock.Lock()
  47. defer c.m["google.com"].connsLock.Unlock()
  48. return len(c.m["google.com"].conns)
  49. }
  50. if conns := connsLen(); conns > 1 {
  51. t.Errorf("expected 1 conns got %d", conns)
  52. }
  53. c.CloseIdleConnections()
  54. if conns := connsLen(); conns > 0 {
  55. t.Errorf("expected 0 conns got %d", conns)
  56. }
  57. }
  58. func TestPipelineClientSetUserAgent(t *testing.T) {
  59. t.Parallel()
  60. testPipelineClientSetUserAgent(t, 0)
  61. }
  62. func TestPipelineClientSetUserAgentTimeout(t *testing.T) {
  63. t.Parallel()
  64. testPipelineClientSetUserAgent(t, time.Second)
  65. }
  66. func testPipelineClientSetUserAgent(t *testing.T, timeout time.Duration) {
  67. ln := fasthttputil.NewInmemoryListener()
  68. userAgentSeen := ""
  69. s := &Server{
  70. Handler: func(ctx *RequestCtx) {
  71. userAgentSeen = string(ctx.UserAgent())
  72. },
  73. }
  74. go s.Serve(ln) //nolint:errcheck
  75. userAgent := "I'm not fasthttp"
  76. c := &HostClient{
  77. Name: userAgent,
  78. Dial: func(addr string) (net.Conn, error) {
  79. return ln.Dial()
  80. },
  81. }
  82. req := AcquireRequest()
  83. res := AcquireResponse()
  84. req.SetRequestURI("http://example.com")
  85. var err error
  86. if timeout <= 0 {
  87. err = c.Do(req, res)
  88. } else {
  89. err = c.DoTimeout(req, res, timeout)
  90. }
  91. if err != nil {
  92. t.Fatal(err)
  93. }
  94. if userAgentSeen != userAgent {
  95. t.Fatalf("User-Agent defers %q != %q", userAgentSeen, userAgent)
  96. }
  97. }
  98. func TestPipelineClientIssue832(t *testing.T) {
  99. t.Parallel()
  100. ln := fasthttputil.NewInmemoryListener()
  101. req := AcquireRequest()
  102. // Don't defer ReleaseRequest as we use it in a goroutine that might not be done at the end.
  103. req.SetHost("example.com")
  104. res := AcquireResponse()
  105. // Don't defer ReleaseResponse as we use it in a goroutine that might not be done at the end.
  106. client := PipelineClient{
  107. Dial: func(addr string) (net.Conn, error) {
  108. return ln.Dial()
  109. },
  110. ReadTimeout: time.Millisecond * 10,
  111. Logger: &testLogger{}, // Ignore log output.
  112. }
  113. attempts := 10
  114. go func() {
  115. for i := 0; i < attempts; i++ {
  116. c, err := ln.Accept()
  117. if err != nil {
  118. t.Error(err)
  119. }
  120. if c != nil {
  121. go func() {
  122. time.Sleep(time.Millisecond * 50)
  123. c.Close()
  124. }()
  125. }
  126. }
  127. }()
  128. done := make(chan int)
  129. go func() {
  130. defer close(done)
  131. for i := 0; i < attempts; i++ {
  132. if err := client.Do(req, res); err == nil {
  133. t.Error("error expected")
  134. }
  135. }
  136. }()
  137. select {
  138. case <-time.After(time.Second * 2):
  139. t.Fatal("PipelineClient did not restart worker")
  140. case <-done:
  141. }
  142. }
  143. func TestClientInvalidURI(t *testing.T) {
  144. t.Parallel()
  145. ln := fasthttputil.NewInmemoryListener()
  146. requests := int64(0)
  147. s := &Server{
  148. Handler: func(ctx *RequestCtx) {
  149. atomic.AddInt64(&requests, 1)
  150. },
  151. }
  152. go s.Serve(ln) //nolint:errcheck
  153. c := &Client{
  154. Dial: func(addr string) (net.Conn, error) {
  155. return ln.Dial()
  156. },
  157. }
  158. req, res := AcquireRequest(), AcquireResponse()
  159. defer func() {
  160. ReleaseRequest(req)
  161. ReleaseResponse(res)
  162. }()
  163. req.Header.SetMethod(MethodGet)
  164. req.SetRequestURI("http://example.com\r\n\r\nGET /\r\n\r\n")
  165. err := c.Do(req, res)
  166. if err == nil {
  167. t.Fatal("expected error (missing required Host header in request)")
  168. }
  169. if n := atomic.LoadInt64(&requests); n != 0 {
  170. t.Fatalf("0 requests expected, got %d", n)
  171. }
  172. }
  173. func TestClientGetWithBody(t *testing.T) {
  174. t.Parallel()
  175. ln := fasthttputil.NewInmemoryListener()
  176. s := &Server{
  177. Handler: func(ctx *RequestCtx) {
  178. body := ctx.Request.Body()
  179. ctx.Write(body) //nolint:errcheck
  180. },
  181. }
  182. go s.Serve(ln) //nolint:errcheck
  183. c := &Client{
  184. Dial: func(addr string) (net.Conn, error) {
  185. return ln.Dial()
  186. },
  187. }
  188. req, res := AcquireRequest(), AcquireResponse()
  189. defer func() {
  190. ReleaseRequest(req)
  191. ReleaseResponse(res)
  192. }()
  193. req.Header.SetMethod(MethodGet)
  194. req.SetRequestURI("http://example.com")
  195. req.SetBodyString("test")
  196. err := c.Do(req, res)
  197. if err != nil {
  198. t.Fatal(err)
  199. }
  200. if len(res.Body()) == 0 {
  201. t.Fatal("missing request body")
  202. }
  203. }
  204. func TestClientURLAuth(t *testing.T) {
  205. t.Parallel()
  206. cases := map[string]string{
  207. "user:[email protected]": "Basic dXNlcjpwYXNz",
  208. "foo:@": "Basic Zm9vOg==",
  209. ":@": "",
  210. "@": "",
  211. "": "",
  212. }
  213. ch := make(chan string, 1)
  214. ln := fasthttputil.NewInmemoryListener()
  215. s := &Server{
  216. Handler: func(ctx *RequestCtx) {
  217. ch <- string(ctx.Request.Header.Peek(HeaderAuthorization))
  218. },
  219. }
  220. go s.Serve(ln) //nolint:errcheck
  221. c := &Client{
  222. Dial: func(addr string) (net.Conn, error) {
  223. return ln.Dial()
  224. },
  225. }
  226. for up, expected := range cases {
  227. req := AcquireRequest()
  228. req.Header.SetMethod(MethodGet)
  229. req.SetRequestURI("http://" + up + "example.com/foo/bar")
  230. if err := c.Do(req, nil); err != nil {
  231. t.Fatal(err)
  232. }
  233. val := <-ch
  234. if val != expected {
  235. t.Fatalf("wrong %s header: %s expected %s", HeaderAuthorization, val, expected)
  236. }
  237. }
  238. }
  239. func TestClientNilResp(t *testing.T) {
  240. t.Parallel()
  241. ln := fasthttputil.NewInmemoryListener()
  242. s := &Server{
  243. Handler: func(ctx *RequestCtx) {
  244. },
  245. }
  246. go s.Serve(ln) //nolint:errcheck
  247. c := &Client{
  248. Dial: func(addr string) (net.Conn, error) {
  249. return ln.Dial()
  250. },
  251. }
  252. req := AcquireRequest()
  253. req.Header.SetMethod(MethodGet)
  254. req.SetRequestURI("http://example.com")
  255. if err := c.Do(req, nil); err != nil {
  256. t.Fatal(err)
  257. }
  258. if err := c.DoTimeout(req, nil, time.Second); err != nil {
  259. t.Fatal(err)
  260. }
  261. ln.Close()
  262. }
  263. func TestPipelineClientNilResp(t *testing.T) {
  264. t.Parallel()
  265. ln := fasthttputil.NewInmemoryListener()
  266. s := &Server{
  267. Handler: func(ctx *RequestCtx) {
  268. },
  269. }
  270. go s.Serve(ln) //nolint:errcheck
  271. c := &PipelineClient{
  272. Dial: func(addr string) (net.Conn, error) {
  273. return ln.Dial()
  274. },
  275. }
  276. req := AcquireRequest()
  277. req.Header.SetMethod(MethodGet)
  278. req.SetRequestURI("http://example.com")
  279. if err := c.Do(req, nil); err != nil {
  280. t.Fatal(err)
  281. }
  282. if err := c.DoTimeout(req, nil, time.Second); err != nil {
  283. t.Fatal(err)
  284. }
  285. if err := c.DoDeadline(req, nil, time.Now().Add(time.Second)); err != nil {
  286. t.Fatal(err)
  287. }
  288. }
  289. func TestClientParseConn(t *testing.T) {
  290. t.Parallel()
  291. network := "tcp"
  292. ln, _ := net.Listen(network, "127.0.0.1:0")
  293. s := &Server{
  294. Handler: func(ctx *RequestCtx) {
  295. },
  296. }
  297. go s.Serve(ln) //nolint:errcheck
  298. host := ln.Addr().String()
  299. c := &Client{}
  300. req, res := AcquireRequest(), AcquireResponse()
  301. defer func() {
  302. ReleaseRequest(req)
  303. ReleaseResponse(res)
  304. }()
  305. req.SetRequestURI("http://" + host + "")
  306. if err := c.Do(req, res); err != nil {
  307. t.Fatal(err)
  308. }
  309. if res.RemoteAddr().Network() != network {
  310. t.Fatalf("req RemoteAddr parse network fail: %s, hope: %s", res.RemoteAddr().Network(), network)
  311. }
  312. if host != res.RemoteAddr().String() {
  313. t.Fatalf("req RemoteAddr parse addr fail: %s, hope: %s", res.RemoteAddr().String(), host)
  314. }
  315. if !regexp.MustCompile(`^127\.0\.0\.1:[0-9]{4,5}$`).MatchString(res.LocalAddr().String()) {
  316. t.Fatalf("res LocalAddr addr match fail: %s, hope match: %s", res.LocalAddr().String(), "^127.0.0.1:[0-9]{4,5}$")
  317. }
  318. }
  319. func TestClientPostArgs(t *testing.T) {
  320. t.Parallel()
  321. ln := fasthttputil.NewInmemoryListener()
  322. s := &Server{
  323. Handler: func(ctx *RequestCtx) {
  324. body := ctx.Request.Body()
  325. if len(body) == 0 {
  326. return
  327. }
  328. ctx.Write(body) //nolint:errcheck
  329. },
  330. }
  331. go s.Serve(ln) //nolint:errcheck
  332. c := &Client{
  333. Dial: func(addr string) (net.Conn, error) {
  334. return ln.Dial()
  335. },
  336. }
  337. req, res := AcquireRequest(), AcquireResponse()
  338. defer func() {
  339. ReleaseRequest(req)
  340. ReleaseResponse(res)
  341. }()
  342. args := req.PostArgs()
  343. args.Add("addhttp2", "support")
  344. args.Add("fast", "http")
  345. req.Header.SetMethod(MethodPost)
  346. req.SetRequestURI("http://make.fasthttp.great?again")
  347. err := c.Do(req, res)
  348. if err != nil {
  349. t.Fatal(err)
  350. }
  351. if len(res.Body()) == 0 {
  352. t.Fatal("cannot set args as body")
  353. }
  354. }
  355. func TestClientRedirectSameSchema(t *testing.T) {
  356. t.Parallel()
  357. listenHTTPS1 := testClientRedirectListener(t, true)
  358. defer listenHTTPS1.Close()
  359. listenHTTPS2 := testClientRedirectListener(t, true)
  360. defer listenHTTPS2.Close()
  361. sHTTPS1 := testClientRedirectChangingSchemaServer(t, listenHTTPS1, listenHTTPS1, true)
  362. defer sHTTPS1.Stop()
  363. sHTTPS2 := testClientRedirectChangingSchemaServer(t, listenHTTPS2, listenHTTPS2, false)
  364. defer sHTTPS2.Stop()
  365. destURL := fmt.Sprintf("https://%s/baz", listenHTTPS1.Addr().String())
  366. urlParsed, err := url.Parse(destURL)
  367. if err != nil {
  368. t.Fatal(err)
  369. return
  370. }
  371. reqClient := &HostClient{
  372. IsTLS: true,
  373. Addr: urlParsed.Host,
  374. TLSConfig: &tls.Config{
  375. InsecureSkipVerify: true,
  376. },
  377. }
  378. statusCode, _, err := reqClient.GetTimeout(nil, destURL, 4000*time.Millisecond)
  379. if err != nil {
  380. t.Fatalf("HostClient error: %s", err)
  381. return
  382. }
  383. if statusCode != 200 {
  384. t.Fatalf("HostClient error code response %d", statusCode)
  385. return
  386. }
  387. }
  388. func TestClientRedirectClientChangingSchemaHttp2Https(t *testing.T) {
  389. t.Parallel()
  390. listenHTTPS := testClientRedirectListener(t, true)
  391. defer listenHTTPS.Close()
  392. listenHTTP := testClientRedirectListener(t, false)
  393. defer listenHTTP.Close()
  394. sHTTPS := testClientRedirectChangingSchemaServer(t, listenHTTPS, listenHTTP, true)
  395. defer sHTTPS.Stop()
  396. sHTTP := testClientRedirectChangingSchemaServer(t, listenHTTPS, listenHTTP, false)
  397. defer sHTTP.Stop()
  398. destURL := fmt.Sprintf("http://%s/baz", listenHTTP.Addr().String())
  399. reqClient := &Client{
  400. TLSConfig: &tls.Config{
  401. InsecureSkipVerify: true,
  402. },
  403. }
  404. statusCode, _, err := reqClient.GetTimeout(nil, destURL, 4000*time.Millisecond)
  405. if err != nil {
  406. t.Fatalf("HostClient error: %s", err)
  407. return
  408. }
  409. if statusCode != 200 {
  410. t.Fatalf("HostClient error code response %d", statusCode)
  411. return
  412. }
  413. }
  414. func TestClientRedirectHostClientChangingSchemaHttp2Https(t *testing.T) {
  415. t.Parallel()
  416. listenHTTPS := testClientRedirectListener(t, true)
  417. defer listenHTTPS.Close()
  418. listenHTTP := testClientRedirectListener(t, false)
  419. defer listenHTTP.Close()
  420. sHTTPS := testClientRedirectChangingSchemaServer(t, listenHTTPS, listenHTTP, true)
  421. defer sHTTPS.Stop()
  422. sHTTP := testClientRedirectChangingSchemaServer(t, listenHTTPS, listenHTTP, false)
  423. defer sHTTP.Stop()
  424. destURL := fmt.Sprintf("http://%s/baz", listenHTTP.Addr().String())
  425. urlParsed, err := url.Parse(destURL)
  426. if err != nil {
  427. t.Fatal(err)
  428. return
  429. }
  430. reqClient := &HostClient{
  431. Addr: urlParsed.Host,
  432. TLSConfig: &tls.Config{
  433. InsecureSkipVerify: true,
  434. },
  435. }
  436. _, _, err = reqClient.GetTimeout(nil, destURL, 4000*time.Millisecond)
  437. if err != ErrHostClientRedirectToDifferentScheme {
  438. t.Fatal("expected HostClient error")
  439. }
  440. }
  441. func testClientRedirectListener(t *testing.T, isTLS bool) net.Listener {
  442. var ln net.Listener
  443. var err error
  444. var tlsConfig *tls.Config
  445. if isTLS {
  446. certData, keyData, kerr := GenerateTestCertificate("localhost")
  447. if kerr != nil {
  448. t.Fatal(kerr)
  449. }
  450. cert, kerr := tls.X509KeyPair(certData, keyData)
  451. if kerr != nil {
  452. t.Fatal(kerr)
  453. }
  454. tlsConfig = &tls.Config{
  455. Certificates: []tls.Certificate{cert},
  456. }
  457. ln, err = tls.Listen("tcp", "localhost:0", tlsConfig)
  458. } else {
  459. ln, err = net.Listen("tcp", "localhost:0")
  460. }
  461. if err != nil {
  462. t.Fatalf("cannot listen isTLS %v: %s", isTLS, err)
  463. }
  464. return ln
  465. }
  466. func testClientRedirectChangingSchemaServer(t *testing.T, https, http net.Listener, isTLS bool) *testEchoServer {
  467. s := &Server{
  468. Handler: func(ctx *RequestCtx) {
  469. if ctx.IsTLS() {
  470. ctx.SetStatusCode(200)
  471. } else {
  472. ctx.Redirect(fmt.Sprintf("https://%s/baz", https.Addr().String()), 301)
  473. }
  474. },
  475. }
  476. var ln net.Listener
  477. if isTLS {
  478. ln = https
  479. } else {
  480. ln = http
  481. }
  482. ch := make(chan struct{})
  483. go func() {
  484. err := s.Serve(ln)
  485. if err != nil {
  486. t.Errorf("unexpected error returned from Serve(): %s", err)
  487. }
  488. close(ch)
  489. }()
  490. return &testEchoServer{
  491. s: s,
  492. ln: ln,
  493. ch: ch,
  494. t: t,
  495. }
  496. }
  497. func TestClientHeaderCase(t *testing.T) {
  498. t.Parallel()
  499. ln := fasthttputil.NewInmemoryListener()
  500. defer ln.Close()
  501. go func() {
  502. c, err := ln.Accept()
  503. if err != nil {
  504. t.Error(err)
  505. }
  506. c.Write([]byte("HTTP/1.1 200 OK\r\n" + //nolint:errcheck
  507. "content-type: text/plain\r\n" +
  508. "transfer-encoding: chunked\r\n\r\n" +
  509. "24\r\nThis is the data in the first chunk \r\n" +
  510. "1B\r\nand this is the second one \r\n" +
  511. "0\r\n\r\n",
  512. ))
  513. }()
  514. c := &Client{
  515. Dial: func(addr string) (net.Conn, error) {
  516. return ln.Dial()
  517. },
  518. ReadTimeout: time.Millisecond * 10,
  519. // Even without name normalizing we should parse headers correctly.
  520. DisableHeaderNamesNormalizing: true,
  521. }
  522. code, body, err := c.Get(nil, "http://example.com")
  523. if err != nil {
  524. t.Error(err)
  525. } else if code != 200 {
  526. t.Errorf("expected status code 200 got %d", code)
  527. } else if string(body) != "This is the data in the first chunk and this is the second one " {
  528. t.Errorf("wrong body: %q", body)
  529. }
  530. }
  531. func TestClientReadTimeout(t *testing.T) {
  532. if runtime.GOOS == "windows" {
  533. t.SkipNow()
  534. }
  535. t.Parallel()
  536. ln := fasthttputil.NewInmemoryListener()
  537. timeout := false
  538. s := &Server{
  539. Handler: func(ctx *RequestCtx) {
  540. if timeout {
  541. time.Sleep(time.Second)
  542. } else {
  543. timeout = true
  544. }
  545. },
  546. Logger: &testLogger{}, // Don't print closed pipe errors.
  547. }
  548. go s.Serve(ln) //nolint:errcheck
  549. c := &HostClient{
  550. ReadTimeout: time.Millisecond * 400,
  551. MaxIdemponentCallAttempts: 1,
  552. Dial: func(addr string) (net.Conn, error) {
  553. return ln.Dial()
  554. },
  555. }
  556. req := AcquireRequest()
  557. res := AcquireResponse()
  558. req.SetRequestURI("http://localhost")
  559. // Setting Connection: Close will make the connection be
  560. // returned to the pool.
  561. req.SetConnectionClose()
  562. if err := c.Do(req, res); err != nil {
  563. t.Fatal(err)
  564. }
  565. ReleaseRequest(req)
  566. ReleaseResponse(res)
  567. done := make(chan struct{})
  568. go func() {
  569. req := AcquireRequest()
  570. res := AcquireResponse()
  571. req.SetRequestURI("http://localhost")
  572. req.SetConnectionClose()
  573. if err := c.Do(req, res); err != ErrTimeout {
  574. t.Errorf("expected ErrTimeout got %#v", err)
  575. }
  576. ReleaseRequest(req)
  577. ReleaseResponse(res)
  578. close(done)
  579. }()
  580. select {
  581. case <-done:
  582. // This shouldn't take longer than the timeout times the number of requests it is going to try to do.
  583. // Give it an extra second just to be sure.
  584. case <-time.After(c.ReadTimeout*time.Duration(c.MaxIdemponentCallAttempts) + time.Second):
  585. t.Fatal("Client.ReadTimeout didn't work")
  586. }
  587. }
  588. func TestClientDefaultUserAgent(t *testing.T) {
  589. t.Parallel()
  590. ln := fasthttputil.NewInmemoryListener()
  591. userAgentSeen := ""
  592. s := &Server{
  593. Handler: func(ctx *RequestCtx) {
  594. userAgentSeen = string(ctx.UserAgent())
  595. },
  596. }
  597. go s.Serve(ln) //nolint:errcheck
  598. c := &Client{
  599. Dial: func(addr string) (net.Conn, error) {
  600. return ln.Dial()
  601. },
  602. }
  603. req := AcquireRequest()
  604. res := AcquireResponse()
  605. req.SetRequestURI("http://example.com")
  606. err := c.Do(req, res)
  607. if err != nil {
  608. t.Fatal(err)
  609. }
  610. if userAgentSeen != string(defaultUserAgent) {
  611. t.Fatalf("User-Agent defers %q != %q", userAgentSeen, defaultUserAgent)
  612. }
  613. }
  614. func TestClientSetUserAgent(t *testing.T) {
  615. t.Parallel()
  616. ln := fasthttputil.NewInmemoryListener()
  617. userAgentSeen := ""
  618. s := &Server{
  619. Handler: func(ctx *RequestCtx) {
  620. userAgentSeen = string(ctx.UserAgent())
  621. },
  622. }
  623. go s.Serve(ln) //nolint:errcheck
  624. userAgent := "I'm not fasthttp"
  625. c := &Client{
  626. Name: userAgent,
  627. Dial: func(addr string) (net.Conn, error) {
  628. return ln.Dial()
  629. },
  630. }
  631. req := AcquireRequest()
  632. res := AcquireResponse()
  633. req.SetRequestURI("http://example.com")
  634. err := c.Do(req, res)
  635. if err != nil {
  636. t.Fatal(err)
  637. }
  638. if userAgentSeen != userAgent {
  639. t.Fatalf("User-Agent defers %q != %q", userAgentSeen, userAgent)
  640. }
  641. }
  642. func TestClientNoUserAgent(t *testing.T) {
  643. ln := fasthttputil.NewInmemoryListener()
  644. userAgentSeen := ""
  645. s := &Server{
  646. Handler: func(ctx *RequestCtx) {
  647. userAgentSeen = string(ctx.UserAgent())
  648. },
  649. }
  650. go s.Serve(ln) //nolint:errcheck
  651. c := &Client{
  652. NoDefaultUserAgentHeader: true,
  653. Dial: func(addr string) (net.Conn, error) {
  654. return ln.Dial()
  655. },
  656. }
  657. req := AcquireRequest()
  658. res := AcquireResponse()
  659. req.SetRequestURI("http://example.com")
  660. err := c.Do(req, res)
  661. if err != nil {
  662. t.Fatal(err)
  663. }
  664. if userAgentSeen != "" {
  665. t.Fatalf("User-Agent wrong %q != %q", userAgentSeen, "")
  666. }
  667. }
  668. func TestClientDoWithCustomHeaders(t *testing.T) {
  669. t.Parallel()
  670. // make sure that the client sends all the request headers and body.
  671. ln := fasthttputil.NewInmemoryListener()
  672. c := &Client{
  673. Dial: func(addr string) (net.Conn, error) {
  674. return ln.Dial()
  675. },
  676. }
  677. uri := "/foo/bar/baz?a=b&cd=12"
  678. headers := map[string]string{
  679. "Foo": "bar",
  680. "Host": "xxx.com",
  681. "Content-Type": "asdfsdf",
  682. "a-b-c-d-f": "",
  683. }
  684. body := "request body"
  685. ch := make(chan error)
  686. go func() {
  687. conn, err := ln.Accept()
  688. if err != nil {
  689. ch <- fmt.Errorf("cannot accept client connection: %w", err)
  690. return
  691. }
  692. br := bufio.NewReader(conn)
  693. var req Request
  694. if err = req.Read(br); err != nil {
  695. ch <- fmt.Errorf("cannot read client request: %w", err)
  696. return
  697. }
  698. if string(req.Header.Method()) != MethodPost {
  699. ch <- fmt.Errorf("unexpected request method: %q. Expecting %q", req.Header.Method(), MethodPost)
  700. return
  701. }
  702. reqURI := req.RequestURI()
  703. if string(reqURI) != uri {
  704. ch <- fmt.Errorf("unexpected request uri: %q. Expecting %q", reqURI, uri)
  705. return
  706. }
  707. for k, v := range headers {
  708. hv := req.Header.Peek(k)
  709. if string(hv) != v {
  710. ch <- fmt.Errorf("unexpected value for header %q: %q. Expecting %q", k, hv, v)
  711. return
  712. }
  713. }
  714. cl := req.Header.ContentLength()
  715. if cl != len(body) {
  716. ch <- fmt.Errorf("unexpected content-length %d. Expecting %d", cl, len(body))
  717. return
  718. }
  719. reqBody := req.Body()
  720. if string(reqBody) != body {
  721. ch <- fmt.Errorf("unexpected request body: %q. Expecting %q", reqBody, body)
  722. return
  723. }
  724. var resp Response
  725. bw := bufio.NewWriter(conn)
  726. if err = resp.Write(bw); err != nil {
  727. ch <- fmt.Errorf("cannot send response: %w", err)
  728. return
  729. }
  730. if err = bw.Flush(); err != nil {
  731. ch <- fmt.Errorf("cannot flush response: %w", err)
  732. return
  733. }
  734. ch <- nil
  735. }()
  736. var req Request
  737. req.Header.SetMethod(MethodPost)
  738. req.SetRequestURI(uri)
  739. for k, v := range headers {
  740. req.Header.Set(k, v)
  741. }
  742. req.SetBodyString(body)
  743. var resp Response
  744. err := c.DoTimeout(&req, &resp, time.Second)
  745. if err != nil {
  746. t.Fatalf("error when doing request: %s", err)
  747. }
  748. select {
  749. case <-ch:
  750. case <-time.After(5 * time.Second):
  751. t.Fatalf("timeout")
  752. }
  753. }
  754. func TestPipelineClientDoSerial(t *testing.T) {
  755. t.Parallel()
  756. testPipelineClientDoConcurrent(t, 1, 0, 0)
  757. }
  758. func TestPipelineClientDoConcurrent(t *testing.T) {
  759. t.Parallel()
  760. testPipelineClientDoConcurrent(t, 10, 0, 1)
  761. }
  762. func TestPipelineClientDoBatchDelayConcurrent(t *testing.T) {
  763. t.Parallel()
  764. testPipelineClientDoConcurrent(t, 10, 5*time.Millisecond, 1)
  765. }
  766. func TestPipelineClientDoBatchDelayConcurrentMultiConn(t *testing.T) {
  767. t.Parallel()
  768. testPipelineClientDoConcurrent(t, 10, 5*time.Millisecond, 3)
  769. }
  770. func testPipelineClientDoConcurrent(t *testing.T, concurrency int, maxBatchDelay time.Duration, maxConns int) {
  771. ln := fasthttputil.NewInmemoryListener()
  772. s := &Server{
  773. Handler: func(ctx *RequestCtx) {
  774. ctx.WriteString("OK") //nolint:errcheck
  775. },
  776. }
  777. serverStopCh := make(chan struct{})
  778. go func() {
  779. if err := s.Serve(ln); err != nil {
  780. t.Errorf("unexpected error: %s", err)
  781. }
  782. close(serverStopCh)
  783. }()
  784. c := &PipelineClient{
  785. Dial: func(addr string) (net.Conn, error) {
  786. return ln.Dial()
  787. },
  788. MaxConns: maxConns,
  789. MaxPendingRequests: concurrency,
  790. MaxBatchDelay: maxBatchDelay,
  791. Logger: &testLogger{},
  792. }
  793. clientStopCh := make(chan struct{}, concurrency)
  794. for i := 0; i < concurrency; i++ {
  795. go func() {
  796. testPipelineClientDo(t, c)
  797. clientStopCh <- struct{}{}
  798. }()
  799. }
  800. for i := 0; i < concurrency; i++ {
  801. select {
  802. case <-clientStopCh:
  803. case <-time.After(3 * time.Second):
  804. t.Fatalf("timeout")
  805. }
  806. }
  807. if c.PendingRequests() != 0 {
  808. t.Fatalf("unexpected number of pending requests: %d. Expecting zero", c.PendingRequests())
  809. }
  810. if err := ln.Close(); err != nil {
  811. t.Fatalf("unexpected error: %s", err)
  812. }
  813. select {
  814. case <-serverStopCh:
  815. case <-time.After(time.Second):
  816. t.Fatalf("timeout")
  817. }
  818. }
  819. func testPipelineClientDo(t *testing.T, c *PipelineClient) {
  820. var err error
  821. req := AcquireRequest()
  822. req.SetRequestURI("http://foobar/baz")
  823. resp := AcquireResponse()
  824. for i := 0; i < 10; i++ {
  825. if i&1 == 0 {
  826. err = c.DoTimeout(req, resp, time.Second)
  827. } else {
  828. err = c.Do(req, resp)
  829. }
  830. if err != nil {
  831. if err == ErrPipelineOverflow {
  832. time.Sleep(10 * time.Millisecond)
  833. continue
  834. }
  835. t.Fatalf("unexpected error on iteration %d: %s", i, err)
  836. }
  837. if resp.StatusCode() != StatusOK {
  838. t.Fatalf("unexpected status code: %d. Expecting %d", resp.StatusCode(), StatusOK)
  839. }
  840. body := string(resp.Body())
  841. if body != "OK" {
  842. t.Fatalf("unexpected body: %q. Expecting %q", body, "OK")
  843. }
  844. // sleep for a while, so the connection to the host may expire.
  845. if i%5 == 0 {
  846. time.Sleep(30 * time.Millisecond)
  847. }
  848. }
  849. ReleaseRequest(req)
  850. ReleaseResponse(resp)
  851. }
  852. func TestPipelineClientDoDisableHeaderNamesNormalizing(t *testing.T) {
  853. t.Parallel()
  854. testPipelineClientDisableHeaderNamesNormalizing(t, 0)
  855. }
  856. func TestPipelineClientDoTimeoutDisableHeaderNamesNormalizing(t *testing.T) {
  857. t.Parallel()
  858. testPipelineClientDisableHeaderNamesNormalizing(t, time.Second)
  859. }
  860. func testPipelineClientDisableHeaderNamesNormalizing(t *testing.T, timeout time.Duration) {
  861. ln := fasthttputil.NewInmemoryListener()
  862. s := &Server{
  863. Handler: func(ctx *RequestCtx) {
  864. ctx.Response.Header.Set("foo-BAR", "baz")
  865. },
  866. DisableHeaderNamesNormalizing: true,
  867. }
  868. serverStopCh := make(chan struct{})
  869. go func() {
  870. if err := s.Serve(ln); err != nil {
  871. t.Errorf("unexpected error: %s", err)
  872. }
  873. close(serverStopCh)
  874. }()
  875. c := &PipelineClient{
  876. Dial: func(addr string) (net.Conn, error) {
  877. return ln.Dial()
  878. },
  879. DisableHeaderNamesNormalizing: true,
  880. }
  881. var req Request
  882. req.SetRequestURI("http://aaaai.com/bsdf?sddfsd")
  883. var resp Response
  884. for i := 0; i < 5; i++ {
  885. if timeout > 0 {
  886. if err := c.DoTimeout(&req, &resp, timeout); err != nil {
  887. t.Fatalf("unexpected error: %s", err)
  888. }
  889. } else {
  890. if err := c.Do(&req, &resp); err != nil {
  891. t.Fatalf("unexpected error: %s", err)
  892. }
  893. }
  894. hv := resp.Header.Peek("foo-BAR")
  895. if string(hv) != "baz" {
  896. t.Fatalf("unexpected header value: %q. Expecting %q", hv, "baz")
  897. }
  898. hv = resp.Header.Peek("Foo-Bar")
  899. if len(hv) > 0 {
  900. t.Fatalf("unexpected non-empty header value %q", hv)
  901. }
  902. }
  903. if err := ln.Close(); err != nil {
  904. t.Fatalf("unexpected error: %s", err)
  905. }
  906. select {
  907. case <-serverStopCh:
  908. case <-time.After(time.Second):
  909. t.Fatalf("timeout")
  910. }
  911. }
  912. func TestClientDoTimeoutDisableHeaderNamesNormalizing(t *testing.T) {
  913. t.Parallel()
  914. ln := fasthttputil.NewInmemoryListener()
  915. s := &Server{
  916. Handler: func(ctx *RequestCtx) {
  917. ctx.Response.Header.Set("foo-BAR", "baz")
  918. },
  919. DisableHeaderNamesNormalizing: true,
  920. }
  921. serverStopCh := make(chan struct{})
  922. go func() {
  923. if err := s.Serve(ln); err != nil {
  924. t.Errorf("unexpected error: %s", err)
  925. }
  926. close(serverStopCh)
  927. }()
  928. c := &Client{
  929. Dial: func(addr string) (net.Conn, error) {
  930. return ln.Dial()
  931. },
  932. DisableHeaderNamesNormalizing: true,
  933. }
  934. var req Request
  935. req.SetRequestURI("http://aaaai.com/bsdf?sddfsd")
  936. var resp Response
  937. for i := 0; i < 5; i++ {
  938. if err := c.DoTimeout(&req, &resp, time.Second); err != nil {
  939. t.Fatalf("unexpected error: %s", err)
  940. }
  941. hv := resp.Header.Peek("foo-BAR")
  942. if string(hv) != "baz" {
  943. t.Fatalf("unexpected header value: %q. Expecting %q", hv, "baz")
  944. }
  945. hv = resp.Header.Peek("Foo-Bar")
  946. if len(hv) > 0 {
  947. t.Fatalf("unexpected non-empty header value %q", hv)
  948. }
  949. }
  950. if err := ln.Close(); err != nil {
  951. t.Fatalf("unexpected error: %s", err)
  952. }
  953. select {
  954. case <-serverStopCh:
  955. case <-time.After(time.Second):
  956. t.Fatalf("timeout")
  957. }
  958. }
  959. func TestClientDoTimeoutDisablePathNormalizing(t *testing.T) {
  960. t.Parallel()
  961. ln := fasthttputil.NewInmemoryListener()
  962. s := &Server{
  963. Handler: func(ctx *RequestCtx) {
  964. uri := ctx.URI()
  965. uri.DisablePathNormalizing = true
  966. ctx.Response.Header.Set("received-uri", string(uri.FullURI()))
  967. },
  968. }
  969. serverStopCh := make(chan struct{})
  970. go func() {
  971. if err := s.Serve(ln); err != nil {
  972. t.Errorf("unexpected error: %s", err)
  973. }
  974. close(serverStopCh)
  975. }()
  976. c := &Client{
  977. Dial: func(addr string) (net.Conn, error) {
  978. return ln.Dial()
  979. },
  980. DisablePathNormalizing: true,
  981. }
  982. urlWithEncodedPath := "http://example.com/encoded/Y%2BY%2FY%3D/stuff"
  983. var req Request
  984. req.SetRequestURI(urlWithEncodedPath)
  985. var resp Response
  986. for i := 0; i < 5; i++ {
  987. if err := c.DoTimeout(&req, &resp, time.Second); err != nil {
  988. t.Fatalf("unexpected error: %s", err)
  989. }
  990. hv := resp.Header.Peek("received-uri")
  991. if string(hv) != urlWithEncodedPath {
  992. t.Fatalf("request uri was normalized: %q. Expecting %q", hv, urlWithEncodedPath)
  993. }
  994. }
  995. if err := ln.Close(); err != nil {
  996. t.Fatalf("unexpected error: %s", err)
  997. }
  998. select {
  999. case <-serverStopCh:
  1000. case <-time.After(time.Second):
  1001. t.Fatalf("timeout")
  1002. }
  1003. }
  1004. func TestHostClientPendingRequests(t *testing.T) {
  1005. t.Parallel()
  1006. const concurrency = 10
  1007. doneCh := make(chan struct{})
  1008. readyCh := make(chan struct{}, concurrency)
  1009. s := &Server{
  1010. Handler: func(ctx *RequestCtx) {
  1011. readyCh <- struct{}{}
  1012. <-doneCh
  1013. },
  1014. }
  1015. ln := fasthttputil.NewInmemoryListener()
  1016. serverStopCh := make(chan struct{})
  1017. go func() {
  1018. if err := s.Serve(ln); err != nil {
  1019. t.Errorf("unexpected error: %s", err)
  1020. }
  1021. close(serverStopCh)
  1022. }()
  1023. c := &HostClient{
  1024. Addr: "foobar",
  1025. Dial: func(addr string) (net.Conn, error) {
  1026. return ln.Dial()
  1027. },
  1028. }
  1029. pendingRequests := c.PendingRequests()
  1030. if pendingRequests != 0 {
  1031. t.Fatalf("non-zero pendingRequests: %d", pendingRequests)
  1032. }
  1033. resultCh := make(chan error, concurrency)
  1034. for i := 0; i < concurrency; i++ {
  1035. go func() {
  1036. req := AcquireRequest()
  1037. req.SetRequestURI("http://foobar/baz")
  1038. resp := AcquireResponse()
  1039. if err := c.DoTimeout(req, resp, 10*time.Second); err != nil {
  1040. resultCh <- fmt.Errorf("unexpected error: %w", err)
  1041. return
  1042. }
  1043. if resp.StatusCode() != StatusOK {
  1044. resultCh <- fmt.Errorf("unexpected status code %d. Expecting %d", resp.StatusCode(), StatusOK)
  1045. return
  1046. }
  1047. resultCh <- nil
  1048. }()
  1049. }
  1050. // wait while all the requests reach server
  1051. for i := 0; i < concurrency; i++ {
  1052. select {
  1053. case <-readyCh:
  1054. case <-time.After(time.Second):
  1055. t.Fatalf("timeout")
  1056. }
  1057. }
  1058. pendingRequests = c.PendingRequests()
  1059. if pendingRequests != concurrency {
  1060. t.Fatalf("unexpected pendingRequests: %d. Expecting %d", pendingRequests, concurrency)
  1061. }
  1062. // unblock request handlers on the server and wait until all the requests are finished.
  1063. close(doneCh)
  1064. for i := 0; i < concurrency; i++ {
  1065. select {
  1066. case err := <-resultCh:
  1067. if err != nil {
  1068. t.Fatalf("unexpected error: %s", err)
  1069. }
  1070. case <-time.After(time.Second):
  1071. t.Fatalf("timeout")
  1072. }
  1073. }
  1074. pendingRequests = c.PendingRequests()
  1075. if pendingRequests != 0 {
  1076. t.Fatalf("non-zero pendingRequests: %d", pendingRequests)
  1077. }
  1078. // stop the server
  1079. if err := ln.Close(); err != nil {
  1080. t.Fatalf("unexpected error: %s", err)
  1081. }
  1082. select {
  1083. case <-serverStopCh:
  1084. case <-time.After(time.Second):
  1085. t.Fatalf("timeout")
  1086. }
  1087. }
  1088. func TestHostClientMaxConnsWithDeadline(t *testing.T) {
  1089. t.Parallel()
  1090. var (
  1091. emptyBodyCount uint8
  1092. ln = fasthttputil.NewInmemoryListener()
  1093. timeout = 200 * time.Millisecond
  1094. wg sync.WaitGroup
  1095. )
  1096. s := &Server{
  1097. Handler: func(ctx *RequestCtx) {
  1098. if len(ctx.PostBody()) == 0 {
  1099. emptyBodyCount++
  1100. }
  1101. ctx.WriteString("foo") //nolint:errcheck
  1102. },
  1103. }
  1104. serverStopCh := make(chan struct{})
  1105. go func() {
  1106. if err := s.Serve(ln); err != nil {
  1107. t.Errorf("unexpected error: %s", err)
  1108. }
  1109. close(serverStopCh)
  1110. }()
  1111. c := &HostClient{
  1112. Addr: "foobar",
  1113. Dial: func(addr string) (net.Conn, error) {
  1114. return ln.Dial()
  1115. },
  1116. MaxConns: 1,
  1117. }
  1118. for i := 0; i < 5; i++ {
  1119. wg.Add(1)
  1120. go func() {
  1121. defer wg.Done()
  1122. req := AcquireRequest()
  1123. req.SetRequestURI("http://foobar/baz")
  1124. req.Header.SetMethod(MethodPost)
  1125. req.SetBodyString("bar")
  1126. resp := AcquireResponse()
  1127. for {
  1128. if err := c.DoDeadline(req, resp, time.Now().Add(timeout)); err != nil {
  1129. if err == ErrNoFreeConns {
  1130. time.Sleep(time.Millisecond)
  1131. continue
  1132. }
  1133. t.Errorf("unexpected error: %s", err)
  1134. }
  1135. break
  1136. }
  1137. if resp.StatusCode() != StatusOK {
  1138. t.Errorf("unexpected status code %d. Expecting %d", resp.StatusCode(), StatusOK)
  1139. }
  1140. body := resp.Body()
  1141. if string(body) != "foo" {
  1142. t.Errorf("unexpected body %q. Expecting %q", body, "abcd")
  1143. }
  1144. }()
  1145. }
  1146. wg.Wait()
  1147. if err := ln.Close(); err != nil {
  1148. t.Fatalf("unexpected error: %s", err)
  1149. }
  1150. select {
  1151. case <-serverStopCh:
  1152. case <-time.After(time.Second):
  1153. t.Fatalf("timeout")
  1154. }
  1155. if emptyBodyCount > 0 {
  1156. t.Fatalf("at least one request body was empty")
  1157. }
  1158. }
  1159. func TestHostClientMaxConnDuration(t *testing.T) {
  1160. t.Parallel()
  1161. ln := fasthttputil.NewInmemoryListener()
  1162. connectionCloseCount := uint32(0)
  1163. s := &Server{
  1164. Handler: func(ctx *RequestCtx) {
  1165. ctx.WriteString("abcd") //nolint:errcheck
  1166. if ctx.Request.ConnectionClose() {
  1167. atomic.AddUint32(&connectionCloseCount, 1)
  1168. }
  1169. },
  1170. }
  1171. serverStopCh := make(chan struct{})
  1172. go func() {
  1173. if err := s.Serve(ln); err != nil {
  1174. t.Errorf("unexpected error: %s", err)
  1175. }
  1176. close(serverStopCh)
  1177. }()
  1178. c := &HostClient{
  1179. Addr: "foobar",
  1180. Dial: func(addr string) (net.Conn, error) {
  1181. return ln.Dial()
  1182. },
  1183. MaxConnDuration: 10 * time.Millisecond,
  1184. }
  1185. for i := 0; i < 5; i++ {
  1186. statusCode, body, err := c.Get(nil, "http://aaaa.com/bbb/cc")
  1187. if err != nil {
  1188. t.Fatalf("unexpected error: %s", err)
  1189. }
  1190. if statusCode != StatusOK {
  1191. t.Fatalf("unexpected status code %d. Expecting %d", statusCode, StatusOK)
  1192. }
  1193. if string(body) != "abcd" {
  1194. t.Fatalf("unexpected body %q. Expecting %q", body, "abcd")
  1195. }
  1196. time.Sleep(c.MaxConnDuration)
  1197. }
  1198. if err := ln.Close(); err != nil {
  1199. t.Fatalf("unexpected error: %s", err)
  1200. }
  1201. select {
  1202. case <-serverStopCh:
  1203. case <-time.After(time.Second):
  1204. t.Fatalf("timeout")
  1205. }
  1206. if connectionCloseCount == 0 {
  1207. t.Fatalf("expecting at least one 'Connection: close' request header")
  1208. }
  1209. }
  1210. func TestHostClientMultipleAddrs(t *testing.T) {
  1211. t.Parallel()
  1212. ln := fasthttputil.NewInmemoryListener()
  1213. s := &Server{
  1214. Handler: func(ctx *RequestCtx) {
  1215. ctx.Write(ctx.Host()) //nolint:errcheck
  1216. ctx.SetConnectionClose()
  1217. },
  1218. }
  1219. serverStopCh := make(chan struct{})
  1220. go func() {
  1221. if err := s.Serve(ln); err != nil {
  1222. t.Errorf("unexpected error: %s", err)
  1223. }
  1224. close(serverStopCh)
  1225. }()
  1226. dialsCount := make(map[string]int)
  1227. c := &HostClient{
  1228. Addr: "foo,bar,baz",
  1229. Dial: func(addr string) (net.Conn, error) {
  1230. dialsCount[addr]++
  1231. return ln.Dial()
  1232. },
  1233. }
  1234. for i := 0; i < 9; i++ {
  1235. statusCode, body, err := c.Get(nil, "http://foobar/baz/aaa?bbb=ddd")
  1236. if err != nil {
  1237. t.Fatalf("unexpected error: %s", err)
  1238. }
  1239. if statusCode != StatusOK {
  1240. t.Fatalf("unexpected status code %d. Expecting %d", statusCode, StatusOK)
  1241. }
  1242. if string(body) != "foobar" {
  1243. t.Fatalf("unexpected body %q. Expecting %q", body, "foobar")
  1244. }
  1245. }
  1246. if err := ln.Close(); err != nil {
  1247. t.Fatalf("unexpected error: %s", err)
  1248. }
  1249. select {
  1250. case <-serverStopCh:
  1251. case <-time.After(time.Second):
  1252. t.Fatalf("timeout")
  1253. }
  1254. if len(dialsCount) != 3 {
  1255. t.Fatalf("unexpected dialsCount size %d. Expecting 3", len(dialsCount))
  1256. }
  1257. for _, k := range []string{"foo", "bar", "baz"} {
  1258. if dialsCount[k] != 3 {
  1259. t.Fatalf("unexpected dialsCount for %q. Expecting 3", k)
  1260. }
  1261. }
  1262. }
  1263. func TestClientFollowRedirects(t *testing.T) {
  1264. t.Parallel()
  1265. s := &Server{
  1266. Handler: func(ctx *RequestCtx) {
  1267. switch string(ctx.Path()) {
  1268. case "/foo":
  1269. u := ctx.URI()
  1270. u.Update("/xy?z=wer")
  1271. ctx.Redirect(u.String(), StatusFound)
  1272. case "/xy":
  1273. u := ctx.URI()
  1274. u.Update("/bar")
  1275. ctx.Redirect(u.String(), StatusFound)
  1276. default:
  1277. ctx.Success("text/plain", ctx.Path())
  1278. }
  1279. },
  1280. }
  1281. ln := fasthttputil.NewInmemoryListener()
  1282. serverStopCh := make(chan struct{})
  1283. go func() {
  1284. if err := s.Serve(ln); err != nil {
  1285. t.Errorf("unexpected error: %s", err)
  1286. }
  1287. close(serverStopCh)
  1288. }()
  1289. c := &HostClient{
  1290. Addr: "xxx",
  1291. Dial: func(addr string) (net.Conn, error) {
  1292. return ln.Dial()
  1293. },
  1294. }
  1295. for i := 0; i < 10; i++ {
  1296. statusCode, body, err := c.GetTimeout(nil, "http://xxx/foo", time.Second)
  1297. if err != nil {
  1298. t.Fatalf("unexpected error: %s", err)
  1299. }
  1300. if statusCode != StatusOK {
  1301. t.Fatalf("unexpected status code: %d", statusCode)
  1302. }
  1303. if string(body) != "/bar" {
  1304. t.Fatalf("unexpected response %q. Expecting %q", body, "/bar")
  1305. }
  1306. }
  1307. for i := 0; i < 10; i++ {
  1308. statusCode, body, err := c.Get(nil, "http://xxx/aaab/sss")
  1309. if err != nil {
  1310. t.Fatalf("unexpected error: %s", err)
  1311. }
  1312. if statusCode != StatusOK {
  1313. t.Fatalf("unexpected status code: %d", statusCode)
  1314. }
  1315. if string(body) != "/aaab/sss" {
  1316. t.Fatalf("unexpected response %q. Expecting %q", body, "/aaab/sss")
  1317. }
  1318. }
  1319. for i := 0; i < 10; i++ {
  1320. req := AcquireRequest()
  1321. resp := AcquireResponse()
  1322. req.SetRequestURI("http://xxx/foo")
  1323. err := c.DoRedirects(req, resp, 16)
  1324. if err != nil {
  1325. t.Fatalf("unexpected error: %s", err)
  1326. }
  1327. if statusCode := resp.StatusCode(); statusCode != StatusOK {
  1328. t.Fatalf("unexpected status code: %d", statusCode)
  1329. }
  1330. if body := string(resp.Body()); body != "/bar" {
  1331. t.Fatalf("unexpected response %q. Expecting %q", body, "/bar")
  1332. }
  1333. ReleaseRequest(req)
  1334. ReleaseResponse(resp)
  1335. }
  1336. req := AcquireRequest()
  1337. resp := AcquireResponse()
  1338. req.SetRequestURI("http://xxx/foo")
  1339. err := c.DoRedirects(req, resp, 0)
  1340. if have, want := err, ErrTooManyRedirects; have != want {
  1341. t.Fatalf("want error: %v, have %v", want, have)
  1342. }
  1343. ReleaseRequest(req)
  1344. ReleaseResponse(resp)
  1345. }
  1346. func TestClientGetTimeoutSuccess(t *testing.T) {
  1347. t.Parallel()
  1348. s := startEchoServer(t, "tcp", "127.0.0.1:")
  1349. defer s.Stop()
  1350. testClientGetTimeoutSuccess(t, &defaultClient, "http://"+s.Addr(), 100)
  1351. }
  1352. func TestClientGetTimeoutSuccessConcurrent(t *testing.T) {
  1353. t.Parallel()
  1354. s := startEchoServer(t, "tcp", "127.0.0.1:")
  1355. defer s.Stop()
  1356. var wg sync.WaitGroup
  1357. for i := 0; i < 10; i++ {
  1358. wg.Add(1)
  1359. go func() {
  1360. defer wg.Done()
  1361. testClientGetTimeoutSuccess(t, &defaultClient, "http://"+s.Addr(), 100)
  1362. }()
  1363. }
  1364. wg.Wait()
  1365. }
  1366. func TestClientDoTimeoutSuccess(t *testing.T) {
  1367. t.Parallel()
  1368. s := startEchoServer(t, "tcp", "127.0.0.1:")
  1369. defer s.Stop()
  1370. testClientDoTimeoutSuccess(t, &defaultClient, "http://"+s.Addr(), 100)
  1371. }
  1372. func TestClientDoTimeoutSuccessConcurrent(t *testing.T) {
  1373. t.Parallel()
  1374. s := startEchoServer(t, "tcp", "127.0.0.1:")
  1375. defer s.Stop()
  1376. var wg sync.WaitGroup
  1377. for i := 0; i < 10; i++ {
  1378. wg.Add(1)
  1379. go func() {
  1380. defer wg.Done()
  1381. testClientDoTimeoutSuccess(t, &defaultClient, "http://"+s.Addr(), 100)
  1382. }()
  1383. }
  1384. wg.Wait()
  1385. }
  1386. func TestClientGetTimeoutError(t *testing.T) {
  1387. t.Parallel()
  1388. c := &Client{
  1389. Dial: func(addr string) (net.Conn, error) {
  1390. return &readTimeoutConn{t: time.Second}, nil
  1391. },
  1392. }
  1393. testClientGetTimeoutError(t, c, 100)
  1394. }
  1395. func TestClientGetTimeoutErrorConcurrent(t *testing.T) {
  1396. t.Parallel()
  1397. c := &Client{
  1398. Dial: func(addr string) (net.Conn, error) {
  1399. return &readTimeoutConn{t: time.Second}, nil
  1400. },
  1401. MaxConnsPerHost: 1000,
  1402. }
  1403. var wg sync.WaitGroup
  1404. for i := 0; i < 10; i++ {
  1405. wg.Add(1)
  1406. go func() {
  1407. defer wg.Done()
  1408. testClientGetTimeoutError(t, c, 100)
  1409. }()
  1410. }
  1411. wg.Wait()
  1412. }
  1413. func TestClientDoTimeoutError(t *testing.T) {
  1414. t.Parallel()
  1415. c := &Client{
  1416. Dial: func(addr string) (net.Conn, error) {
  1417. return &readTimeoutConn{t: time.Second}, nil
  1418. },
  1419. }
  1420. testClientDoTimeoutError(t, c, 100)
  1421. }
  1422. func TestClientDoTimeoutErrorConcurrent(t *testing.T) {
  1423. t.Parallel()
  1424. c := &Client{
  1425. Dial: func(addr string) (net.Conn, error) {
  1426. return &readTimeoutConn{t: time.Second}, nil
  1427. },
  1428. MaxConnsPerHost: 1000,
  1429. }
  1430. var wg sync.WaitGroup
  1431. for i := 0; i < 10; i++ {
  1432. wg.Add(1)
  1433. go func() {
  1434. defer wg.Done()
  1435. testClientDoTimeoutError(t, c, 100)
  1436. }()
  1437. }
  1438. wg.Wait()
  1439. }
  1440. func testClientDoTimeoutError(t *testing.T, c *Client, n int) {
  1441. var req Request
  1442. var resp Response
  1443. req.SetRequestURI("http://foobar.com/baz")
  1444. for i := 0; i < n; i++ {
  1445. err := c.DoTimeout(&req, &resp, time.Millisecond)
  1446. if err == nil {
  1447. t.Fatalf("expecting error")
  1448. }
  1449. if err != ErrTimeout {
  1450. t.Fatalf("unexpected error: %s. Expecting %s", err, ErrTimeout)
  1451. }
  1452. }
  1453. }
  1454. func testClientGetTimeoutError(t *testing.T, c *Client, n int) {
  1455. buf := make([]byte, 10)
  1456. for i := 0; i < n; i++ {
  1457. statusCode, body, err := c.GetTimeout(buf, "http://foobar.com/baz", time.Millisecond)
  1458. if err == nil {
  1459. t.Fatalf("expecting error")
  1460. }
  1461. if err != ErrTimeout {
  1462. t.Fatalf("unexpected error: %s. Expecting %s", err, ErrTimeout)
  1463. }
  1464. if statusCode != 0 {
  1465. t.Fatalf("unexpected statusCode=%d. Expecting %d", statusCode, 0)
  1466. }
  1467. if body == nil {
  1468. t.Fatalf("body must be non-nil")
  1469. }
  1470. }
  1471. }
  1472. type readTimeoutConn struct {
  1473. net.Conn
  1474. t time.Duration
  1475. }
  1476. func (r *readTimeoutConn) Read(p []byte) (int, error) {
  1477. time.Sleep(r.t)
  1478. return 0, io.EOF
  1479. }
  1480. func (r *readTimeoutConn) Write(p []byte) (int, error) {
  1481. return len(p), nil
  1482. }
  1483. func (r *readTimeoutConn) Close() error {
  1484. return nil
  1485. }
  1486. func (r *readTimeoutConn) LocalAddr() net.Addr {
  1487. return nil
  1488. }
  1489. func (r *readTimeoutConn) RemoteAddr() net.Addr {
  1490. return nil
  1491. }
  1492. func TestClientNonIdempotentRetry(t *testing.T) {
  1493. t.Parallel()
  1494. dialsCount := 0
  1495. c := &Client{
  1496. Dial: func(addr string) (net.Conn, error) {
  1497. dialsCount++
  1498. switch dialsCount {
  1499. case 1, 2:
  1500. return &readErrorConn{}, nil
  1501. case 3:
  1502. return &singleReadConn{
  1503. s: "HTTP/1.1 345 OK\r\nContent-Type: foobar\r\nContent-Length: 7\r\n\r\n0123456",
  1504. }, nil
  1505. default:
  1506. t.Fatalf("unexpected number of dials: %d", dialsCount)
  1507. }
  1508. panic("unreachable")
  1509. },
  1510. }
  1511. // This POST must succeed, since the readErrorConn closes
  1512. // the connection before sending any response.
  1513. // So the client must retry non-idempotent request.
  1514. dialsCount = 0
  1515. statusCode, body, err := c.Post(nil, "http://foobar/a/b", nil)
  1516. if err != nil {
  1517. t.Fatalf("unexpected error: %s", err)
  1518. }
  1519. if statusCode != 345 {
  1520. t.Fatalf("unexpected status code: %d. Expecting 345", statusCode)
  1521. }
  1522. if string(body) != "0123456" {
  1523. t.Fatalf("unexpected body: %q. Expecting %q", body, "0123456")
  1524. }
  1525. // Verify that idempotent GET succeeds.
  1526. dialsCount = 0
  1527. statusCode, body, err = c.Get(nil, "http://foobar/a/b")
  1528. if err != nil {
  1529. t.Fatalf("unexpected error: %s", err)
  1530. }
  1531. if statusCode != 345 {
  1532. t.Fatalf("unexpected status code: %d. Expecting 345", statusCode)
  1533. }
  1534. if string(body) != "0123456" {
  1535. t.Fatalf("unexpected body: %q. Expecting %q", body, "0123456")
  1536. }
  1537. }
  1538. func TestClientNonIdempotentRetry_BodyStream(t *testing.T) {
  1539. t.Parallel()
  1540. dialsCount := 0
  1541. c := &Client{
  1542. Dial: func(addr string) (net.Conn, error) {
  1543. dialsCount++
  1544. switch dialsCount {
  1545. case 1, 2:
  1546. return &readErrorConn{}, nil
  1547. case 3:
  1548. return &singleEchoConn{
  1549. b: []byte("HTTP/1.1 345 OK\r\nContent-Type: foobar\r\n\r\n"),
  1550. }, nil
  1551. default:
  1552. t.Fatalf("unexpected number of dials: %d", dialsCount)
  1553. }
  1554. panic("unreachable")
  1555. },
  1556. }
  1557. dialsCount = 0
  1558. req := Request{}
  1559. res := Response{}
  1560. req.SetRequestURI("http://foobar/a/b")
  1561. req.Header.SetMethod("POST")
  1562. body := bytes.NewBufferString("test")
  1563. req.SetBodyStream(body, body.Len())
  1564. err := c.Do(&req, &res)
  1565. if err == nil {
  1566. t.Fatal("expected error from being unable to retry a bodyStream")
  1567. }
  1568. }
  1569. func TestClientIdempotentRequest(t *testing.T) {
  1570. t.Parallel()
  1571. dialsCount := 0
  1572. c := &Client{
  1573. Dial: func(addr string) (net.Conn, error) {
  1574. dialsCount++
  1575. switch dialsCount {
  1576. case 1:
  1577. return &singleReadConn{
  1578. s: "invalid response",
  1579. }, nil
  1580. case 2:
  1581. return &writeErrorConn{}, nil
  1582. case 3:
  1583. return &readErrorConn{}, nil
  1584. case 4:
  1585. return &singleReadConn{
  1586. s: "HTTP/1.1 345 OK\r\nContent-Type: foobar\r\nContent-Length: 7\r\n\r\n0123456",
  1587. }, nil
  1588. default:
  1589. t.Fatalf("unexpected number of dials: %d", dialsCount)
  1590. }
  1591. panic("unreachable")
  1592. },
  1593. }
  1594. // idempotent GET must succeed.
  1595. statusCode, body, err := c.Get(nil, "http://foobar/a/b")
  1596. if err != nil {
  1597. t.Fatalf("unexpected error: %s", err)
  1598. }
  1599. if statusCode != 345 {
  1600. t.Fatalf("unexpected status code: %d. Expecting 345", statusCode)
  1601. }
  1602. if string(body) != "0123456" {
  1603. t.Fatalf("unexpected body: %q. Expecting %q", body, "0123456")
  1604. }
  1605. var args Args
  1606. // non-idempotent POST must fail on incorrect singleReadConn
  1607. dialsCount = 0
  1608. _, _, err = c.Post(nil, "http://foobar/a/b", &args)
  1609. if err == nil {
  1610. t.Fatalf("expecting error")
  1611. }
  1612. // non-idempotent POST must fail on incorrect singleReadConn
  1613. dialsCount = 0
  1614. _, _, err = c.Post(nil, "http://foobar/a/b", nil)
  1615. if err == nil {
  1616. t.Fatalf("expecting error")
  1617. }
  1618. }
  1619. func TestClientRetryRequestWithCustomDecider(t *testing.T) {
  1620. t.Parallel()
  1621. dialsCount := 0
  1622. c := &Client{
  1623. Dial: func(addr string) (net.Conn, error) {
  1624. dialsCount++
  1625. switch dialsCount {
  1626. case 1:
  1627. return &singleReadConn{
  1628. s: "invalid response",
  1629. }, nil
  1630. case 2:
  1631. return &writeErrorConn{}, nil
  1632. case 3:
  1633. return &readErrorConn{}, nil
  1634. case 4:
  1635. return &singleReadConn{
  1636. s: "HTTP/1.1 345 OK\r\nContent-Type: foobar\r\nContent-Length: 7\r\n\r\n0123456",
  1637. }, nil
  1638. default:
  1639. t.Fatalf("unexpected number of dials: %d", dialsCount)
  1640. }
  1641. panic("unreachable")
  1642. },
  1643. RetryIf: func(req *Request) bool {
  1644. return req.URI().String() == "http://foobar/a/b"
  1645. },
  1646. }
  1647. var args Args
  1648. // Post must succeed for http://foobar/a/b uri.
  1649. statusCode, body, err := c.Post(nil, "http://foobar/a/b", &args)
  1650. if err != nil {
  1651. t.Fatalf("unexpected error: %s", err)
  1652. }
  1653. if statusCode != 345 {
  1654. t.Fatalf("unexpected status code: %d. Expecting 345", statusCode)
  1655. }
  1656. if string(body) != "0123456" {
  1657. t.Fatalf("unexpected body: %q. Expecting %q", body, "0123456")
  1658. }
  1659. // POST must fail for http://foobar/a/b/c uri.
  1660. dialsCount = 0
  1661. _, _, err = c.Post(nil, "http://foobar/a/b/c", &args)
  1662. if err == nil {
  1663. t.Fatalf("expecting error")
  1664. }
  1665. }
  1666. func TestHostClientTransport(t *testing.T) {
  1667. t.Parallel()
  1668. ln := fasthttputil.NewInmemoryListener()
  1669. s := &Server{
  1670. Handler: func(ctx *RequestCtx) {
  1671. ctx.WriteString("abcd") //nolint:errcheck
  1672. },
  1673. }
  1674. serverStopCh := make(chan struct{})
  1675. go func() {
  1676. if err := s.Serve(ln); err != nil {
  1677. t.Errorf("unexpected error: %s", err)
  1678. }
  1679. close(serverStopCh)
  1680. }()
  1681. c := &HostClient{
  1682. Addr: "foobar",
  1683. Transport: func() TransportFunc {
  1684. c, _ := ln.Dial()
  1685. br := bufio.NewReader(c)
  1686. bw := bufio.NewWriter(c)
  1687. return func(req *Request, res *Response) error {
  1688. if err := req.Write(bw); err != nil {
  1689. return err
  1690. }
  1691. if err := bw.Flush(); err != nil {
  1692. return err
  1693. }
  1694. return res.Read(br)
  1695. }
  1696. }(),
  1697. }
  1698. for i := 0; i < 5; i++ {
  1699. statusCode, body, err := c.Get(nil, "http://aaaa.com/bbb/cc")
  1700. if err != nil {
  1701. t.Fatalf("unexpected error: %s", err)
  1702. }
  1703. if statusCode != StatusOK {
  1704. t.Fatalf("unexpected status code %d. Expecting %d", statusCode, StatusOK)
  1705. }
  1706. if string(body) != "abcd" {
  1707. t.Fatalf("unexpected body %q. Expecting %q", body, "abcd")
  1708. }
  1709. }
  1710. if err := ln.Close(); err != nil {
  1711. t.Fatalf("unexpected error: %s", err)
  1712. }
  1713. select {
  1714. case <-serverStopCh:
  1715. case <-time.After(time.Second):
  1716. t.Fatalf("timeout")
  1717. }
  1718. }
  1719. type writeErrorConn struct {
  1720. net.Conn
  1721. }
  1722. func (w *writeErrorConn) Write(p []byte) (int, error) {
  1723. return 1, fmt.Errorf("error")
  1724. }
  1725. func (w *writeErrorConn) Close() error {
  1726. return nil
  1727. }
  1728. func (w *writeErrorConn) LocalAddr() net.Addr {
  1729. return nil
  1730. }
  1731. func (w *writeErrorConn) RemoteAddr() net.Addr {
  1732. return nil
  1733. }
  1734. type readErrorConn struct {
  1735. net.Conn
  1736. }
  1737. func (r *readErrorConn) Read(p []byte) (int, error) {
  1738. return 0, fmt.Errorf("error")
  1739. }
  1740. func (r *readErrorConn) Write(p []byte) (int, error) {
  1741. return len(p), nil
  1742. }
  1743. func (r *readErrorConn) Close() error {
  1744. return nil
  1745. }
  1746. func (r *readErrorConn) LocalAddr() net.Addr {
  1747. return nil
  1748. }
  1749. func (r *readErrorConn) RemoteAddr() net.Addr {
  1750. return nil
  1751. }
  1752. type singleReadConn struct {
  1753. net.Conn
  1754. s string
  1755. n int
  1756. }
  1757. func (r *singleReadConn) Read(p []byte) (int, error) {
  1758. if len(r.s) == r.n {
  1759. return 0, io.EOF
  1760. }
  1761. n := copy(p, []byte(r.s[r.n:]))
  1762. r.n += n
  1763. return n, nil
  1764. }
  1765. func (r *singleReadConn) Write(p []byte) (int, error) {
  1766. return len(p), nil
  1767. }
  1768. func (r *singleReadConn) Close() error {
  1769. return nil
  1770. }
  1771. func (r *singleReadConn) LocalAddr() net.Addr {
  1772. return nil
  1773. }
  1774. func (r *singleReadConn) RemoteAddr() net.Addr {
  1775. return nil
  1776. }
  1777. type singleEchoConn struct {
  1778. net.Conn
  1779. b []byte
  1780. n int
  1781. }
  1782. func (r *singleEchoConn) Read(p []byte) (int, error) {
  1783. if len(r.b) == r.n {
  1784. return 0, io.EOF
  1785. }
  1786. n := copy(p, r.b[r.n:])
  1787. r.n += n
  1788. return n, nil
  1789. }
  1790. func (r *singleEchoConn) Write(p []byte) (int, error) {
  1791. r.b = append(r.b, p...)
  1792. return len(p), nil
  1793. }
  1794. func (r *singleEchoConn) Close() error {
  1795. return nil
  1796. }
  1797. func (r *singleEchoConn) LocalAddr() net.Addr {
  1798. return nil
  1799. }
  1800. func (r *singleEchoConn) RemoteAddr() net.Addr {
  1801. return nil
  1802. }
  1803. func TestSingleEchoConn(t *testing.T) {
  1804. t.Parallel()
  1805. c := &Client{
  1806. Dial: func(addr string) (net.Conn, error) {
  1807. return &singleEchoConn{
  1808. b: []byte("HTTP/1.1 345 OK\r\nContent-Type: foobar\r\n\r\n"),
  1809. }, nil
  1810. },
  1811. }
  1812. req := Request{}
  1813. res := Response{}
  1814. req.SetRequestURI("http://foobar/a/b")
  1815. req.Header.SetMethod("POST")
  1816. req.Header.Set("Content-Type", "text/plain")
  1817. body := bytes.NewBufferString("test")
  1818. req.SetBodyStream(body, body.Len())
  1819. err := c.Do(&req, &res)
  1820. if err != nil {
  1821. t.Fatalf("unexpected error: %s", err)
  1822. }
  1823. if res.StatusCode() != 345 {
  1824. t.Fatalf("unexpected status code: %d. Expecting 345", res.StatusCode())
  1825. }
  1826. expected := "POST /a/b HTTP/1.1\r\nUser-Agent: fasthttp\r\nHost: foobar\r\nContent-Type: text/plain\r\nContent-Length: 4\r\n\r\ntest"
  1827. if string(res.Body()) != expected {
  1828. t.Fatalf("unexpected body: %q. Expecting %q", res.Body(), expected)
  1829. }
  1830. }
  1831. func TestClientHTTPSInvalidServerName(t *testing.T) {
  1832. t.Parallel()
  1833. sHTTPS := startEchoServerTLS(t, "tcp", "127.0.0.1:")
  1834. defer sHTTPS.Stop()
  1835. var c Client
  1836. for i := 0; i < 10; i++ {
  1837. _, _, err := c.GetTimeout(nil, "https://"+sHTTPS.Addr(), time.Second)
  1838. if err == nil {
  1839. t.Fatalf("expecting TLS error")
  1840. }
  1841. }
  1842. }
  1843. func TestClientHTTPSConcurrent(t *testing.T) {
  1844. t.Parallel()
  1845. sHTTP := startEchoServer(t, "tcp", "127.0.0.1:")
  1846. defer sHTTP.Stop()
  1847. sHTTPS := startEchoServerTLS(t, "tcp", "127.0.0.1:")
  1848. defer sHTTPS.Stop()
  1849. c := &Client{
  1850. TLSConfig: &tls.Config{
  1851. InsecureSkipVerify: true,
  1852. },
  1853. }
  1854. var wg sync.WaitGroup
  1855. for i := 0; i < 4; i++ {
  1856. wg.Add(1)
  1857. addr := "http://" + sHTTP.Addr()
  1858. if i&1 != 0 {
  1859. addr = "https://" + sHTTPS.Addr()
  1860. }
  1861. go func() {
  1862. defer wg.Done()
  1863. testClientGet(t, c, addr, 20)
  1864. testClientPost(t, c, addr, 10)
  1865. }()
  1866. }
  1867. wg.Wait()
  1868. }
  1869. func TestClientManyServers(t *testing.T) {
  1870. t.Parallel()
  1871. var addrs []string
  1872. for i := 0; i < 10; i++ {
  1873. s := startEchoServer(t, "tcp", "127.0.0.1:")
  1874. defer s.Stop()
  1875. addrs = append(addrs, s.Addr())
  1876. }
  1877. var wg sync.WaitGroup
  1878. for i := 0; i < 4; i++ {
  1879. wg.Add(1)
  1880. addr := "http://" + addrs[i]
  1881. go func() {
  1882. defer wg.Done()
  1883. testClientGet(t, &defaultClient, addr, 20)
  1884. testClientPost(t, &defaultClient, addr, 10)
  1885. }()
  1886. }
  1887. wg.Wait()
  1888. }
  1889. func TestClientGet(t *testing.T) {
  1890. t.Parallel()
  1891. s := startEchoServer(t, "tcp", "127.0.0.1:")
  1892. defer s.Stop()
  1893. testClientGet(t, &defaultClient, "http://"+s.Addr(), 100)
  1894. }
  1895. func TestClientPost(t *testing.T) {
  1896. t.Parallel()
  1897. s := startEchoServer(t, "tcp", "127.0.0.1:")
  1898. defer s.Stop()
  1899. testClientPost(t, &defaultClient, "http://"+s.Addr(), 100)
  1900. }
  1901. func TestClientConcurrent(t *testing.T) {
  1902. t.Parallel()
  1903. s := startEchoServer(t, "tcp", "127.0.0.1:")
  1904. defer s.Stop()
  1905. addr := "http://" + s.Addr()
  1906. var wg sync.WaitGroup
  1907. for i := 0; i < 10; i++ {
  1908. wg.Add(1)
  1909. go func() {
  1910. defer wg.Done()
  1911. testClientGet(t, &defaultClient, addr, 30)
  1912. testClientPost(t, &defaultClient, addr, 10)
  1913. }()
  1914. }
  1915. wg.Wait()
  1916. }
  1917. func skipIfNotUnix(tb testing.TB) {
  1918. switch runtime.GOOS {
  1919. case "android", "nacl", "plan9", "windows":
  1920. tb.Skipf("%s does not support unix sockets", runtime.GOOS)
  1921. }
  1922. if runtime.GOOS == "darwin" && (runtime.GOARCH == "arm" || runtime.GOARCH == "arm64") {
  1923. tb.Skip("iOS does not support unix, unixgram")
  1924. }
  1925. }
  1926. func TestHostClientGet(t *testing.T) {
  1927. t.Parallel()
  1928. skipIfNotUnix(t)
  1929. addr := "TestHostClientGet.unix"
  1930. s := startEchoServer(t, "unix", addr)
  1931. defer s.Stop()
  1932. c := createEchoClient(t, "unix", addr)
  1933. testHostClientGet(t, c, 100)
  1934. }
  1935. func TestHostClientPost(t *testing.T) {
  1936. t.Parallel()
  1937. skipIfNotUnix(t)
  1938. addr := "./TestHostClientPost.unix"
  1939. s := startEchoServer(t, "unix", addr)
  1940. defer s.Stop()
  1941. c := createEchoClient(t, "unix", addr)
  1942. testHostClientPost(t, c, 100)
  1943. }
  1944. func TestHostClientConcurrent(t *testing.T) {
  1945. t.Parallel()
  1946. skipIfNotUnix(t)
  1947. addr := "./TestHostClientConcurrent.unix"
  1948. s := startEchoServer(t, "unix", addr)
  1949. defer s.Stop()
  1950. c := createEchoClient(t, "unix", addr)
  1951. var wg sync.WaitGroup
  1952. for i := 0; i < 10; i++ {
  1953. wg.Add(1)
  1954. go func() {
  1955. defer wg.Done()
  1956. testHostClientGet(t, c, 30)
  1957. testHostClientPost(t, c, 10)
  1958. }()
  1959. }
  1960. wg.Wait()
  1961. }
  1962. func testClientGet(t *testing.T, c clientGetter, addr string, n int) {
  1963. var buf []byte
  1964. for i := 0; i < n; i++ {
  1965. uri := fmt.Sprintf("%s/foo/%d?bar=baz", addr, i)
  1966. statusCode, body, err := c.Get(buf, uri)
  1967. buf = body
  1968. if err != nil {
  1969. t.Fatalf("unexpected error when doing http request: %s", err)
  1970. }
  1971. if statusCode != StatusOK {
  1972. t.Fatalf("unexpected status code: %d. Expecting %d", statusCode, StatusOK)
  1973. }
  1974. resultURI := string(body)
  1975. if resultURI != uri {
  1976. t.Fatalf("unexpected uri %q. Expecting %q", resultURI, uri)
  1977. }
  1978. }
  1979. }
  1980. func testClientDoTimeoutSuccess(t *testing.T, c *Client, addr string, n int) {
  1981. var req Request
  1982. var resp Response
  1983. for i := 0; i < n; i++ {
  1984. uri := fmt.Sprintf("%s/foo/%d?bar=baz", addr, i)
  1985. req.SetRequestURI(uri)
  1986. if err := c.DoTimeout(&req, &resp, time.Second); err != nil {
  1987. t.Fatalf("unexpected error: %s", err)
  1988. }
  1989. if resp.StatusCode() != StatusOK {
  1990. t.Fatalf("unexpected status code: %d. Expecting %d", resp.StatusCode(), StatusOK)
  1991. }
  1992. resultURI := string(resp.Body())
  1993. if strings.HasPrefix(uri, "https") {
  1994. resultURI = uri[:5] + resultURI[4:]
  1995. }
  1996. if resultURI != uri {
  1997. t.Fatalf("unexpected uri %q. Expecting %q", resultURI, uri)
  1998. }
  1999. }
  2000. }
  2001. func testClientGetTimeoutSuccess(t *testing.T, c *Client, addr string, n int) {
  2002. var buf []byte
  2003. for i := 0; i < n; i++ {
  2004. uri := fmt.Sprintf("%s/foo/%d?bar=baz", addr, i)
  2005. statusCode, body, err := c.GetTimeout(buf, uri, time.Second)
  2006. buf = body
  2007. if err != nil {
  2008. t.Fatalf("unexpected error when doing http request: %s", err)
  2009. }
  2010. if statusCode != StatusOK {
  2011. t.Fatalf("unexpected status code: %d. Expecting %d", statusCode, StatusOK)
  2012. }
  2013. resultURI := string(body)
  2014. if strings.HasPrefix(uri, "https") {
  2015. resultURI = uri[:5] + resultURI[4:]
  2016. }
  2017. if resultURI != uri {
  2018. t.Fatalf("unexpected uri %q. Expecting %q", resultURI, uri)
  2019. }
  2020. }
  2021. }
  2022. func testClientPost(t *testing.T, c clientPoster, addr string, n int) {
  2023. var buf []byte
  2024. var args Args
  2025. for i := 0; i < n; i++ {
  2026. uri := fmt.Sprintf("%s/foo/%d?bar=baz", addr, i)
  2027. args.Set("xx", fmt.Sprintf("yy%d", i))
  2028. args.Set("zzz", fmt.Sprintf("qwe_%d", i))
  2029. argsS := args.String()
  2030. statusCode, body, err := c.Post(buf, uri, &args)
  2031. buf = body
  2032. if err != nil {
  2033. t.Fatalf("unexpected error when doing http request: %s", err)
  2034. }
  2035. if statusCode != StatusOK {
  2036. t.Fatalf("unexpected status code: %d. Expecting %d", statusCode, StatusOK)
  2037. }
  2038. s := string(body)
  2039. if s != argsS {
  2040. t.Fatalf("unexpected response %q. Expecting %q", s, argsS)
  2041. }
  2042. }
  2043. }
  2044. func testHostClientGet(t *testing.T, c *HostClient, n int) {
  2045. testClientGet(t, c, "http://google.com", n)
  2046. }
  2047. func testHostClientPost(t *testing.T, c *HostClient, n int) {
  2048. testClientPost(t, c, "http://post-host.com", n)
  2049. }
  2050. type clientPoster interface {
  2051. Post(dst []byte, uri string, postArgs *Args) (int, []byte, error)
  2052. }
  2053. type clientGetter interface {
  2054. Get(dst []byte, uri string) (int, []byte, error)
  2055. }
  2056. func createEchoClient(t *testing.T, network, addr string) *HostClient {
  2057. return &HostClient{
  2058. Addr: addr,
  2059. Dial: func(addr string) (net.Conn, error) {
  2060. return net.Dial(network, addr)
  2061. },
  2062. }
  2063. }
  2064. type testEchoServer struct {
  2065. s *Server
  2066. ln net.Listener
  2067. ch chan struct{}
  2068. t *testing.T
  2069. }
  2070. func (s *testEchoServer) Stop() {
  2071. s.ln.Close()
  2072. select {
  2073. case <-s.ch:
  2074. case <-time.After(time.Second):
  2075. s.t.Fatalf("timeout when waiting for server close")
  2076. }
  2077. }
  2078. func (s *testEchoServer) Addr() string {
  2079. return s.ln.Addr().String()
  2080. }
  2081. func startEchoServerTLS(t *testing.T, network, addr string) *testEchoServer {
  2082. return startEchoServerExt(t, network, addr, true)
  2083. }
  2084. func startEchoServer(t *testing.T, network, addr string) *testEchoServer {
  2085. return startEchoServerExt(t, network, addr, false)
  2086. }
  2087. func startEchoServerExt(t *testing.T, network, addr string, isTLS bool) *testEchoServer {
  2088. if network == "unix" {
  2089. os.Remove(addr)
  2090. }
  2091. var ln net.Listener
  2092. var err error
  2093. if isTLS {
  2094. certData, keyData, kerr := GenerateTestCertificate("localhost")
  2095. if kerr != nil {
  2096. t.Fatal(kerr)
  2097. }
  2098. cert, kerr := tls.X509KeyPair(certData, keyData)
  2099. if kerr != nil {
  2100. t.Fatal(kerr)
  2101. }
  2102. tlsConfig := &tls.Config{
  2103. Certificates: []tls.Certificate{cert},
  2104. }
  2105. ln, err = tls.Listen(network, addr, tlsConfig)
  2106. } else {
  2107. ln, err = net.Listen(network, addr)
  2108. }
  2109. if err != nil {
  2110. t.Fatalf("cannot listen %q: %s", addr, err)
  2111. }
  2112. s := &Server{
  2113. Handler: func(ctx *RequestCtx) {
  2114. if ctx.IsGet() {
  2115. ctx.Success("text/plain", ctx.URI().FullURI())
  2116. } else if ctx.IsPost() {
  2117. ctx.PostArgs().WriteTo(ctx) //nolint:errcheck
  2118. }
  2119. },
  2120. Logger: &testLogger{}, // Ignore log output.
  2121. }
  2122. ch := make(chan struct{})
  2123. go func() {
  2124. err := s.Serve(ln)
  2125. if err != nil {
  2126. t.Errorf("unexpected error returned from Serve(): %s", err)
  2127. }
  2128. close(ch)
  2129. }()
  2130. return &testEchoServer{
  2131. s: s,
  2132. ln: ln,
  2133. ch: ch,
  2134. t: t,
  2135. }
  2136. }
  2137. func TestClientTLSHandshakeTimeout(t *testing.T) {
  2138. t.Parallel()
  2139. listener, err := net.Listen("tcp", "127.0.0.1:0")
  2140. if err != nil {
  2141. t.Fatal(err)
  2142. }
  2143. addr := listener.Addr().String()
  2144. defer listener.Close()
  2145. complete := make(chan bool)
  2146. defer close(complete)
  2147. go func() {
  2148. conn, err := listener.Accept()
  2149. if err != nil {
  2150. t.Error(err)
  2151. return
  2152. }
  2153. <-complete
  2154. conn.Close()
  2155. }()
  2156. client := Client{
  2157. WriteTimeout: 100 * time.Millisecond,
  2158. ReadTimeout: 100 * time.Millisecond,
  2159. }
  2160. _, _, err = client.Get(nil, "https://"+addr)
  2161. if err == nil {
  2162. t.Fatal("tlsClientHandshake completed successfully")
  2163. }
  2164. if err != ErrTLSHandshakeTimeout {
  2165. t.Errorf("resulting error not a timeout: %v\nType %T: %#v", err, err, err)
  2166. }
  2167. }
  2168. func TestHostClientMaxConnWaitTimeoutSuccess(t *testing.T) {
  2169. t.Parallel()
  2170. var (
  2171. emptyBodyCount uint8
  2172. ln = fasthttputil.NewInmemoryListener()
  2173. wg sync.WaitGroup
  2174. )
  2175. s := &Server{
  2176. Handler: func(ctx *RequestCtx) {
  2177. if len(ctx.PostBody()) == 0 {
  2178. emptyBodyCount++
  2179. }
  2180. time.Sleep(5 * time.Millisecond)
  2181. ctx.WriteString("foo") //nolint:errcheck
  2182. },
  2183. }
  2184. serverStopCh := make(chan struct{})
  2185. go func() {
  2186. if err := s.Serve(ln); err != nil {
  2187. t.Errorf("unexpected error: %s", err)
  2188. }
  2189. close(serverStopCh)
  2190. }()
  2191. c := &HostClient{
  2192. Addr: "foobar",
  2193. Dial: func(addr string) (net.Conn, error) {
  2194. return ln.Dial()
  2195. },
  2196. MaxConns: 1,
  2197. MaxConnWaitTimeout: time.Second * 2,
  2198. }
  2199. for i := 0; i < 5; i++ {
  2200. wg.Add(1)
  2201. go func() {
  2202. defer wg.Done()
  2203. req := AcquireRequest()
  2204. req.SetRequestURI("http://foobar/baz")
  2205. req.Header.SetMethod(MethodPost)
  2206. req.SetBodyString("bar")
  2207. resp := AcquireResponse()
  2208. if err := c.Do(req, resp); err != nil {
  2209. t.Errorf("unexpected error: %s", err)
  2210. }
  2211. if resp.StatusCode() != StatusOK {
  2212. t.Errorf("unexpected status code %d. Expecting %d", resp.StatusCode(), StatusOK)
  2213. }
  2214. body := resp.Body()
  2215. if string(body) != "foo" {
  2216. t.Errorf("unexpected body %q. Expecting %q", body, "abcd")
  2217. }
  2218. }()
  2219. }
  2220. wg.Wait()
  2221. if c.connsWait.len() > 0 {
  2222. t.Errorf("connsWait has %v items remaining", c.connsWait.len())
  2223. }
  2224. if err := ln.Close(); err != nil {
  2225. t.Fatalf("unexpected error: %s", err)
  2226. }
  2227. select {
  2228. case <-serverStopCh:
  2229. case <-time.After(time.Second * 5):
  2230. t.Fatalf("timeout")
  2231. }
  2232. if emptyBodyCount > 0 {
  2233. t.Fatalf("at least one request body was empty")
  2234. }
  2235. }
  2236. func TestHostClientMaxConnWaitTimeoutError(t *testing.T) {
  2237. t.Parallel()
  2238. var (
  2239. emptyBodyCount uint8
  2240. ln = fasthttputil.NewInmemoryListener()
  2241. wg sync.WaitGroup
  2242. )
  2243. s := &Server{
  2244. Handler: func(ctx *RequestCtx) {
  2245. if len(ctx.PostBody()) == 0 {
  2246. emptyBodyCount++
  2247. }
  2248. time.Sleep(5 * time.Millisecond)
  2249. ctx.WriteString("foo") //nolint:errcheck
  2250. },
  2251. }
  2252. serverStopCh := make(chan struct{})
  2253. go func() {
  2254. if err := s.Serve(ln); err != nil {
  2255. t.Errorf("unexpected error: %s", err)
  2256. }
  2257. close(serverStopCh)
  2258. }()
  2259. c := &HostClient{
  2260. Addr: "foobar",
  2261. Dial: func(addr string) (net.Conn, error) {
  2262. return ln.Dial()
  2263. },
  2264. MaxConns: 1,
  2265. MaxConnWaitTimeout: 10 * time.Millisecond,
  2266. }
  2267. var errNoFreeConnsCount uint32
  2268. for i := 0; i < 5; i++ {
  2269. wg.Add(1)
  2270. go func() {
  2271. defer wg.Done()
  2272. req := AcquireRequest()
  2273. req.SetRequestURI("http://foobar/baz")
  2274. req.Header.SetMethod(MethodPost)
  2275. req.SetBodyString("bar")
  2276. resp := AcquireResponse()
  2277. if err := c.Do(req, resp); err != nil {
  2278. if err != ErrNoFreeConns {
  2279. t.Errorf("unexpected error: %s. Expecting %s", err, ErrNoFreeConns)
  2280. }
  2281. atomic.AddUint32(&errNoFreeConnsCount, 1)
  2282. } else {
  2283. if resp.StatusCode() != StatusOK {
  2284. t.Errorf("unexpected status code %d. Expecting %d", resp.StatusCode(), StatusOK)
  2285. }
  2286. body := resp.Body()
  2287. if string(body) != "foo" {
  2288. t.Errorf("unexpected body %q. Expecting %q", body, "abcd")
  2289. }
  2290. }
  2291. }()
  2292. }
  2293. wg.Wait()
  2294. // Prevent a race condition with the conns cleaner that might still be running.
  2295. c.connsLock.Lock()
  2296. defer c.connsLock.Unlock()
  2297. if c.connsWait.len() > 0 {
  2298. t.Errorf("connsWait has %v items remaining", c.connsWait.len())
  2299. }
  2300. if errNoFreeConnsCount == 0 {
  2301. t.Errorf("unexpected errorCount: %d. Expecting > 0", errNoFreeConnsCount)
  2302. }
  2303. if err := ln.Close(); err != nil {
  2304. t.Fatalf("unexpected error: %s", err)
  2305. }
  2306. select {
  2307. case <-serverStopCh:
  2308. case <-time.After(time.Second):
  2309. t.Fatalf("timeout")
  2310. }
  2311. if emptyBodyCount > 0 {
  2312. t.Fatalf("at least one request body was empty")
  2313. }
  2314. }
  2315. func TestHostClientMaxConnWaitTimeoutWithEarlierDeadline(t *testing.T) {
  2316. t.Parallel()
  2317. var (
  2318. emptyBodyCount uint8
  2319. ln = fasthttputil.NewInmemoryListener()
  2320. wg sync.WaitGroup
  2321. // make deadline reach earlier than conns wait timeout
  2322. sleep = 100 * time.Millisecond
  2323. timeout = 10 * time.Millisecond
  2324. maxConnWaitTimeout = 50 * time.Millisecond
  2325. )
  2326. s := &Server{
  2327. Handler: func(ctx *RequestCtx) {
  2328. if len(ctx.PostBody()) == 0 {
  2329. emptyBodyCount++
  2330. }
  2331. time.Sleep(sleep)
  2332. ctx.WriteString("foo") //nolint:errcheck
  2333. },
  2334. }
  2335. serverStopCh := make(chan struct{})
  2336. go func() {
  2337. if err := s.Serve(ln); err != nil {
  2338. t.Errorf("unexpected error: %s", err)
  2339. }
  2340. close(serverStopCh)
  2341. }()
  2342. c := &HostClient{
  2343. Addr: "foobar",
  2344. Dial: func(addr string) (net.Conn, error) {
  2345. return ln.Dial()
  2346. },
  2347. MaxConns: 1,
  2348. MaxConnWaitTimeout: maxConnWaitTimeout,
  2349. }
  2350. var errTimeoutCount uint32
  2351. for i := 0; i < 5; i++ {
  2352. wg.Add(1)
  2353. go func() {
  2354. defer wg.Done()
  2355. req := AcquireRequest()
  2356. req.SetRequestURI("http://foobar/baz")
  2357. req.Header.SetMethod(MethodPost)
  2358. req.SetBodyString("bar")
  2359. resp := AcquireResponse()
  2360. if err := c.DoDeadline(req, resp, time.Now().Add(timeout)); err != nil {
  2361. if err != ErrTimeout {
  2362. t.Errorf("unexpected error: %s. Expecting %s", err, ErrTimeout)
  2363. }
  2364. atomic.AddUint32(&errTimeoutCount, 1)
  2365. } else {
  2366. if resp.StatusCode() != StatusOK {
  2367. t.Errorf("unexpected status code %d. Expecting %d", resp.StatusCode(), StatusOK)
  2368. }
  2369. body := resp.Body()
  2370. if string(body) != "foo" {
  2371. t.Errorf("unexpected body %q. Expecting %q", body, "abcd")
  2372. }
  2373. }
  2374. }()
  2375. }
  2376. wg.Wait()
  2377. c.connsLock.Lock()
  2378. for {
  2379. w := c.connsWait.popFront()
  2380. if w == nil {
  2381. break
  2382. }
  2383. w.mu.Lock()
  2384. if w.err != nil && w.err != ErrTimeout {
  2385. t.Errorf("unexpected error: %s. Expecting %s", w.err, ErrTimeout)
  2386. }
  2387. w.mu.Unlock()
  2388. }
  2389. c.connsLock.Unlock()
  2390. if errTimeoutCount == 0 {
  2391. t.Errorf("unexpected errTimeoutCount: %d. Expecting > 0", errTimeoutCount)
  2392. }
  2393. if err := ln.Close(); err != nil {
  2394. t.Fatalf("unexpected error: %s", err)
  2395. }
  2396. select {
  2397. case <-serverStopCh:
  2398. case <-time.After(time.Second):
  2399. t.Fatalf("timeout")
  2400. }
  2401. if emptyBodyCount > 0 {
  2402. t.Fatalf("at least one request body was empty")
  2403. }
  2404. }