tcpdialer.go 13 KB


  1. package fasthttp
  2. import (
  3. "context"
  4. "errors"
  5. "net"
  6. "strconv"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. )
  11. // Dial dials the given TCP addr using tcp4.
  12. //
  13. // This function has the following additional features comparing to net.Dial:
  14. //
  15. // * It reduces load on DNS resolver by caching resolved TCP addressed
  16. // for DNSCacheDuration.
  17. // * It dials all the resolved TCP addresses in round-robin manner until
  18. // connection is established. This may be useful if certain addresses
  19. // are temporarily unreachable.
  20. // * It returns ErrDialTimeout if connection cannot be established during
  21. // DefaultDialTimeout seconds. Use DialTimeout for customizing dial timeout.
  22. //
  23. // This dialer is intended for custom code wrapping before passing
  24. // to Client.Dial or HostClient.Dial.
  25. //
  26. // For instance, per-host counters and/or limits may be implemented
  27. // by such wrappers.
  28. //
  29. // The addr passed to the function must contain port. Example addr values:
  30. //
  31. // * foobar.baz:443
  32. // * foo.bar:80
  33. // * aaa.com:8080
  34. func Dial(addr string) (net.Conn, error) {
  35. return defaultDialer.Dial(addr)
  36. }
  37. // DialTimeout dials the given TCP addr using tcp4 using the given timeout.
  38. //
  39. // This function has the following additional features comparing to net.Dial:
  40. //
  41. // * It reduces load on DNS resolver by caching resolved TCP addressed
  42. // for DNSCacheDuration.
  43. // * It dials all the resolved TCP addresses in round-robin manner until
  44. // connection is established. This may be useful if certain addresses
  45. // are temporarily unreachable.
  46. //
  47. // This dialer is intended for custom code wrapping before passing
  48. // to Client.Dial or HostClient.Dial.
  49. //
  50. // For instance, per-host counters and/or limits may be implemented
  51. // by such wrappers.
  52. //
  53. // The addr passed to the function must contain port. Example addr values:
  54. //
  55. // * foobar.baz:443
  56. // * foo.bar:80
  57. // * aaa.com:8080
  58. func DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
  59. return defaultDialer.DialTimeout(addr, timeout)
  60. }
  61. // DialDualStack dials the given TCP addr using both tcp4 and tcp6.
  62. //
  63. // This function has the following additional features comparing to net.Dial:
  64. //
  65. // * It reduces load on DNS resolver by caching resolved TCP addressed
  66. // for DNSCacheDuration.
  67. // * It dials all the resolved TCP addresses in round-robin manner until
  68. // connection is established. This may be useful if certain addresses
  69. // are temporarily unreachable.
  70. // * It returns ErrDialTimeout if connection cannot be established during
  71. // DefaultDialTimeout seconds. Use DialDualStackTimeout for custom dial
  72. // timeout.
  73. //
  74. // This dialer is intended for custom code wrapping before passing
  75. // to Client.Dial or HostClient.Dial.
  76. //
  77. // For instance, per-host counters and/or limits may be implemented
  78. // by such wrappers.
  79. //
  80. // The addr passed to the function must contain port. Example addr values:
  81. //
  82. // * foobar.baz:443
  83. // * foo.bar:80
  84. // * aaa.com:8080
  85. func DialDualStack(addr string) (net.Conn, error) {
  86. return defaultDialer.DialDualStack(addr)
  87. }
  88. // DialDualStackTimeout dials the given TCP addr using both tcp4 and tcp6
  89. // using the given timeout.
  90. //
  91. // This function has the following additional features comparing to net.Dial:
  92. //
  93. // * It reduces load on DNS resolver by caching resolved TCP addressed
  94. // for DNSCacheDuration.
  95. // * It dials all the resolved TCP addresses in round-robin manner until
  96. // connection is established. This may be useful if certain addresses
  97. // are temporarily unreachable.
  98. //
  99. // This dialer is intended for custom code wrapping before passing
  100. // to Client.Dial or HostClient.Dial.
  101. //
  102. // For instance, per-host counters and/or limits may be implemented
  103. // by such wrappers.
  104. //
  105. // The addr passed to the function must contain port. Example addr values:
  106. //
  107. // * foobar.baz:443
  108. // * foo.bar:80
  109. // * aaa.com:8080
  110. func DialDualStackTimeout(addr string, timeout time.Duration) (net.Conn, error) {
  111. return defaultDialer.DialDualStackTimeout(addr, timeout)
  112. }
  113. var (
  114. defaultDialer = &TCPDialer{Concurrency: 1000}
  115. )
  116. // Resolver represents interface of the tcp resolver.
  117. type Resolver interface {
  118. LookupIPAddr(context.Context, string) (names []net.IPAddr, err error)
  119. }
  120. // TCPDialer contains options to control a group of Dial calls.
  121. type TCPDialer struct {
  122. // Concurrency controls the maximum number of concurrent Dials
  123. // that can be performed using this object.
  124. // Setting this to 0 means unlimited.
  125. //
  126. // WARNING: This can only be changed before the first Dial.
  127. // Changes made after the first Dial will not affect anything.
  128. Concurrency int
  129. // LocalAddr is the local address to use when dialing an
  130. // address.
  131. // If nil, a local address is automatically chosen.
  132. LocalAddr *net.TCPAddr
  133. // This may be used to override DNS resolving policy, like this:
  134. // var dialer = &fasthttp.TCPDialer{
  135. // Resolver: &net.Resolver{
  136. // PreferGo: true,
  137. // StrictErrors: false,
  138. // Dial: func (ctx context.Context, network, address string) (net.Conn, error) {
  139. // d := net.Dialer{}
  140. // return d.DialContext(ctx, "udp", "8.8.8.8:53")
  141. // },
  142. // },
  143. // }
  144. Resolver Resolver
  145. // DNSCacheDuration may be used to override the default DNS cache duration (DefaultDNSCacheDuration)
  146. DNSCacheDuration time.Duration
  147. tcpAddrsMap sync.Map
  148. concurrencyCh chan struct{}
  149. once sync.Once
  150. }
  151. // Dial dials the given TCP addr using tcp4.
  152. //
  153. // This function has the following additional features comparing to net.Dial:
  154. //
  155. // * It reduces load on DNS resolver by caching resolved TCP addressed
  156. // for DNSCacheDuration.
  157. // * It dials all the resolved TCP addresses in round-robin manner until
  158. // connection is established. This may be useful if certain addresses
  159. // are temporarily unreachable.
  160. // * It returns ErrDialTimeout if connection cannot be established during
  161. // DefaultDialTimeout seconds. Use DialTimeout for customizing dial timeout.
  162. //
  163. // This dialer is intended for custom code wrapping before passing
  164. // to Client.Dial or HostClient.Dial.
  165. //
  166. // For instance, per-host counters and/or limits may be implemented
  167. // by such wrappers.
  168. //
  169. // The addr passed to the function must contain port. Example addr values:
  170. //
  171. // * foobar.baz:443
  172. // * foo.bar:80
  173. // * aaa.com:8080
  174. func (d *TCPDialer) Dial(addr string) (net.Conn, error) {
  175. return d.dial(addr, false, DefaultDialTimeout)
  176. }
  177. // DialTimeout dials the given TCP addr using tcp4 using the given timeout.
  178. //
  179. // This function has the following additional features comparing to net.Dial:
  180. //
  181. // * It reduces load on DNS resolver by caching resolved TCP addressed
  182. // for DNSCacheDuration.
  183. // * It dials all the resolved TCP addresses in round-robin manner until
  184. // connection is established. This may be useful if certain addresses
  185. // are temporarily unreachable.
  186. //
  187. // This dialer is intended for custom code wrapping before passing
  188. // to Client.Dial or HostClient.Dial.
  189. //
  190. // For instance, per-host counters and/or limits may be implemented
  191. // by such wrappers.
  192. //
  193. // The addr passed to the function must contain port. Example addr values:
  194. //
  195. // * foobar.baz:443
  196. // * foo.bar:80
  197. // * aaa.com:8080
  198. func (d *TCPDialer) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
  199. return d.dial(addr, false, timeout)
  200. }
  201. // DialDualStack dials the given TCP addr using both tcp4 and tcp6.
  202. //
  203. // This function has the following additional features comparing to net.Dial:
  204. //
  205. // * It reduces load on DNS resolver by caching resolved TCP addressed
  206. // for DNSCacheDuration.
  207. // * It dials all the resolved TCP addresses in round-robin manner until
  208. // connection is established. This may be useful if certain addresses
  209. // are temporarily unreachable.
  210. // * It returns ErrDialTimeout if connection cannot be established during
  211. // DefaultDialTimeout seconds. Use DialDualStackTimeout for custom dial
  212. // timeout.
  213. //
  214. // This dialer is intended for custom code wrapping before passing
  215. // to Client.Dial or HostClient.Dial.
  216. //
  217. // For instance, per-host counters and/or limits may be implemented
  218. // by such wrappers.
  219. //
  220. // The addr passed to the function must contain port. Example addr values:
  221. //
  222. // * foobar.baz:443
  223. // * foo.bar:80
  224. // * aaa.com:8080
  225. func (d *TCPDialer) DialDualStack(addr string) (net.Conn, error) {
  226. return d.dial(addr, true, DefaultDialTimeout)
  227. }
  228. // DialDualStackTimeout dials the given TCP addr using both tcp4 and tcp6
  229. // using the given timeout.
  230. //
  231. // This function has the following additional features comparing to net.Dial:
  232. //
  233. // * It reduces load on DNS resolver by caching resolved TCP addressed
  234. // for DNSCacheDuration.
  235. // * It dials all the resolved TCP addresses in round-robin manner until
  236. // connection is established. This may be useful if certain addresses
  237. // are temporarily unreachable.
  238. //
  239. // This dialer is intended for custom code wrapping before passing
  240. // to Client.Dial or HostClient.Dial.
  241. //
  242. // For instance, per-host counters and/or limits may be implemented
  243. // by such wrappers.
  244. //
  245. // The addr passed to the function must contain port. Example addr values:
  246. //
  247. // * foobar.baz:443
  248. // * foo.bar:80
  249. // * aaa.com:8080
  250. func (d *TCPDialer) DialDualStackTimeout(addr string, timeout time.Duration) (net.Conn, error) {
  251. return d.dial(addr, true, timeout)
  252. }
  253. func (d *TCPDialer) dial(addr string, dualStack bool, timeout time.Duration) (net.Conn, error) {
  254. d.once.Do(func() {
  255. if d.Concurrency > 0 {
  256. d.concurrencyCh = make(chan struct{}, d.Concurrency)
  257. }
  258. if d.DNSCacheDuration == 0 {
  259. d.DNSCacheDuration = DefaultDNSCacheDuration
  260. }
  261. go d.tcpAddrsClean()
  262. })
  263. addrs, idx, err := d.getTCPAddrs(addr, dualStack)
  264. if err != nil {
  265. return nil, err
  266. }
  267. network := "tcp4"
  268. if dualStack {
  269. network = "tcp"
  270. }
  271. var conn net.Conn
  272. n := uint32(len(addrs))
  273. deadline := time.Now().Add(timeout)
  274. for n > 0 {
  275. conn, err = d.tryDial(network, &addrs[idx%n], deadline, d.concurrencyCh)
  276. if err == nil {
  277. return conn, nil
  278. }
  279. if err == ErrDialTimeout {
  280. return nil, err
  281. }
  282. idx++
  283. n--
  284. }
  285. return nil, err
  286. }
  287. func (d *TCPDialer) tryDial(network string, addr *net.TCPAddr, deadline time.Time, concurrencyCh chan struct{}) (net.Conn, error) {
  288. timeout := -time.Since(deadline)
  289. if timeout <= 0 {
  290. return nil, ErrDialTimeout
  291. }
  292. if concurrencyCh != nil {
  293. select {
  294. case concurrencyCh <- struct{}{}:
  295. default:
  296. tc := AcquireTimer(timeout)
  297. isTimeout := false
  298. select {
  299. case concurrencyCh <- struct{}{}:
  300. case <-tc.C:
  301. isTimeout = true
  302. }
  303. ReleaseTimer(tc)
  304. if isTimeout {
  305. return nil, ErrDialTimeout
  306. }
  307. }
  308. defer func() { <-concurrencyCh }()
  309. }
  310. dialer := net.Dialer{}
  311. if d.LocalAddr != nil {
  312. dialer.LocalAddr = d.LocalAddr
  313. }
  314. ctx, cancel_ctx := context.WithDeadline(context.Background(), deadline)
  315. defer cancel_ctx()
  316. conn, err := dialer.DialContext(ctx, network, addr.String())
  317. if err != nil && ctx.Err() == context.DeadlineExceeded {
  318. return nil, ErrDialTimeout
  319. }
  320. return conn, err
  321. }
  322. // ErrDialTimeout is returned when TCP dialing is timed out.
  323. var ErrDialTimeout = errors.New("dialing to the given TCP address timed out")
  324. // DefaultDialTimeout is timeout used by Dial and DialDualStack
  325. // for establishing TCP connections.
  326. const DefaultDialTimeout = 3 * time.Second
  327. type tcpAddrEntry struct {
  328. addrs []net.TCPAddr
  329. addrsIdx uint32
  330. pending int32
  331. resolveTime time.Time
  332. }
  333. // DefaultDNSCacheDuration is the duration for caching resolved TCP addresses
  334. // by Dial* functions.
  335. const DefaultDNSCacheDuration = time.Minute
  336. func (d *TCPDialer) tcpAddrsClean() {
  337. expireDuration := 2 * d.DNSCacheDuration
  338. for {
  339. time.Sleep(time.Second)
  340. t := time.Now()
  341. d.tcpAddrsMap.Range(func(k, v interface{}) bool {
  342. if e, ok := v.(*tcpAddrEntry); ok && t.Sub(e.resolveTime) > expireDuration {
  343. d.tcpAddrsMap.Delete(k)
  344. }
  345. return true
  346. })
  347. }
  348. }
  349. func (d *TCPDialer) getTCPAddrs(addr string, dualStack bool) ([]net.TCPAddr, uint32, error) {
  350. item, exist := d.tcpAddrsMap.Load(addr)
  351. e, ok := item.(*tcpAddrEntry)
  352. if exist && ok && e != nil && time.Since(e.resolveTime) > d.DNSCacheDuration {
  353. // Only let one goroutine re-resolve at a time.
  354. if atomic.SwapInt32(&e.pending, 1) == 0 {
  355. e = nil
  356. }
  357. }
  358. if e == nil {
  359. addrs, err := resolveTCPAddrs(addr, dualStack, d.Resolver)
  360. if err != nil {
  361. item, exist := d.tcpAddrsMap.Load(addr)
  362. e, ok = item.(*tcpAddrEntry)
  363. if exist && ok && e != nil {
  364. // Set pending to 0 so another goroutine can retry.
  365. atomic.StoreInt32(&e.pending, 0)
  366. }
  367. return nil, 0, err
  368. }
  369. e = &tcpAddrEntry{
  370. addrs: addrs,
  371. resolveTime: time.Now(),
  372. }
  373. d.tcpAddrsMap.Store(addr, e)
  374. }
  375. idx := atomic.AddUint32(&e.addrsIdx, 1)
  376. return e.addrs, idx, nil
  377. }
  378. func resolveTCPAddrs(addr string, dualStack bool, resolver Resolver) ([]net.TCPAddr, error) {
  379. host, portS, err := net.SplitHostPort(addr)
  380. if err != nil {
  381. return nil, err
  382. }
  383. port, err := strconv.Atoi(portS)
  384. if err != nil {
  385. return nil, err
  386. }
  387. if resolver == nil {
  388. resolver = net.DefaultResolver
  389. }
  390. ctx := context.Background()
  391. ipaddrs, err := resolver.LookupIPAddr(ctx, host)
  392. if err != nil {
  393. return nil, err
  394. }
  395. n := len(ipaddrs)
  396. addrs := make([]net.TCPAddr, 0, n)
  397. for i := 0; i < n; i++ {
  398. ip := ipaddrs[i]
  399. if !dualStack && ip.IP.To4() == nil {
  400. continue
  401. }
  402. addrs = append(addrs, net.TCPAddr{
  403. IP: ip.IP,
  404. Port: port,
  405. Zone: ip.Zone,
  406. })
  407. }
  408. if len(addrs) == 0 {
  409. return nil, errNoDNSEntries
  410. }
  411. return addrs, nil
  412. }
  413. var errNoDNSEntries = errors.New("couldn't find DNS entries for the given domain. Try using DialDualStack")