lbclient.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package fasthttp
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. )
  7. // BalancingClient is the interface for clients, which may be passed
  8. // to LBClient.Clients.
  9. type BalancingClient interface {
  10. DoDeadline(req *Request, resp *Response, deadline time.Time) error
  11. PendingRequests() int
  12. }
  13. // LBClient balances requests among available LBClient.Clients.
  14. //
  15. // It has the following features:
  16. //
  17. // - Balances load among available clients using 'least loaded' + 'least total'
  18. // hybrid technique.
  19. // - Dynamically decreases load on unhealthy clients.
  20. //
  21. // It is forbidden copying LBClient instances. Create new instances instead.
  22. //
  23. // It is safe calling LBClient methods from concurrently running goroutines.
  24. type LBClient struct {
  25. noCopy noCopy //nolint:unused,structcheck
  26. // Clients must contain non-zero clients list.
  27. // Incoming requests are balanced among these clients.
  28. Clients []BalancingClient
  29. // HealthCheck is a callback called after each request.
  30. //
  31. // The request, response and the error returned by the client
  32. // is passed to HealthCheck, so the callback may determine whether
  33. // the client is healthy.
  34. //
  35. // Load on the current client is decreased if HealthCheck returns false.
  36. //
  37. // By default HealthCheck returns false if err != nil.
  38. HealthCheck func(req *Request, resp *Response, err error) bool
  39. // Timeout is the request timeout used when calling LBClient.Do.
  40. //
  41. // DefaultLBClientTimeout is used by default.
  42. Timeout time.Duration
  43. cs []*lbClient
  44. once sync.Once
  45. }
  46. // DefaultLBClientTimeout is the default request timeout used by LBClient
  47. // when calling LBClient.Do.
  48. //
  49. // The timeout may be overridden via LBClient.Timeout.
  50. const DefaultLBClientTimeout = time.Second
  51. // DoDeadline calls DoDeadline on the least loaded client
  52. func (cc *LBClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  53. return cc.get().DoDeadline(req, resp, deadline)
  54. }
  55. // DoTimeout calculates deadline and calls DoDeadline on the least loaded client
  56. func (cc *LBClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  57. deadline := time.Now().Add(timeout)
  58. return cc.get().DoDeadline(req, resp, deadline)
  59. }
  60. // Do calls calculates deadline using LBClient.Timeout and calls DoDeadline
  61. // on the least loaded client.
  62. func (cc *LBClient) Do(req *Request, resp *Response) error {
  63. timeout := cc.Timeout
  64. if timeout <= 0 {
  65. timeout = DefaultLBClientTimeout
  66. }
  67. return cc.DoTimeout(req, resp, timeout)
  68. }
  69. func (cc *LBClient) init() {
  70. if len(cc.Clients) == 0 {
  71. panic("BUG: LBClient.Clients cannot be empty")
  72. }
  73. for _, c := range cc.Clients {
  74. cc.cs = append(cc.cs, &lbClient{
  75. c: c,
  76. healthCheck: cc.HealthCheck,
  77. })
  78. }
  79. }
  80. func (cc *LBClient) get() *lbClient {
  81. cc.once.Do(cc.init)
  82. cs := cc.cs
  83. minC := cs[0]
  84. minN := minC.PendingRequests()
  85. minT := atomic.LoadUint64(&minC.total)
  86. for _, c := range cs[1:] {
  87. n := c.PendingRequests()
  88. t := atomic.LoadUint64(&c.total)
  89. if n < minN || (n == minN && t < minT) {
  90. minC = c
  91. minN = n
  92. minT = t
  93. }
  94. }
  95. return minC
  96. }
  97. type lbClient struct {
  98. c BalancingClient
  99. healthCheck func(req *Request, resp *Response, err error) bool
  100. penalty uint32
  101. // total amount of requests handled.
  102. total uint64
  103. }
  104. func (c *lbClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  105. err := c.c.DoDeadline(req, resp, deadline)
  106. if !c.isHealthy(req, resp, err) && c.incPenalty() {
  107. // Penalize the client returning error, so the next requests
  108. // are routed to another clients.
  109. time.AfterFunc(penaltyDuration, c.decPenalty)
  110. } else {
  111. atomic.AddUint64(&c.total, 1)
  112. }
  113. return err
  114. }
  115. func (c *lbClient) PendingRequests() int {
  116. n := c.c.PendingRequests()
  117. m := atomic.LoadUint32(&c.penalty)
  118. return n + int(m)
  119. }
  120. func (c *lbClient) isHealthy(req *Request, resp *Response, err error) bool {
  121. if c.healthCheck == nil {
  122. return err == nil
  123. }
  124. return c.healthCheck(req, resp, err)
  125. }
  126. func (c *lbClient) incPenalty() bool {
  127. m := atomic.AddUint32(&c.penalty, 1)
  128. if m > maxPenalty {
  129. c.decPenalty()
  130. return false
  131. }
  132. return true
  133. }
  134. func (c *lbClient) decPenalty() {
  135. atomic.AddUint32(&c.penalty, ^uint32(0))
  136. }
  137. const (
  138. maxPenalty = 300
  139. penaltyDuration = 3 * time.Second
  140. )