workerpool.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. package fasthttp
  2. import (
  3. "net"
  4. "runtime"
  5. "strings"
  6. "sync"
  7. "time"
  8. )
  9. // workerPool serves incoming connections via a pool of workers
  10. // in FILO order, i.e. the most recently stopped worker will serve the next
  11. // incoming connection.
  12. //
  13. // Such a scheme keeps CPU caches hot (in theory).
  14. type workerPool struct {
  15. // Function for serving server connections.
  16. // It must leave c unclosed.
  17. WorkerFunc ServeHandler
  18. MaxWorkersCount int
  19. LogAllErrors bool
  20. MaxIdleWorkerDuration time.Duration
  21. Logger Logger
  22. lock sync.Mutex
  23. workersCount int
  24. mustStop bool
  25. ready []*workerChan
  26. stopCh chan struct{}
  27. workerChanPool sync.Pool
  28. connState func(net.Conn, ConnState)
  29. }
  30. type workerChan struct {
  31. lastUseTime time.Time
  32. ch chan net.Conn
  33. }
  34. func (wp *workerPool) Start() {
  35. if wp.stopCh != nil {
  36. panic("BUG: workerPool already started")
  37. }
  38. wp.stopCh = make(chan struct{})
  39. stopCh := wp.stopCh
  40. wp.workerChanPool.New = func() interface{} {
  41. return &workerChan{
  42. ch: make(chan net.Conn, workerChanCap),
  43. }
  44. }
  45. go func() {
  46. var scratch []*workerChan
  47. for {
  48. wp.clean(&scratch)
  49. select {
  50. case <-stopCh:
  51. return
  52. default:
  53. time.Sleep(wp.getMaxIdleWorkerDuration())
  54. }
  55. }
  56. }()
  57. }
  58. func (wp *workerPool) Stop() {
  59. if wp.stopCh == nil {
  60. panic("BUG: workerPool wasn't started")
  61. }
  62. close(wp.stopCh)
  63. wp.stopCh = nil
  64. // Stop all the workers waiting for incoming connections.
  65. // Do not wait for busy workers - they will stop after
  66. // serving the connection and noticing wp.mustStop = true.
  67. wp.lock.Lock()
  68. ready := wp.ready
  69. for i := range ready {
  70. ready[i].ch <- nil
  71. ready[i] = nil
  72. }
  73. wp.ready = ready[:0]
  74. wp.mustStop = true
  75. wp.lock.Unlock()
  76. }
  77. func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
  78. if wp.MaxIdleWorkerDuration <= 0 {
  79. return 10 * time.Second
  80. }
  81. return wp.MaxIdleWorkerDuration
  82. }
  83. func (wp *workerPool) clean(scratch *[]*workerChan) {
  84. maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
  85. // Clean least recently used workers if they didn't serve connections
  86. // for more than maxIdleWorkerDuration.
  87. criticalTime := time.Now().Add(-maxIdleWorkerDuration)
  88. wp.lock.Lock()
  89. ready := wp.ready
  90. n := len(ready)
  91. // Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
  92. l, r, mid := 0, n-1, 0
  93. for l <= r {
  94. mid = (l + r) / 2
  95. if criticalTime.After(wp.ready[mid].lastUseTime) {
  96. l = mid + 1
  97. } else {
  98. r = mid - 1
  99. }
  100. }
  101. i := r
  102. if i == -1 {
  103. wp.lock.Unlock()
  104. return
  105. }
  106. *scratch = append((*scratch)[:0], ready[:i+1]...)
  107. m := copy(ready, ready[i+1:])
  108. for i = m; i < n; i++ {
  109. ready[i] = nil
  110. }
  111. wp.ready = ready[:m]
  112. wp.lock.Unlock()
  113. // Notify obsolete workers to stop.
  114. // This notification must be outside the wp.lock, since ch.ch
  115. // may be blocking and may consume a lot of time if many workers
  116. // are located on non-local CPUs.
  117. tmp := *scratch
  118. for i := range tmp {
  119. tmp[i].ch <- nil
  120. tmp[i] = nil
  121. }
  122. }
  123. func (wp *workerPool) Serve(c net.Conn) bool {
  124. ch := wp.getCh()
  125. if ch == nil {
  126. return false
  127. }
  128. ch.ch <- c
  129. return true
  130. }
  131. var workerChanCap = func() int {
  132. // Use blocking workerChan if GOMAXPROCS=1.
  133. // This immediately switches Serve to WorkerFunc, which results
  134. // in higher performance (under go1.5 at least).
  135. if runtime.GOMAXPROCS(0) == 1 {
  136. return 0
  137. }
  138. // Use non-blocking workerChan if GOMAXPROCS>1,
  139. // since otherwise the Serve caller (Acceptor) may lag accepting
  140. // new connections if WorkerFunc is CPU-bound.
  141. return 1
  142. }()
  143. func (wp *workerPool) getCh() *workerChan {
  144. var ch *workerChan
  145. createWorker := false
  146. wp.lock.Lock()
  147. ready := wp.ready
  148. n := len(ready) - 1
  149. if n < 0 {
  150. if wp.workersCount < wp.MaxWorkersCount {
  151. createWorker = true
  152. wp.workersCount++
  153. }
  154. } else {
  155. ch = ready[n]
  156. ready[n] = nil
  157. wp.ready = ready[:n]
  158. }
  159. wp.lock.Unlock()
  160. if ch == nil {
  161. if !createWorker {
  162. return nil
  163. }
  164. vch := wp.workerChanPool.Get()
  165. ch = vch.(*workerChan)
  166. go func() {
  167. wp.workerFunc(ch)
  168. wp.workerChanPool.Put(vch)
  169. }()
  170. }
  171. return ch
  172. }
  173. func (wp *workerPool) release(ch *workerChan) bool {
  174. ch.lastUseTime = time.Now()
  175. wp.lock.Lock()
  176. if wp.mustStop {
  177. wp.lock.Unlock()
  178. return false
  179. }
  180. wp.ready = append(wp.ready, ch)
  181. wp.lock.Unlock()
  182. return true
  183. }
  184. func (wp *workerPool) workerFunc(ch *workerChan) {
  185. var c net.Conn
  186. var err error
  187. for c = range ch.ch {
  188. if c == nil {
  189. break
  190. }
  191. if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
  192. errStr := err.Error()
  193. if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") ||
  194. strings.Contains(errStr, "reset by peer") ||
  195. strings.Contains(errStr, "request headers: small read buffer") ||
  196. strings.Contains(errStr, "unexpected EOF") ||
  197. strings.Contains(errStr, "i/o timeout")) {
  198. wp.Logger.Printf("error when serving connection %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err)
  199. }
  200. }
  201. if err == errHijacked {
  202. wp.connState(c, StateHijacked)
  203. } else {
  204. _ = c.Close()
  205. wp.connState(c, StateClosed)
  206. }
  207. c = nil
  208. if !wp.release(ch) {
  209. break
  210. }
  211. }
  212. wp.lock.Lock()
  213. wp.workersCount--
  214. wp.lock.Unlock()
  215. }