1 | // go:build !windows || !race
|
---|
2 |
|
---|
3 | package fasthttp
|
---|
4 |
|
---|
5 | import (
|
---|
6 | "bufio"
|
---|
7 | "crypto/tls"
|
---|
8 | "errors"
|
---|
9 | "fmt"
|
---|
10 | "io"
|
---|
11 | "net"
|
---|
12 | "strconv"
|
---|
13 | "strings"
|
---|
14 | "sync"
|
---|
15 | "sync/atomic"
|
---|
16 | "time"
|
---|
17 | )
|
---|
18 |
|
---|
19 | // Do performs the given http request and fills the given http response.
|
---|
20 | //
|
---|
21 | // Request must contain at least non-zero RequestURI with full url (including
|
---|
22 | // scheme and host) or non-zero Host header + RequestURI.
|
---|
23 | //
|
---|
24 | // Client determines the server to be requested in the following order:
|
---|
25 | //
|
---|
26 | // - from RequestURI if it contains full url with scheme and host;
|
---|
27 | // - from Host header otherwise.
|
---|
28 | //
|
---|
29 | // The function doesn't follow redirects. Use Get* for following redirects.
|
---|
30 | //
|
---|
31 | // Response is ignored if resp is nil.
|
---|
32 | //
|
---|
33 | // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
|
---|
34 | // to the requested host are busy.
|
---|
35 | //
|
---|
36 | // It is recommended obtaining req and resp via AcquireRequest
|
---|
37 | // and AcquireResponse in performance-critical code.
|
---|
38 | func Do(req *Request, resp *Response) error {
|
---|
39 | return defaultClient.Do(req, resp)
|
---|
40 | }
|
---|
41 |
|
---|
42 | // DoTimeout performs the given request and waits for response during
|
---|
43 | // the given timeout duration.
|
---|
44 | //
|
---|
45 | // Request must contain at least non-zero RequestURI with full url (including
|
---|
46 | // scheme and host) or non-zero Host header + RequestURI.
|
---|
47 | //
|
---|
48 | // Client determines the server to be requested in the following order:
|
---|
49 | //
|
---|
50 | // - from RequestURI if it contains full url with scheme and host;
|
---|
51 | // - from Host header otherwise.
|
---|
52 | //
|
---|
53 | // The function doesn't follow redirects. Use Get* for following redirects.
|
---|
54 | //
|
---|
55 | // Response is ignored if resp is nil.
|
---|
56 | //
|
---|
57 | // ErrTimeout is returned if the response wasn't returned during
|
---|
58 | // the given timeout.
|
---|
59 | //
|
---|
60 | // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
|
---|
61 | // to the requested host are busy.
|
---|
62 | //
|
---|
63 | // It is recommended obtaining req and resp via AcquireRequest
|
---|
64 | // and AcquireResponse in performance-critical code.
|
---|
65 | //
|
---|
66 | // Warning: DoTimeout does not terminate the request itself. The request will
|
---|
67 | // continue in the background and the response will be discarded.
|
---|
68 | // If requests take too long and the connection pool gets filled up please
|
---|
69 | // try using a Client and setting a ReadTimeout.
|
---|
70 | func DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
|
---|
71 | return defaultClient.DoTimeout(req, resp, timeout)
|
---|
72 | }
|
---|
73 |
|
---|
74 | // DoDeadline performs the given request and waits for response until
|
---|
75 | // the given deadline.
|
---|
76 | //
|
---|
77 | // Request must contain at least non-zero RequestURI with full url (including
|
---|
78 | // scheme and host) or non-zero Host header + RequestURI.
|
---|
79 | //
|
---|
80 | // Client determines the server to be requested in the following order:
|
---|
81 | //
|
---|
82 | // - from RequestURI if it contains full url with scheme and host;
|
---|
83 | // - from Host header otherwise.
|
---|
84 | //
|
---|
85 | // The function doesn't follow redirects. Use Get* for following redirects.
|
---|
86 | //
|
---|
87 | // Response is ignored if resp is nil.
|
---|
88 | //
|
---|
89 | // ErrTimeout is returned if the response wasn't returned until
|
---|
90 | // the given deadline.
|
---|
91 | //
|
---|
92 | // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
|
---|
93 | // to the requested host are busy.
|
---|
94 | //
|
---|
95 | // It is recommended obtaining req and resp via AcquireRequest
|
---|
96 | // and AcquireResponse in performance-critical code.
|
---|
97 | func DoDeadline(req *Request, resp *Response, deadline time.Time) error {
|
---|
98 | return defaultClient.DoDeadline(req, resp, deadline)
|
---|
99 | }
|
---|
100 |
|
---|
101 | // DoRedirects performs the given http request and fills the given http response,
|
---|
102 | // following up to maxRedirectsCount redirects. When the redirect count exceeds
|
---|
103 | // maxRedirectsCount, ErrTooManyRedirects is returned.
|
---|
104 | //
|
---|
105 | // Request must contain at least non-zero RequestURI with full url (including
|
---|
106 | // scheme and host) or non-zero Host header + RequestURI.
|
---|
107 | //
|
---|
108 | // Client determines the server to be requested in the following order:
|
---|
109 | //
|
---|
110 | // - from RequestURI if it contains full url with scheme and host;
|
---|
111 | // - from Host header otherwise.
|
---|
112 | //
|
---|
113 | // Response is ignored if resp is nil.
|
---|
114 | //
|
---|
115 | // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
|
---|
116 | // to the requested host are busy.
|
---|
117 | //
|
---|
118 | // It is recommended obtaining req and resp via AcquireRequest
|
---|
119 | // and AcquireResponse in performance-critical code.
|
---|
120 | func DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
|
---|
121 | _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, &defaultClient)
|
---|
122 | return err
|
---|
123 | }
|
---|
124 |
|
---|
125 | // Get returns the status code and body of url.
|
---|
126 | //
|
---|
127 | // The contents of dst will be replaced by the body and returned, if the dst
|
---|
128 | // is too small a new slice will be allocated.
|
---|
129 | //
|
---|
130 | // The function follows redirects. Use Do* for manually handling redirects.
|
---|
131 | func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
|
---|
132 | return defaultClient.Get(dst, url)
|
---|
133 | }
|
---|
134 |
|
---|
135 | // GetTimeout returns the status code and body of url.
|
---|
136 | //
|
---|
137 | // The contents of dst will be replaced by the body and returned, if the dst
|
---|
138 | // is too small a new slice will be allocated.
|
---|
139 | //
|
---|
140 | // The function follows redirects. Use Do* for manually handling redirects.
|
---|
141 | //
|
---|
142 | // ErrTimeout error is returned if url contents couldn't be fetched
|
---|
143 | // during the given timeout.
|
---|
144 | func GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
|
---|
145 | return defaultClient.GetTimeout(dst, url, timeout)
|
---|
146 | }
|
---|
147 |
|
---|
148 | // GetDeadline returns the status code and body of url.
|
---|
149 | //
|
---|
150 | // The contents of dst will be replaced by the body and returned, if the dst
|
---|
151 | // is too small a new slice will be allocated.
|
---|
152 | //
|
---|
153 | // The function follows redirects. Use Do* for manually handling redirects.
|
---|
154 | //
|
---|
155 | // ErrTimeout error is returned if url contents couldn't be fetched
|
---|
156 | // until the given deadline.
|
---|
157 | func GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
|
---|
158 | return defaultClient.GetDeadline(dst, url, deadline)
|
---|
159 | }
|
---|
160 |
|
---|
161 | // Post sends POST request to the given url with the given POST arguments.
|
---|
162 | //
|
---|
163 | // The contents of dst will be replaced by the body and returned, if the dst
|
---|
164 | // is too small a new slice will be allocated.
|
---|
165 | //
|
---|
166 | // The function follows redirects. Use Do* for manually handling redirects.
|
---|
167 | //
|
---|
168 | // Empty POST body is sent if postArgs is nil.
|
---|
169 | func Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
|
---|
170 | return defaultClient.Post(dst, url, postArgs)
|
---|
171 | }
|
---|
172 |
|
---|
// defaultClient is the zero-value Client used by the package-level helper
// functions (Do, DoTimeout, DoDeadline, DoRedirects, Get*, Post).
var defaultClient Client
|
---|
174 |
|
---|
175 | // Client implements http client.
|
---|
176 | //
|
---|
177 | // Copying Client by value is prohibited. Create new instance instead.
|
---|
178 | //
|
---|
179 | // It is safe calling Client methods from concurrently running goroutines.
|
---|
180 | //
|
---|
181 | // The fields of a Client should not be changed while it is in use.
|
---|
182 | type Client struct {
|
---|
183 | noCopy noCopy //nolint:unused,structcheck
|
---|
184 |
|
---|
185 | // Client name. Used in User-Agent request header.
|
---|
186 | //
|
---|
187 | // Default client name is used if not set.
|
---|
188 | Name string
|
---|
189 |
|
---|
190 | // NoDefaultUserAgentHeader when set to true, causes the default
|
---|
191 | // User-Agent header to be excluded from the Request.
|
---|
192 | NoDefaultUserAgentHeader bool
|
---|
193 |
|
---|
194 | // Callback for establishing new connections to hosts.
|
---|
195 | //
|
---|
196 | // Default Dial is used if not set.
|
---|
197 | Dial DialFunc
|
---|
198 |
|
---|
199 | // Attempt to connect to both ipv4 and ipv6 addresses if set to true.
|
---|
200 | //
|
---|
201 | // This option is used only if default TCP dialer is used,
|
---|
202 | // i.e. if Dial is blank.
|
---|
203 | //
|
---|
204 | // By default client connects only to ipv4 addresses,
|
---|
205 | // since unfortunately ipv6 remains broken in many networks worldwide :)
|
---|
206 | DialDualStack bool
|
---|
207 |
|
---|
208 | // TLS config for https connections.
|
---|
209 | //
|
---|
210 | // Default TLS config is used if not set.
|
---|
211 | TLSConfig *tls.Config
|
---|
212 |
|
---|
213 | // Maximum number of connections per each host which may be established.
|
---|
214 | //
|
---|
215 | // DefaultMaxConnsPerHost is used if not set.
|
---|
216 | MaxConnsPerHost int
|
---|
217 |
|
---|
218 | // Idle keep-alive connections are closed after this duration.
|
---|
219 | //
|
---|
220 | // By default idle connections are closed
|
---|
221 | // after DefaultMaxIdleConnDuration.
|
---|
222 | MaxIdleConnDuration time.Duration
|
---|
223 |
|
---|
224 | // Keep-alive connections are closed after this duration.
|
---|
225 | //
|
---|
226 | // By default connection duration is unlimited.
|
---|
227 | MaxConnDuration time.Duration
|
---|
228 |
|
---|
229 | // Maximum number of attempts for idempotent calls
|
---|
230 | //
|
---|
231 | // DefaultMaxIdemponentCallAttempts is used if not set.
|
---|
232 | MaxIdemponentCallAttempts int
|
---|
233 |
|
---|
234 | // Per-connection buffer size for responses' reading.
|
---|
235 | // This also limits the maximum header size.
|
---|
236 | //
|
---|
237 | // Default buffer size is used if 0.
|
---|
238 | ReadBufferSize int
|
---|
239 |
|
---|
240 | // Per-connection buffer size for requests' writing.
|
---|
241 | //
|
---|
242 | // Default buffer size is used if 0.
|
---|
243 | WriteBufferSize int
|
---|
244 |
|
---|
245 | // Maximum duration for full response reading (including body).
|
---|
246 | //
|
---|
247 | // By default response read timeout is unlimited.
|
---|
248 | ReadTimeout time.Duration
|
---|
249 |
|
---|
250 | // Maximum duration for full request writing (including body).
|
---|
251 | //
|
---|
252 | // By default request write timeout is unlimited.
|
---|
253 | WriteTimeout time.Duration
|
---|
254 |
|
---|
255 | // Maximum response body size.
|
---|
256 | //
|
---|
257 | // The client returns ErrBodyTooLarge if this limit is greater than 0
|
---|
258 | // and response body is greater than the limit.
|
---|
259 | //
|
---|
260 | // By default response body size is unlimited.
|
---|
261 | MaxResponseBodySize int
|
---|
262 |
|
---|
263 | // Header names are passed as-is without normalization
|
---|
264 | // if this option is set.
|
---|
265 | //
|
---|
266 | // Disabled header names' normalization may be useful only for proxying
|
---|
267 | // responses to other clients expecting case-sensitive
|
---|
268 | // header names. See https://github.com/valyala/fasthttp/issues/57
|
---|
269 | // for details.
|
---|
270 | //
|
---|
271 | // By default request and response header names are normalized, i.e.
|
---|
272 | // The first letter and the first letters following dashes
|
---|
273 | // are uppercased, while all the other letters are lowercased.
|
---|
274 | // Examples:
|
---|
275 | //
|
---|
276 | // * HOST -> Host
|
---|
277 | // * content-type -> Content-Type
|
---|
278 | // * cONTENT-lenGTH -> Content-Length
|
---|
279 | DisableHeaderNamesNormalizing bool
|
---|
280 |
|
---|
281 | // Path values are sent as-is without normalization
|
---|
282 | //
|
---|
283 | // Disabled path normalization may be useful for proxying incoming requests
|
---|
284 | // to servers that are expecting paths to be forwarded as-is.
|
---|
285 | //
|
---|
286 | // By default path values are normalized, i.e.
|
---|
287 | // extra slashes are removed, special characters are encoded.
|
---|
288 | DisablePathNormalizing bool
|
---|
289 |
|
---|
290 | // Maximum duration for waiting for a free connection.
|
---|
291 | //
|
---|
292 | // By default will not waiting, return ErrNoFreeConns immediately
|
---|
293 | MaxConnWaitTimeout time.Duration
|
---|
294 |
|
---|
295 | // RetryIf controls whether a retry should be attempted after an error.
|
---|
296 | //
|
---|
297 | // By default will use isIdempotent function
|
---|
298 | RetryIf RetryIfFunc
|
---|
299 |
|
---|
300 | // ConfigureClient configures the fasthttp.HostClient.
|
---|
301 | ConfigureClient func(hc *HostClient) error
|
---|
302 |
|
---|
303 | mLock sync.Mutex
|
---|
304 | m map[string]*HostClient
|
---|
305 | ms map[string]*HostClient
|
---|
306 | readerPool sync.Pool
|
---|
307 | writerPool sync.Pool
|
---|
308 | }
|
---|
309 |
|
---|
310 | // Get returns the status code and body of url.
|
---|
311 | //
|
---|
312 | // The contents of dst will be replaced by the body and returned, if the dst
|
---|
313 | // is too small a new slice will be allocated.
|
---|
314 | //
|
---|
315 | // The function follows redirects. Use Do* for manually handling redirects.
|
---|
316 | func (c *Client) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
|
---|
317 | return clientGetURL(dst, url, c)
|
---|
318 | }
|
---|
319 |
|
---|
320 | // GetTimeout returns the status code and body of url.
|
---|
321 | //
|
---|
322 | // The contents of dst will be replaced by the body and returned, if the dst
|
---|
323 | // is too small a new slice will be allocated.
|
---|
324 | //
|
---|
325 | // The function follows redirects. Use Do* for manually handling redirects.
|
---|
326 | //
|
---|
327 | // ErrTimeout error is returned if url contents couldn't be fetched
|
---|
328 | // during the given timeout.
|
---|
329 | func (c *Client) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
|
---|
330 | return clientGetURLTimeout(dst, url, timeout, c)
|
---|
331 | }
|
---|
332 |
|
---|
333 | // GetDeadline returns the status code and body of url.
|
---|
334 | //
|
---|
335 | // The contents of dst will be replaced by the body and returned, if the dst
|
---|
336 | // is too small a new slice will be allocated.
|
---|
337 | //
|
---|
338 | // The function follows redirects. Use Do* for manually handling redirects.
|
---|
339 | //
|
---|
340 | // ErrTimeout error is returned if url contents couldn't be fetched
|
---|
341 | // until the given deadline.
|
---|
342 | func (c *Client) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
|
---|
343 | return clientGetURLDeadline(dst, url, deadline, c)
|
---|
344 | }
|
---|
345 |
|
---|
346 | // Post sends POST request to the given url with the given POST arguments.
|
---|
347 | //
|
---|
348 | // The contents of dst will be replaced by the body and returned, if the dst
|
---|
349 | // is too small a new slice will be allocated.
|
---|
350 | //
|
---|
351 | // The function follows redirects. Use Do* for manually handling redirects.
|
---|
352 | //
|
---|
353 | // Empty POST body is sent if postArgs is nil.
|
---|
354 | func (c *Client) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
|
---|
355 | return clientPostURL(dst, url, postArgs, c)
|
---|
356 | }
|
---|
357 |
|
---|
358 | // DoTimeout performs the given request and waits for response during
|
---|
359 | // the given timeout duration.
|
---|
360 | //
|
---|
361 | // Request must contain at least non-zero RequestURI with full url (including
|
---|
362 | // scheme and host) or non-zero Host header + RequestURI.
|
---|
363 | //
|
---|
364 | // Client determines the server to be requested in the following order:
|
---|
365 | //
|
---|
366 | // - from RequestURI if it contains full url with scheme and host;
|
---|
367 | // - from Host header otherwise.
|
---|
368 | //
|
---|
369 | // The function doesn't follow redirects. Use Get* for following redirects.
|
---|
370 | //
|
---|
371 | // Response is ignored if resp is nil.
|
---|
372 | //
|
---|
373 | // ErrTimeout is returned if the response wasn't returned during
|
---|
374 | // the given timeout.
|
---|
375 | //
|
---|
376 | // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
|
---|
377 | // to the requested host are busy.
|
---|
378 | //
|
---|
379 | // It is recommended obtaining req and resp via AcquireRequest
|
---|
380 | // and AcquireResponse in performance-critical code.
|
---|
381 | //
|
---|
382 | // Warning: DoTimeout does not terminate the request itself. The request will
|
---|
383 | // continue in the background and the response will be discarded.
|
---|
384 | // If requests take too long and the connection pool gets filled up please
|
---|
385 | // try setting a ReadTimeout.
|
---|
386 | func (c *Client) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
|
---|
387 | return clientDoTimeout(req, resp, timeout, c)
|
---|
388 | }
|
---|
389 |
|
---|
390 | // DoDeadline performs the given request and waits for response until
|
---|
391 | // the given deadline.
|
---|
392 | //
|
---|
393 | // Request must contain at least non-zero RequestURI with full url (including
|
---|
394 | // scheme and host) or non-zero Host header + RequestURI.
|
---|
395 | //
|
---|
396 | // Client determines the server to be requested in the following order:
|
---|
397 | //
|
---|
398 | // - from RequestURI if it contains full url with scheme and host;
|
---|
399 | // - from Host header otherwise.
|
---|
400 | //
|
---|
401 | // The function doesn't follow redirects. Use Get* for following redirects.
|
---|
402 | //
|
---|
403 | // Response is ignored if resp is nil.
|
---|
404 | //
|
---|
405 | // ErrTimeout is returned if the response wasn't returned until
|
---|
406 | // the given deadline.
|
---|
407 | //
|
---|
408 | // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
|
---|
409 | // to the requested host are busy.
|
---|
410 | //
|
---|
411 | // It is recommended obtaining req and resp via AcquireRequest
|
---|
412 | // and AcquireResponse in performance-critical code.
|
---|
413 | func (c *Client) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
|
---|
414 | return clientDoDeadline(req, resp, deadline, c)
|
---|
415 | }
|
---|
416 |
|
---|
417 | // DoRedirects performs the given http request and fills the given http response,
|
---|
418 | // following up to maxRedirectsCount redirects. When the redirect count exceeds
|
---|
419 | // maxRedirectsCount, ErrTooManyRedirects is returned.
|
---|
420 | //
|
---|
421 | // Request must contain at least non-zero RequestURI with full url (including
|
---|
422 | // scheme and host) or non-zero Host header + RequestURI.
|
---|
423 | //
|
---|
424 | // Client determines the server to be requested in the following order:
|
---|
425 | //
|
---|
426 | // - from RequestURI if it contains full url with scheme and host;
|
---|
427 | // - from Host header otherwise.
|
---|
428 | //
|
---|
429 | // Response is ignored if resp is nil.
|
---|
430 | //
|
---|
431 | // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
|
---|
432 | // to the requested host are busy.
|
---|
433 | //
|
---|
434 | // It is recommended obtaining req and resp via AcquireRequest
|
---|
435 | // and AcquireResponse in performance-critical code.
|
---|
436 | func (c *Client) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
|
---|
437 | _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
|
---|
438 | return err
|
---|
439 | }
|
---|
440 |
|
---|
441 | // Do performs the given http request and fills the given http response.
|
---|
442 | //
|
---|
443 | // Request must contain at least non-zero RequestURI with full url (including
|
---|
444 | // scheme and host) or non-zero Host header + RequestURI.
|
---|
445 | //
|
---|
446 | // Client determines the server to be requested in the following order:
|
---|
447 | //
|
---|
448 | // - from RequestURI if it contains full url with scheme and host;
|
---|
449 | // - from Host header otherwise.
|
---|
450 | //
|
---|
451 | // Response is ignored if resp is nil.
|
---|
452 | //
|
---|
453 | // The function doesn't follow redirects. Use Get* for following redirects.
|
---|
454 | //
|
---|
455 | // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
|
---|
456 | // to the requested host are busy.
|
---|
457 | //
|
---|
458 | // It is recommended obtaining req and resp via AcquireRequest
|
---|
459 | // and AcquireResponse in performance-critical code.
|
---|
460 | func (c *Client) Do(req *Request, resp *Response) error {
|
---|
461 | uri := req.URI()
|
---|
462 | if uri == nil {
|
---|
463 | return ErrorInvalidURI
|
---|
464 | }
|
---|
465 |
|
---|
466 | host := uri.Host()
|
---|
467 |
|
---|
468 | isTLS := false
|
---|
469 | if uri.isHttps() {
|
---|
470 | isTLS = true
|
---|
471 | } else if !uri.isHttp() {
|
---|
472 | return fmt.Errorf("unsupported protocol %q. http and https are supported", uri.Scheme())
|
---|
473 | }
|
---|
474 |
|
---|
475 | startCleaner := false
|
---|
476 |
|
---|
477 | c.mLock.Lock()
|
---|
478 | m := c.m
|
---|
479 | if isTLS {
|
---|
480 | m = c.ms
|
---|
481 | }
|
---|
482 | if m == nil {
|
---|
483 | m = make(map[string]*HostClient)
|
---|
484 | if isTLS {
|
---|
485 | c.ms = m
|
---|
486 | } else {
|
---|
487 | c.m = m
|
---|
488 | }
|
---|
489 | }
|
---|
490 | hc := m[string(host)]
|
---|
491 | if hc == nil {
|
---|
492 | hc = &HostClient{
|
---|
493 | Addr: addMissingPort(string(host), isTLS),
|
---|
494 | Name: c.Name,
|
---|
495 | NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
|
---|
496 | Dial: c.Dial,
|
---|
497 | DialDualStack: c.DialDualStack,
|
---|
498 | IsTLS: isTLS,
|
---|
499 | TLSConfig: c.TLSConfig,
|
---|
500 | MaxConns: c.MaxConnsPerHost,
|
---|
501 | MaxIdleConnDuration: c.MaxIdleConnDuration,
|
---|
502 | MaxConnDuration: c.MaxConnDuration,
|
---|
503 | MaxIdemponentCallAttempts: c.MaxIdemponentCallAttempts,
|
---|
504 | ReadBufferSize: c.ReadBufferSize,
|
---|
505 | WriteBufferSize: c.WriteBufferSize,
|
---|
506 | ReadTimeout: c.ReadTimeout,
|
---|
507 | WriteTimeout: c.WriteTimeout,
|
---|
508 | MaxResponseBodySize: c.MaxResponseBodySize,
|
---|
509 | DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
|
---|
510 | DisablePathNormalizing: c.DisablePathNormalizing,
|
---|
511 | MaxConnWaitTimeout: c.MaxConnWaitTimeout,
|
---|
512 | RetryIf: c.RetryIf,
|
---|
513 | clientReaderPool: &c.readerPool,
|
---|
514 | clientWriterPool: &c.writerPool,
|
---|
515 | }
|
---|
516 |
|
---|
517 | if c.ConfigureClient != nil {
|
---|
518 | if err := c.ConfigureClient(hc); err != nil {
|
---|
519 | return err
|
---|
520 | }
|
---|
521 | }
|
---|
522 |
|
---|
523 | m[string(host)] = hc
|
---|
524 | if len(m) == 1 {
|
---|
525 | startCleaner = true
|
---|
526 | }
|
---|
527 | }
|
---|
528 |
|
---|
529 | atomic.AddInt32(&hc.pendingClientRequests, 1)
|
---|
530 | defer atomic.AddInt32(&hc.pendingClientRequests, -1)
|
---|
531 |
|
---|
532 | c.mLock.Unlock()
|
---|
533 |
|
---|
534 | if startCleaner {
|
---|
535 | go c.mCleaner(m)
|
---|
536 | }
|
---|
537 |
|
---|
538 | return hc.Do(req, resp)
|
---|
539 | }
|
---|
540 |
|
---|
541 | // CloseIdleConnections closes any connections which were previously
|
---|
542 | // connected from previous requests but are now sitting idle in a
|
---|
543 | // "keep-alive" state. It does not interrupt any connections currently
|
---|
544 | // in use.
|
---|
545 | func (c *Client) CloseIdleConnections() {
|
---|
546 | c.mLock.Lock()
|
---|
547 | for _, v := range c.m {
|
---|
548 | v.CloseIdleConnections()
|
---|
549 | }
|
---|
550 | for _, v := range c.ms {
|
---|
551 | v.CloseIdleConnections()
|
---|
552 | }
|
---|
553 | c.mLock.Unlock()
|
---|
554 | }
|
---|
555 |
|
---|
556 | func (c *Client) mCleaner(m map[string]*HostClient) {
|
---|
557 | mustStop := false
|
---|
558 |
|
---|
559 | sleep := c.MaxIdleConnDuration
|
---|
560 | if sleep < time.Second {
|
---|
561 | sleep = time.Second
|
---|
562 | } else if sleep > 10*time.Second {
|
---|
563 | sleep = 10 * time.Second
|
---|
564 | }
|
---|
565 |
|
---|
566 | for {
|
---|
567 | c.mLock.Lock()
|
---|
568 | for k, v := range m {
|
---|
569 | v.connsLock.Lock()
|
---|
570 | if v.connsCount == 0 && atomic.LoadInt32(&v.pendingClientRequests) == 0 {
|
---|
571 | delete(m, k)
|
---|
572 | }
|
---|
573 | v.connsLock.Unlock()
|
---|
574 | }
|
---|
575 | if len(m) == 0 {
|
---|
576 | mustStop = true
|
---|
577 | }
|
---|
578 | c.mLock.Unlock()
|
---|
579 |
|
---|
580 | if mustStop {
|
---|
581 | break
|
---|
582 | }
|
---|
583 | time.Sleep(sleep)
|
---|
584 | }
|
---|
585 | }
|
---|
586 |
|
---|
// Defaults applied by the http client when the matching setting is left unset.
const (
	// DefaultMaxConnsPerHost is the maximum number of concurrent connections
	// the http client may establish per host by default (i.e. when
	// Client.MaxConnsPerHost isn't set).
	DefaultMaxConnsPerHost = 512

	// DefaultMaxIdleConnDuration is the default duration after which an idle
	// keep-alive connection is closed.
	DefaultMaxIdleConnDuration = 10 * time.Second

	// DefaultMaxIdemponentCallAttempts is the default number of attempts
	// for idempotent calls.
	DefaultMaxIdemponentCallAttempts = 5
)
|
---|
598 |
|
---|
// DialFunc is a callback that must establish a connection to addr.
//
// The callback does not need to set up a TLS (SSL) connection for https:
// the client automatically converts the connection to TLS
// when HostClient.IsTLS is set.
//
// The TCP address passed to DialFunc always contains both host and port.
// Example TCP addr values:
//
//   - foobar.com:80
//   - foobar.com:443
//   - foobar.com:8080
type DialFunc func(addr string) (net.Conn, error)
|
---|
612 |
|
---|
613 | // RetryIfFunc signature of retry if function
|
---|
614 | //
|
---|
615 | // Request argument passed to RetryIfFunc, if there are any request errors.
|
---|
616 | type RetryIfFunc func(request *Request) bool
|
---|
617 |
|
---|
618 | // TransportFunc wraps every request/response.
|
---|
619 | type TransportFunc func(*Request, *Response) error
|
---|
620 |
|
---|
621 | // HostClient balances http requests among hosts listed in Addr.
|
---|
622 | //
|
---|
623 | // HostClient may be used for balancing load among multiple upstream hosts.
|
---|
624 | // While multiple addresses passed to HostClient.Addr may be used for balancing
|
---|
625 | // load among them, it would be better using LBClient instead, since HostClient
|
---|
626 | // may unevenly balance load among upstream hosts.
|
---|
627 | //
|
---|
628 | // It is forbidden copying HostClient instances. Create new instances instead.
|
---|
629 | //
|
---|
630 | // It is safe calling HostClient methods from concurrently running goroutines.
|
---|
631 | type HostClient struct {
|
---|
632 | noCopy noCopy //nolint:unused,structcheck
|
---|
633 |
|
---|
634 | // Comma-separated list of upstream HTTP server host addresses,
|
---|
635 | // which are passed to Dial in a round-robin manner.
|
---|
636 | //
|
---|
637 | // Each address may contain port if default dialer is used.
|
---|
638 | // For example,
|
---|
639 | //
|
---|
640 | // - foobar.com:80
|
---|
641 | // - foobar.com:443
|
---|
642 | // - foobar.com:8080
|
---|
643 | Addr string
|
---|
644 |
|
---|
645 | // Client name. Used in User-Agent request header.
|
---|
646 | Name string
|
---|
647 |
|
---|
648 | // NoDefaultUserAgentHeader when set to true, causes the default
|
---|
649 | // User-Agent header to be excluded from the Request.
|
---|
650 | NoDefaultUserAgentHeader bool
|
---|
651 |
|
---|
652 | // Callback for establishing new connection to the host.
|
---|
653 | //
|
---|
654 | // Default Dial is used if not set.
|
---|
655 | Dial DialFunc
|
---|
656 |
|
---|
657 | // Attempt to connect to both ipv4 and ipv6 host addresses
|
---|
658 | // if set to true.
|
---|
659 | //
|
---|
660 | // This option is used only if default TCP dialer is used,
|
---|
661 | // i.e. if Dial is blank.
|
---|
662 | //
|
---|
663 | // By default client connects only to ipv4 addresses,
|
---|
664 | // since unfortunately ipv6 remains broken in many networks worldwide :)
|
---|
665 | DialDualStack bool
|
---|
666 |
|
---|
667 | // Whether to use TLS (aka SSL or HTTPS) for host connections.
|
---|
668 | IsTLS bool
|
---|
669 |
|
---|
670 | // Optional TLS config.
|
---|
671 | TLSConfig *tls.Config
|
---|
672 |
|
---|
673 | // Maximum number of connections which may be established to all hosts
|
---|
674 | // listed in Addr.
|
---|
675 | //
|
---|
676 | // You can change this value while the HostClient is being used
|
---|
677 | // using HostClient.SetMaxConns(value)
|
---|
678 | //
|
---|
679 | // DefaultMaxConnsPerHost is used if not set.
|
---|
680 | MaxConns int
|
---|
681 |
|
---|
682 | // Keep-alive connections are closed after this duration.
|
---|
683 | //
|
---|
684 | // By default connection duration is unlimited.
|
---|
685 | MaxConnDuration time.Duration
|
---|
686 |
|
---|
687 | // Idle keep-alive connections are closed after this duration.
|
---|
688 | //
|
---|
689 | // By default idle connections are closed
|
---|
690 | // after DefaultMaxIdleConnDuration.
|
---|
691 | MaxIdleConnDuration time.Duration
|
---|
692 |
|
---|
693 | // Maximum number of attempts for idempotent calls
|
---|
694 | //
|
---|
695 | // DefaultMaxIdemponentCallAttempts is used if not set.
|
---|
696 | MaxIdemponentCallAttempts int
|
---|
697 |
|
---|
698 | // Per-connection buffer size for responses' reading.
|
---|
699 | // This also limits the maximum header size.
|
---|
700 | //
|
---|
701 | // Default buffer size is used if 0.
|
---|
702 | ReadBufferSize int
|
---|
703 |
|
---|
704 | // Per-connection buffer size for requests' writing.
|
---|
705 | //
|
---|
706 | // Default buffer size is used if 0.
|
---|
707 | WriteBufferSize int
|
---|
708 |
|
---|
709 | // Maximum duration for full response reading (including body).
|
---|
710 | //
|
---|
711 | // By default response read timeout is unlimited.
|
---|
712 | ReadTimeout time.Duration
|
---|
713 |
|
---|
714 | // Maximum duration for full request writing (including body).
|
---|
715 | //
|
---|
716 | // By default request write timeout is unlimited.
|
---|
717 | WriteTimeout time.Duration
|
---|
718 |
|
---|
719 | // Maximum response body size.
|
---|
720 | //
|
---|
721 | // The client returns ErrBodyTooLarge if this limit is greater than 0
|
---|
722 | // and response body is greater than the limit.
|
---|
723 | //
|
---|
724 | // By default response body size is unlimited.
|
---|
725 | MaxResponseBodySize int
|
---|
726 |
|
---|
727 | // Header names are passed as-is without normalization
|
---|
728 | // if this option is set.
|
---|
729 | //
|
---|
730 | // Disabled header names' normalization may be useful only for proxying
|
---|
731 | // responses to other clients expecting case-sensitive
|
---|
732 | // header names. See https://github.com/valyala/fasthttp/issues/57
|
---|
733 | // for details.
|
---|
734 | //
|
---|
735 | // By default request and response header names are normalized, i.e.
|
---|
736 | // The first letter and the first letters following dashes
|
---|
737 | // are uppercased, while all the other letters are lowercased.
|
---|
738 | // Examples:
|
---|
739 | //
|
---|
740 | // * HOST -> Host
|
---|
741 | // * content-type -> Content-Type
|
---|
742 | // * cONTENT-lenGTH -> Content-Length
|
---|
743 | DisableHeaderNamesNormalizing bool
|
---|
744 |
|
---|
745 | // Path values are sent as-is without normalization
|
---|
746 | //
|
---|
747 | // Disabled path normalization may be useful for proxying incoming requests
|
---|
748 | // to servers that are expecting paths to be forwarded as-is.
|
---|
749 | //
|
---|
750 | // By default path values are normalized, i.e.
|
---|
751 | // extra slashes are removed, special characters are encoded.
|
---|
752 | DisablePathNormalizing bool
|
---|
753 |
|
---|
754 | // Will not log potentially sensitive content in error logs
|
---|
755 | //
|
---|
756 | // This option is useful for servers that handle sensitive data
|
---|
757 | // in the request/response.
|
---|
758 | //
|
---|
759 | // Client logs full errors by default.
|
---|
760 | SecureErrorLogMessage bool
|
---|
761 |
|
---|
762 | // Maximum duration for waiting for a free connection.
|
---|
763 | //
|
---|
764 | // By default the client does not wait and returns ErrNoFreeConns immediately.
|
---|
765 | MaxConnWaitTimeout time.Duration
|
---|
766 |
|
---|
767 | // RetryIf controls whether a retry should be attempted after an error.
|
---|
768 | //
|
---|
769 | // By default will use isIdempotent function
|
---|
770 | RetryIf RetryIfFunc
|
---|
771 |
|
---|
772 | // Transport defines a transport-like mechanism that wraps every request/response.
|
---|
773 | Transport TransportFunc
|
---|
774 |
|
---|
775 | clientName atomic.Value
|
---|
776 | lastUseTime uint32
|
---|
777 |
|
---|
778 | connsLock sync.Mutex
|
---|
779 | connsCount int
|
---|
780 | conns []*clientConn
|
---|
781 | connsWait *wantConnQueue
|
---|
782 |
|
---|
783 | addrsLock sync.Mutex
|
---|
784 | addrs []string
|
---|
785 | addrIdx uint32
|
---|
786 |
|
---|
787 | tlsConfigMap map[string]*tls.Config
|
---|
788 | tlsConfigMapLock sync.Mutex
|
---|
789 |
|
---|
790 | readerPool sync.Pool
|
---|
791 | writerPool sync.Pool
|
---|
792 |
|
---|
793 | clientReaderPool *sync.Pool
|
---|
794 | clientWriterPool *sync.Pool
|
---|
795 |
|
---|
796 | pendingRequests int32
|
---|
797 |
|
---|
798 | // pendingClientRequests counts the number of requests that a Client is currently running using this HostClient.
|
---|
799 | // It will be incremented earlier than pendingRequests and will be used by Client to see if the HostClient is still in use.
|
---|
800 | pendingClientRequests int32
|
---|
801 |
|
---|
802 | connsCleanerRun bool
|
---|
803 | }
|
---|
804 |
|
---|
// clientConn pairs a network connection kept by a HostClient with the
// timestamps used for connection lifetime bookkeeping.
type clientConn struct {
	// c is the underlying network connection.
	c net.Conn

	// createdTime is compared against HostClient.MaxConnDuration before the
	// connection is reused for another request.
	createdTime time.Time
	// lastUseTime records when the connection was last used.
	// NOTE(review): presumably consulted by the idle-connection cleaner — confirm.
	lastUseTime time.Time
}
|
---|
811 |
|
---|
// startTimeUnix is the Unix time in seconds captured at package
// initialization. HostClient.lastUseTime is stored as an offset from it.
var startTimeUnix = time.Now().Unix()
|
---|
813 |
|
---|
814 | // LastUseTime returns time the client was last used
|
---|
815 | func (c *HostClient) LastUseTime() time.Time {
|
---|
816 | n := atomic.LoadUint32(&c.lastUseTime)
|
---|
817 | return time.Unix(startTimeUnix+int64(n), 0)
|
---|
818 | }
|
---|
819 |
|
---|
820 | // Get returns the status code and body of url.
|
---|
821 | //
|
---|
822 | // The contents of dst will be replaced by the body and returned, if the dst
|
---|
823 | // is too small a new slice will be allocated.
|
---|
824 | //
|
---|
825 | // The function follows redirects. Use Do* for manually handling redirects.
|
---|
826 | func (c *HostClient) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
|
---|
827 | return clientGetURL(dst, url, c)
|
---|
828 | }
|
---|
829 |
|
---|
830 | // GetTimeout returns the status code and body of url.
|
---|
831 | //
|
---|
832 | // The contents of dst will be replaced by the body and returned, if the dst
|
---|
833 | // is too small a new slice will be allocated.
|
---|
834 | //
|
---|
835 | // The function follows redirects. Use Do* for manually handling redirects.
|
---|
836 | //
|
---|
837 | // ErrTimeout error is returned if url contents couldn't be fetched
|
---|
838 | // during the given timeout.
|
---|
839 | func (c *HostClient) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
|
---|
840 | return clientGetURLTimeout(dst, url, timeout, c)
|
---|
841 | }
|
---|
842 |
|
---|
843 | // GetDeadline returns the status code and body of url.
|
---|
844 | //
|
---|
845 | // The contents of dst will be replaced by the body and returned, if the dst
|
---|
846 | // is too small a new slice will be allocated.
|
---|
847 | //
|
---|
848 | // The function follows redirects. Use Do* for manually handling redirects.
|
---|
849 | //
|
---|
850 | // ErrTimeout error is returned if url contents couldn't be fetched
|
---|
851 | // until the given deadline.
|
---|
852 | func (c *HostClient) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
|
---|
853 | return clientGetURLDeadline(dst, url, deadline, c)
|
---|
854 | }
|
---|
855 |
|
---|
856 | // Post sends POST request to the given url with the given POST arguments.
|
---|
857 | //
|
---|
858 | // The contents of dst will be replaced by the body and returned, if the dst
|
---|
859 | // is too small a new slice will be allocated.
|
---|
860 | //
|
---|
861 | // The function follows redirects. Use Do* for manually handling redirects.
|
---|
862 | //
|
---|
863 | // Empty POST body is sent if postArgs is nil.
|
---|
864 | func (c *HostClient) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
|
---|
865 | return clientPostURL(dst, url, postArgs, c)
|
---|
866 | }
|
---|
867 |
|
---|
// clientDoer is the minimal client abstraction required by the URL helper
// functions in this file: anything able to execute one request/response
// exchange.
type clientDoer interface {
	// Do performs req and fills resp, returning a non-nil error on failure.
	Do(req *Request, resp *Response) error
}
|
---|
871 |
|
---|
872 | func clientGetURL(dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
|
---|
873 | req := AcquireRequest()
|
---|
874 |
|
---|
875 | statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
|
---|
876 |
|
---|
877 | ReleaseRequest(req)
|
---|
878 | return statusCode, body, err
|
---|
879 | }
|
---|
880 |
|
---|
881 | func clientGetURLTimeout(dst []byte, url string, timeout time.Duration, c clientDoer) (statusCode int, body []byte, err error) {
|
---|
882 | deadline := time.Now().Add(timeout)
|
---|
883 | return clientGetURLDeadline(dst, url, deadline, c)
|
---|
884 | }
|
---|
885 |
|
---|
// clientURLResponse carries the outcome of a background URL fetch from the
// worker goroutine in clientGetURLDeadline back to the waiting caller.
type clientURLResponse struct {
	// statusCode is the HTTP status code of the final response.
	statusCode int
	// body is the response body buffer returned by the fetch.
	body []byte
	// err is the error returned by the fetch, if any.
	err error
}
|
---|
891 |
|
---|
// clientGetURLDeadline fetches url through c (following redirects) in a
// background goroutine and waits for the result until deadline.
//
// It returns (0, dst, ErrTimeout) immediately if deadline has already
// passed, and ErrTimeout with body set to dst if the deadline expires
// before the fetch completes. Otherwise it returns the status code, body
// and error produced by the fetch.
//
// NOTE(review): on timeout the background goroutine keeps running with dst
// as its body buffer, so callers presumably must not reuse dst's backing
// array until that request finishes — confirm.
func clientGetURLDeadline(dst []byte, url string, deadline time.Time, c clientDoer) (statusCode int, body []byte, err error) {
	// Remaining time until the deadline; non-positive means already expired.
	timeout := -time.Since(deadline)
	if timeout <= 0 {
		return 0, dst, ErrTimeout
	}

	// Reuse a pooled result channel when available. The channel is buffered
	// (capacity 1) so the worker's send never blocks.
	var ch chan clientURLResponse
	chv := clientURLResponseChPool.Get()
	if chv == nil {
		chv = make(chan clientURLResponse, 1)
	}
	ch = chv.(chan clientURLResponse)

	// Note that the request continues execution on ErrTimeout until
	// client-specific ReadTimeout exceeds. This helps limiting load
	// on slow hosts by MaxConns* concurrent requests.
	//
	// Without this 'hack' the load on slow host could exceed MaxConns*
	// concurrent requests, since timed out requests on client side
	// usually continue execution on the host.

	// mu guards timedout and responded, which arbitrate between the worker
	// delivering a result and the caller giving up on the timer.
	var mu sync.Mutex
	var timedout, responded bool

	go func() {
		req := AcquireRequest()

		statusCodeCopy, bodyCopy, errCopy := doRequestFollowRedirectsBuffer(req, dst, url, c)
		mu.Lock()
		{
			// Only publish the result if the caller has not timed out yet;
			// otherwise the channel may already be back in the pool.
			if !timedout {
				ch <- clientURLResponse{
					statusCode: statusCodeCopy,
					body:       bodyCopy,
					err:        errCopy,
				}
				responded = true
			}
		}
		mu.Unlock()

		ReleaseRequest(req)
	}()

	tc := AcquireTimer(timeout)
	select {
	case resp := <-ch:
		statusCode = resp.statusCode
		body = resp.body
		err = resp.err
	case <-tc.C:
		mu.Lock()
		{
			if responded {
				// The worker finished at about the same time the timer fired:
				// take its result so the channel is left empty for reuse.
				resp := <-ch
				statusCode = resp.statusCode
				body = resp.body
				err = resp.err
			} else {
				// Tell the worker not to send, then report the timeout.
				timedout = true
				err = ErrTimeout
				body = dst
			}
		}
		mu.Unlock()
	}
	ReleaseTimer(tc)

	// The channel is empty on every path above, so it is safe to pool it.
	clientURLResponseChPool.Put(chv)

	return statusCode, body, err
}
|
---|
964 |
|
---|
// clientURLResponseChPool recycles the buffered chan clientURLResponse
// values used by clientGetURLDeadline.
var clientURLResponseChPool sync.Pool
|
---|
966 |
|
---|
967 | func clientPostURL(dst []byte, url string, postArgs *Args, c clientDoer) (statusCode int, body []byte, err error) {
|
---|
968 | req := AcquireRequest()
|
---|
969 | req.Header.SetMethod(MethodPost)
|
---|
970 | req.Header.SetContentTypeBytes(strPostArgsContentType)
|
---|
971 | if postArgs != nil {
|
---|
972 | if _, err := postArgs.WriteTo(req.BodyWriter()); err != nil {
|
---|
973 | return 0, nil, err
|
---|
974 | }
|
---|
975 | }
|
---|
976 |
|
---|
977 | statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
|
---|
978 |
|
---|
979 | ReleaseRequest(req)
|
---|
980 | return statusCode, body, err
|
---|
981 | }
|
---|
982 |
|
---|
var (
	// ErrMissingLocation is returned by clients when the Location header is missing on
	// an HTTP response with a redirect status code.
	ErrMissingLocation = errors.New("missing Location header for http redirect")
	// ErrTooManyRedirects is returned by clients when the number of redirects followed
	// exceeds the max count.
	ErrTooManyRedirects = errors.New("too many redirects detected when doing the request")

	// ErrHostClientRedirectToDifferentScheme is returned by HostClient when the
	// scheme of the request URI does not match the client's IsTLS setting.
	// HostClients are only able to follow redirects to the same protocol.
	ErrHostClientRedirectToDifferentScheme = errors.New("HostClient can't follow redirects to a different protocol, please use Client instead")
)
|
---|
994 |
|
---|
// defaultMaxRedirectsCount is the redirect limit applied by
// doRequestFollowRedirectsBuffer, i.e. by the Get* and Post helpers.
const defaultMaxRedirectsCount = 16
|
---|
996 |
|
---|
997 | func doRequestFollowRedirectsBuffer(req *Request, dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
|
---|
998 | resp := AcquireResponse()
|
---|
999 | bodyBuf := resp.bodyBuffer()
|
---|
1000 | resp.keepBodyBuffer = true
|
---|
1001 | oldBody := bodyBuf.B
|
---|
1002 | bodyBuf.B = dst
|
---|
1003 |
|
---|
1004 | statusCode, _, err = doRequestFollowRedirects(req, resp, url, defaultMaxRedirectsCount, c)
|
---|
1005 |
|
---|
1006 | body = bodyBuf.B
|
---|
1007 | bodyBuf.B = oldBody
|
---|
1008 | resp.keepBodyBuffer = false
|
---|
1009 | ReleaseResponse(resp)
|
---|
1010 |
|
---|
1011 | return statusCode, body, err
|
---|
1012 | }
|
---|
1013 |
|
---|
1014 | func doRequestFollowRedirects(req *Request, resp *Response, url string, maxRedirectsCount int, c clientDoer) (statusCode int, body []byte, err error) {
|
---|
1015 | redirectsCount := 0
|
---|
1016 |
|
---|
1017 | for {
|
---|
1018 | req.SetRequestURI(url)
|
---|
1019 | if err := req.parseURI(); err != nil {
|
---|
1020 | return 0, nil, err
|
---|
1021 | }
|
---|
1022 |
|
---|
1023 | if err = c.Do(req, resp); err != nil {
|
---|
1024 | break
|
---|
1025 | }
|
---|
1026 | statusCode = resp.Header.StatusCode()
|
---|
1027 | if !StatusCodeIsRedirect(statusCode) {
|
---|
1028 | break
|
---|
1029 | }
|
---|
1030 |
|
---|
1031 | redirectsCount++
|
---|
1032 | if redirectsCount > maxRedirectsCount {
|
---|
1033 | err = ErrTooManyRedirects
|
---|
1034 | break
|
---|
1035 | }
|
---|
1036 | location := resp.Header.peek(strLocation)
|
---|
1037 | if len(location) == 0 {
|
---|
1038 | err = ErrMissingLocation
|
---|
1039 | break
|
---|
1040 | }
|
---|
1041 | url = getRedirectURL(url, location)
|
---|
1042 | }
|
---|
1043 |
|
---|
1044 | return statusCode, body, err
|
---|
1045 | }
|
---|
1046 |
|
---|
1047 | func getRedirectURL(baseURL string, location []byte) string {
|
---|
1048 | u := AcquireURI()
|
---|
1049 | u.Update(baseURL)
|
---|
1050 | u.UpdateBytes(location)
|
---|
1051 | redirectURL := u.String()
|
---|
1052 | ReleaseURI(u)
|
---|
1053 | return redirectURL
|
---|
1054 | }
|
---|
1055 |
|
---|
1056 | // StatusCodeIsRedirect returns true if the status code indicates a redirect.
|
---|
1057 | func StatusCodeIsRedirect(statusCode int) bool {
|
---|
1058 | return statusCode == StatusMovedPermanently ||
|
---|
1059 | statusCode == StatusFound ||
|
---|
1060 | statusCode == StatusSeeOther ||
|
---|
1061 | statusCode == StatusTemporaryRedirect ||
|
---|
1062 | statusCode == StatusPermanentRedirect
|
---|
1063 | }
|
---|
1064 |
|
---|
var (
	// requestPool recycles *Request values for AcquireRequest/ReleaseRequest.
	requestPool sync.Pool
	// responsePool recycles *Response values for AcquireResponse/ReleaseResponse.
	responsePool sync.Pool
)
|
---|
1069 |
|
---|
1070 | // AcquireRequest returns an empty Request instance from request pool.
|
---|
1071 | //
|
---|
1072 | // The returned Request instance may be passed to ReleaseRequest when it is
|
---|
1073 | // no longer needed. This allows Request recycling, reduces GC pressure
|
---|
1074 | // and usually improves performance.
|
---|
1075 | func AcquireRequest() *Request {
|
---|
1076 | v := requestPool.Get()
|
---|
1077 | if v == nil {
|
---|
1078 | return &Request{}
|
---|
1079 | }
|
---|
1080 | return v.(*Request)
|
---|
1081 | }
|
---|
1082 |
|
---|
// ReleaseRequest returns req acquired via AcquireRequest to the request pool.
//
// The request is reset before being pooled. It is forbidden to access req
// and/or its members after returning it to the request pool.
func ReleaseRequest(req *Request) {
	req.Reset()
	requestPool.Put(req)
}
|
---|
1091 |
|
---|
1092 | // AcquireResponse returns an empty Response instance from response pool.
|
---|
1093 | //
|
---|
1094 | // The returned Response instance may be passed to ReleaseResponse when it is
|
---|
1095 | // no longer needed. This allows Response recycling, reduces GC pressure
|
---|
1096 | // and usually improves performance.
|
---|
1097 | func AcquireResponse() *Response {
|
---|
1098 | v := responsePool.Get()
|
---|
1099 | if v == nil {
|
---|
1100 | return &Response{}
|
---|
1101 | }
|
---|
1102 | return v.(*Response)
|
---|
1103 | }
|
---|
1104 |
|
---|
// ReleaseResponse returns resp acquired via AcquireResponse to the response pool.
//
// The response is reset before being pooled. It is forbidden to access resp
// and/or its members after returning it to the response pool.
func ReleaseResponse(resp *Response) {
	resp.Reset()
	responsePool.Put(resp)
}
|
---|
1113 |
|
---|
// DoTimeout performs the given request and waits for response during
// the given timeout duration.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned during
// the given timeout. A non-positive timeout yields ErrTimeout without the
// request being sent.
//
// ErrNoFreeConns is returned if all HostClient.MaxConns connections
// to the host are busy.
//
// It is recommended to obtain req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
//
// Warning: DoTimeout does not terminate the request itself. The request will
// continue in the background and the response will be discarded.
// If requests take too long and the connection pool gets filled up please
// try setting a ReadTimeout.
func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
	return clientDoTimeout(req, resp, timeout, c)
}
|
---|
1140 |
|
---|
// DoDeadline performs the given request and waits for response until
// the given deadline.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned until
// the given deadline. A deadline that has already passed yields ErrTimeout
// without the request being sent.
//
// ErrNoFreeConns is returned if all HostClient.MaxConns connections
// to the host are busy.
//
// It is recommended to obtain req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
func (c *HostClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
	return clientDoDeadline(req, resp, deadline, c)
}
|
---|
1162 |
|
---|
1163 | // DoRedirects performs the given http request and fills the given http response,
|
---|
1164 | // following up to maxRedirectsCount redirects. When the redirect count exceeds
|
---|
1165 | // maxRedirectsCount, ErrTooManyRedirects is returned.
|
---|
1166 | //
|
---|
1167 | // Request must contain at least non-zero RequestURI with full url (including
|
---|
1168 | // scheme and host) or non-zero Host header + RequestURI.
|
---|
1169 | //
|
---|
1170 | // Client determines the server to be requested in the following order:
|
---|
1171 | //
|
---|
1172 | // - from RequestURI if it contains full url with scheme and host;
|
---|
1173 | // - from Host header otherwise.
|
---|
1174 | //
|
---|
1175 | // Response is ignored if resp is nil.
|
---|
1176 | //
|
---|
1177 | // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
|
---|
1178 | // to the requested host are busy.
|
---|
1179 | //
|
---|
1180 | // It is recommended obtaining req and resp via AcquireRequest
|
---|
1181 | // and AcquireResponse in performance-critical code.
|
---|
1182 | func (c *HostClient) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
|
---|
1183 | _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
|
---|
1184 | return err
|
---|
1185 | }
|
---|
1186 |
|
---|
1187 | func clientDoTimeout(req *Request, resp *Response, timeout time.Duration, c clientDoer) error {
|
---|
1188 | deadline := time.Now().Add(timeout)
|
---|
1189 | return clientDoDeadline(req, resp, deadline, c)
|
---|
1190 | }
|
---|
1191 |
|
---|
// clientDoDeadline executes req through c in a background goroutine and
// waits for completion until deadline.
//
// It returns ErrTimeout immediately if deadline has already passed, and
// ErrTimeout if the deadline expires before c.Do returns. Otherwise the
// error from c.Do is returned and, when resp is non-nil, the response is
// transferred into resp.
//
// The exchange runs on pooled copies of req and resp so that the caller's
// objects are not touched by the background goroutine after a timeout.
//
// NOTE(review): req's body is swapped into the copy up front and is only
// swapped back when the request completes before the deadline; on timeout
// req appears to be left without its body — confirm this is intended.
func clientDoDeadline(req *Request, resp *Response, deadline time.Time, c clientDoer) error {
	// Remaining time until the deadline; non-positive means already expired.
	timeout := -time.Since(deadline)
	if timeout <= 0 {
		return ErrTimeout
	}

	// Reuse a pooled result channel when available. The channel is buffered
	// (capacity 1) so the worker's send never blocks.
	var ch chan error
	chv := errorChPool.Get()
	if chv == nil {
		chv = make(chan error, 1)
	}
	ch = chv.(chan error)

	// Make req and resp copies, since on timeout they no longer
	// may be accessed.
	reqCopy := AcquireRequest()
	req.copyToSkipBody(reqCopy)
	swapRequestBody(req, reqCopy)
	respCopy := AcquireResponse()
	if resp != nil {
		// Not calling resp.copyToSkipBody(respCopy) here to avoid
		// unexpected messing with headers
		respCopy.SkipBody = resp.SkipBody
	}

	// Note that the request continues execution on ErrTimeout until
	// client-specific ReadTimeout exceeds. This helps limiting load
	// on slow hosts by MaxConns* concurrent requests.
	//
	// Without this 'hack' the load on slow host could exceed MaxConns*
	// concurrent requests, since timed out requests on client side
	// usually continue execution on the host.

	// mu guards timedout and responded, which arbitrate between the worker
	// delivering a result and the caller giving up on the timer.
	var mu sync.Mutex
	var timedout, responded bool

	go func() {
		// Expose the remaining time to the client via the request copy
		// (HostClient passes req.timeout to acquireConn).
		reqCopy.timeout = timeout
		errDo := c.Do(reqCopy, respCopy)
		mu.Lock()
		{
			// Only touch the caller's req/resp and the channel if the caller
			// is still waiting.
			if !timedout {
				if resp != nil {
					respCopy.copyToSkipBody(resp)
					swapResponseBody(resp, respCopy)
				}
				swapRequestBody(reqCopy, req)
				ch <- errDo
				responded = true
			}
		}
		mu.Unlock()

		ReleaseResponse(respCopy)
		ReleaseRequest(reqCopy)
	}()

	tc := AcquireTimer(timeout)
	var err error
	select {
	case err = <-ch:
	case <-tc.C:
		mu.Lock()
		{
			if responded {
				// The worker finished at about the same time the timer fired:
				// take its result so the channel is left empty for reuse.
				err = <-ch
			} else {
				// Tell the worker not to deliver, then report the timeout.
				timedout = true
				err = ErrTimeout
			}
		}
		mu.Unlock()
	}
	ReleaseTimer(tc)

	// The channel is empty on every path above, so it is safe to pool it.
	errorChPool.Put(chv)

	return err
}
|
---|
1271 |
|
---|
// errorChPool recycles the buffered chan error values used by
// clientDoDeadline.
var errorChPool sync.Pool
|
---|
1273 |
|
---|
1274 | // Do performs the given http request and sets the corresponding response.
|
---|
1275 | //
|
---|
1276 | // Request must contain at least non-zero RequestURI with full url (including
|
---|
1277 | // scheme and host) or non-zero Host header + RequestURI.
|
---|
1278 | //
|
---|
1279 | // The function doesn't follow redirects. Use Get* for following redirects.
|
---|
1280 | //
|
---|
1281 | // Response is ignored if resp is nil.
|
---|
1282 | //
|
---|
1283 | // ErrNoFreeConns is returned if all HostClient.MaxConns connections
|
---|
1284 | // to the host are busy.
|
---|
1285 | //
|
---|
1286 | // It is recommended obtaining req and resp via AcquireRequest
|
---|
1287 | // and AcquireResponse in performance-critical code.
|
---|
1288 | func (c *HostClient) Do(req *Request, resp *Response) error {
|
---|
1289 | var err error
|
---|
1290 | var retry bool
|
---|
1291 | maxAttempts := c.MaxIdemponentCallAttempts
|
---|
1292 | if maxAttempts <= 0 {
|
---|
1293 | maxAttempts = DefaultMaxIdemponentCallAttempts
|
---|
1294 | }
|
---|
1295 | isRequestRetryable := isIdempotent
|
---|
1296 | if c.RetryIf != nil {
|
---|
1297 | isRequestRetryable = c.RetryIf
|
---|
1298 | }
|
---|
1299 | attempts := 0
|
---|
1300 | hasBodyStream := req.IsBodyStream()
|
---|
1301 |
|
---|
1302 | atomic.AddInt32(&c.pendingRequests, 1)
|
---|
1303 | for {
|
---|
1304 | retry, err = c.do(req, resp)
|
---|
1305 | if err == nil || !retry {
|
---|
1306 | break
|
---|
1307 | }
|
---|
1308 |
|
---|
1309 | if hasBodyStream {
|
---|
1310 | break
|
---|
1311 | }
|
---|
1312 | if !isRequestRetryable(req) {
|
---|
1313 | // Retry non-idempotent requests if the server closes
|
---|
1314 | // the connection before sending the response.
|
---|
1315 | //
|
---|
1316 | // This case is possible if the server closes the idle
|
---|
1317 | // keep-alive connection on timeout.
|
---|
1318 | //
|
---|
1319 | // Apache and nginx usually do this.
|
---|
1320 | if err != io.EOF {
|
---|
1321 | break
|
---|
1322 | }
|
---|
1323 | }
|
---|
1324 | attempts++
|
---|
1325 | if attempts >= maxAttempts {
|
---|
1326 | break
|
---|
1327 | }
|
---|
1328 | }
|
---|
1329 | atomic.AddInt32(&c.pendingRequests, -1)
|
---|
1330 |
|
---|
1331 | if err == io.EOF {
|
---|
1332 | err = ErrConnectionClosed
|
---|
1333 | }
|
---|
1334 | return err
|
---|
1335 | }
|
---|
1336 |
|
---|
1337 | // PendingRequests returns the current number of requests the client
|
---|
1338 | // is executing.
|
---|
1339 | //
|
---|
1340 | // This function may be used for balancing load among multiple HostClient
|
---|
1341 | // instances.
|
---|
1342 | func (c *HostClient) PendingRequests() int {
|
---|
1343 | return int(atomic.LoadInt32(&c.pendingRequests))
|
---|
1344 | }
|
---|
1345 |
|
---|
1346 | func isIdempotent(req *Request) bool {
|
---|
1347 | return req.Header.IsGet() || req.Header.IsHead() || req.Header.IsPut()
|
---|
1348 | }
|
---|
1349 |
|
---|
1350 | func (c *HostClient) do(req *Request, resp *Response) (bool, error) {
|
---|
1351 | nilResp := false
|
---|
1352 | if resp == nil {
|
---|
1353 | nilResp = true
|
---|
1354 | resp = AcquireResponse()
|
---|
1355 | }
|
---|
1356 |
|
---|
1357 | ok, err := c.doNonNilReqResp(req, resp)
|
---|
1358 |
|
---|
1359 | if nilResp {
|
---|
1360 | ReleaseResponse(resp)
|
---|
1361 | }
|
---|
1362 |
|
---|
1363 | return ok, err
|
---|
1364 | }
|
---|
1365 |
|
---|
// doNonNilReqResp performs one request attempt: it writes req to a
// connection obtained from the pool and reads the response into resp, or
// delegates the whole exchange to c.Transport when that is set.
//
// The boolean result tells the caller whether the attempt may be retried:
// it is true for write, flush, deadline-setting and read failures (except
// ErrBodyTooLarge), and false on success, on a scheme mismatch and when no
// connection could be acquired.
//
// It panics if req or resp is nil.
func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) {
	if req == nil {
		panic("BUG: req cannot be nil")
	}
	if resp == nil {
		panic("BUG: resp cannot be nil")
	}

	// Secure header error logs configuration
	resp.secureErrorLogMessage = c.SecureErrorLogMessage
	resp.Header.secureErrorLogMessage = c.SecureErrorLogMessage
	req.secureErrorLogMessage = c.SecureErrorLogMessage
	req.Header.secureErrorLogMessage = c.SecureErrorLogMessage

	// A HostClient speaks either plain HTTP or TLS; refuse requests whose
	// scheme does not match.
	if c.IsTLS != req.URI().isHttps() {
		return false, ErrHostClientRedirectToDifferentScheme
	}

	// Stored as seconds elapsed since startTimeUnix; see LastUseTime.
	atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix))

	// Free up resources occupied by response before sending the request,
	// so the GC may reclaim these resources (e.g. response body).

	// backing up SkipBody in case it was set explicitly
	customSkipBody := resp.SkipBody
	resp.Reset()
	resp.SkipBody = customSkipBody

	req.URI().DisablePathNormalizing = c.DisablePathNormalizing

	// Fill in the client name as User-Agent when the request has none.
	userAgentOld := req.Header.UserAgent()
	if len(userAgentOld) == 0 {
		req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...)
	}

	// A custom transport replaces the connection handling below entirely.
	if c.Transport != nil {
		err := c.Transport(req, resp)
		return err == nil, err
	}

	cc, err := c.acquireConn(req.timeout, req.ConnectionClose())
	if err != nil {
		return false, err
	}
	conn := cc.c

	resp.parseNetConn(conn)

	if c.WriteTimeout > 0 {
		// Set Deadline every time, since golang has fixed the performance issue
		// See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
		currentTime := time.Now()
		if err = conn.SetWriteDeadline(currentTime.Add(c.WriteTimeout)); err != nil {
			c.closeConn(cc)
			return true, err
		}
	}

	// If the connection has outlived MaxConnDuration, ask the server to close
	// it after this exchange. The header is set only for the write and is
	// reset afterwards so the caller's request is left unchanged.
	resetConnection := false
	if c.MaxConnDuration > 0 && time.Since(cc.createdTime) > c.MaxConnDuration && !req.ConnectionClose() {
		req.SetConnectionClose()
		resetConnection = true
	}

	bw := c.acquireWriter(conn)
	err = req.Write(bw)

	if resetConnection {
		req.Header.ResetConnectionClose()
	}

	if err == nil {
		err = bw.Flush()
	}
	if err != nil {
		c.releaseWriter(bw)
		c.closeConn(cc)
		return true, err
	}
	c.releaseWriter(bw)

	if c.ReadTimeout > 0 {
		// Set Deadline every time, since golang has fixed the performance issue
		// See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
		currentTime := time.Now()
		if err = conn.SetReadDeadline(currentTime.Add(c.ReadTimeout)); err != nil {
			c.closeConn(cc)
			return true, err
		}
	}

	// Skip the body if the caller asked for it or the request is a HEAD.
	if customSkipBody || req.Header.IsHead() {
		resp.SkipBody = true
	}
	if c.DisableHeaderNamesNormalizing {
		resp.Header.DisableNormalizing()
	}

	br := c.acquireReader(conn)
	if err = resp.ReadLimitBody(br, c.MaxResponseBodySize); err != nil {
		c.releaseReader(br)
		c.closeConn(cc)
		// Don't retry in case of ErrBodyTooLarge since we will just get the same again.
		retry := err != ErrBodyTooLarge
		return retry, err
	}
	c.releaseReader(br)

	// Close the connection when either side requested it (or it expired);
	// otherwise return it to the pool for reuse.
	if resetConnection || req.ConnectionClose() || resp.ConnectionClose() {
		c.closeConn(cc)
	} else {
		c.releaseConn(cc)
	}

	return false, err
}
|
---|
1482 |
|
---|
var (
	// ErrNoFreeConns is returned when no free connections are available
	// to the given host.
	//
	// Increase the allowed number of connections per host if you
	// see this error.
	ErrNoFreeConns = errors.New("no free connections available to host")

	// ErrConnectionClosed may be returned from client methods if the server
	// closes connection before returning the first response byte.
	// HostClient.Do reports it in place of a final io.EOF.
	//
	// If you see this error, then either fix the server by returning
	// 'Connection: close' response header before closing the connection
	// or add 'Connection: close' request header before sending requests
	// to broken server.
	ErrConnectionClosed = errors.New("the server closed connection before returning the first response byte. " +
		"Make sure the server returns 'Connection: close' response header before closing the connection")
)
|
---|
1501 |
|
---|
// timeoutError is the concrete type behind ErrTimeout.
type timeoutError struct{}

// Error implements the error interface.
func (*timeoutError) Error() string {
	return "timeout"
}

// Timeout reports that the error is a timeout; it always returns true.
//
// Only the Timeout() function of the net.Error interface is implemented.
// This allows for checks like:
//
//	if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
func (*timeoutError) Timeout() bool {
	return true
}

// ErrTimeout is returned from timed out calls.
var ErrTimeout = &timeoutError{}
|
---|
1518 |
|
---|
1519 | // SetMaxConns sets up the maximum number of connections which may be established to all hosts listed in Addr.
|
---|
1520 | func (c *HostClient) SetMaxConns(newMaxConns int) {
|
---|
1521 | c.connsLock.Lock()
|
---|
1522 | c.MaxConns = newMaxConns
|
---|
1523 | c.connsLock.Unlock()
|
---|
1524 | }
|
---|
1525 |
|
---|
1526 | func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool) (cc *clientConn, err error) {
|
---|
1527 | createConn := false
|
---|
1528 | startCleaner := false
|
---|
1529 |
|
---|
1530 | var n int
|
---|
1531 | c.connsLock.Lock()
|
---|
1532 | n = len(c.conns)
|
---|
1533 | if n == 0 {
|
---|
1534 | maxConns := c.MaxConns
|
---|
1535 | if maxConns <= 0 {
|
---|
1536 | maxConns = DefaultMaxConnsPerHost
|
---|
1537 | }
|
---|
1538 | if c.connsCount < maxConns {
|
---|
1539 | c.connsCount++
|
---|
1540 | createConn = true
|
---|
1541 | if !c.connsCleanerRun && !connectionClose {
|
---|
1542 | startCleaner = true
|
---|
1543 | c.connsCleanerRun = true
|
---|
1544 | }
|
---|
1545 | }
|
---|
1546 | } else {
|
---|
1547 | n--
|
---|
1548 | cc = c.conns[n]
|
---|
1549 | c.conns[n] = nil
|
---|
1550 | c.conns = c.conns[:n]
|
---|
1551 | }
|
---|
1552 | c.connsLock.Unlock()
|
---|
1553 |
|
---|
1554 | if cc != nil {
|
---|
1555 | return cc, nil
|
---|
1556 | }
|
---|
1557 | if !createConn {
|
---|
1558 | if c.MaxConnWaitTimeout <= 0 {
|
---|
1559 | return nil, ErrNoFreeConns
|
---|
1560 | }
|
---|
1561 |
|
---|
1562 | // reqTimeout c.MaxConnWaitTimeout wait duration
|
---|
1563 | // d1 d2 min(d1, d2)
|
---|
1564 | // 0(not set) d2 d2
|
---|
1565 | // d1 0(don't wait) 0(don't wait)
|
---|
1566 | // 0(not set) d2 d2
|
---|
1567 | timeout := c.MaxConnWaitTimeout
|
---|
1568 | timeoutOverridden := false
|
---|
1569 | // reqTimeout == 0 means not set
|
---|
1570 | if reqTimeout > 0 && reqTimeout < timeout {
|
---|
1571 | timeout = reqTimeout
|
---|
1572 | timeoutOverridden = true
|
---|
1573 | }
|
---|
1574 |
|
---|
1575 | // wait for a free connection
|
---|
1576 | tc := AcquireTimer(timeout)
|
---|
1577 | defer ReleaseTimer(tc)
|
---|
1578 |
|
---|
1579 | w := &wantConn{
|
---|
1580 | ready: make(chan struct{}, 1),
|
---|
1581 | }
|
---|
1582 | defer func() {
|
---|
1583 | if err != nil {
|
---|
1584 | w.cancel(c, err)
|
---|
1585 | }
|
---|
1586 | }()
|
---|
1587 |
|
---|
1588 | c.queueForIdle(w)
|
---|
1589 |
|
---|
1590 | select {
|
---|
1591 | case <-w.ready:
|
---|
1592 | return w.conn, w.err
|
---|
1593 | case <-tc.C:
|
---|
1594 | if timeoutOverridden {
|
---|
1595 | return nil, ErrTimeout
|
---|
1596 | }
|
---|
1597 | return nil, ErrNoFreeConns
|
---|
1598 | }
|
---|
1599 | }
|
---|
1600 |
|
---|
1601 | if startCleaner {
|
---|
1602 | go c.connsCleaner()
|
---|
1603 | }
|
---|
1604 |
|
---|
1605 | conn, err := c.dialHostHard()
|
---|
1606 | if err != nil {
|
---|
1607 | c.decConnsCount()
|
---|
1608 | return nil, err
|
---|
1609 | }
|
---|
1610 | cc = acquireClientConn(conn)
|
---|
1611 |
|
---|
1612 | return cc, nil
|
---|
1613 | }
|
---|
1614 |
|
---|
1615 | func (c *HostClient) queueForIdle(w *wantConn) {
|
---|
1616 | c.connsLock.Lock()
|
---|
1617 | defer c.connsLock.Unlock()
|
---|
1618 | if c.connsWait == nil {
|
---|
1619 | c.connsWait = &wantConnQueue{}
|
---|
1620 | }
|
---|
1621 | c.connsWait.clearFront()
|
---|
1622 | c.connsWait.pushBack(w)
|
---|
1623 | }
|
---|
1624 |
|
---|
1625 | func (c *HostClient) dialConnFor(w *wantConn) {
|
---|
1626 | conn, err := c.dialHostHard()
|
---|
1627 | if err != nil {
|
---|
1628 | w.tryDeliver(nil, err)
|
---|
1629 | c.decConnsCount()
|
---|
1630 | return
|
---|
1631 | }
|
---|
1632 |
|
---|
1633 | cc := acquireClientConn(conn)
|
---|
1634 | delivered := w.tryDeliver(cc, nil)
|
---|
1635 | if !delivered {
|
---|
1636 | // not delivered, return idle connection
|
---|
1637 | c.releaseConn(cc)
|
---|
1638 | }
|
---|
1639 | }
|
---|
1640 |
|
---|
1641 | // CloseIdleConnections closes any connections which were previously
|
---|
1642 | // connected from previous requests but are now sitting idle in a
|
---|
1643 | // "keep-alive" state. It does not interrupt any connections currently
|
---|
1644 | // in use.
|
---|
1645 | func (c *HostClient) CloseIdleConnections() {
|
---|
1646 | c.connsLock.Lock()
|
---|
1647 | scratch := append([]*clientConn{}, c.conns...)
|
---|
1648 | for i := range c.conns {
|
---|
1649 | c.conns[i] = nil
|
---|
1650 | }
|
---|
1651 | c.conns = c.conns[:0]
|
---|
1652 | c.connsLock.Unlock()
|
---|
1653 |
|
---|
1654 | for _, cc := range scratch {
|
---|
1655 | c.closeConn(cc)
|
---|
1656 | }
|
---|
1657 | }
|
---|
1658 |
|
---|
1659 | func (c *HostClient) connsCleaner() {
|
---|
1660 | var (
|
---|
1661 | scratch []*clientConn
|
---|
1662 | maxIdleConnDuration = c.MaxIdleConnDuration
|
---|
1663 | )
|
---|
1664 | if maxIdleConnDuration <= 0 {
|
---|
1665 | maxIdleConnDuration = DefaultMaxIdleConnDuration
|
---|
1666 | }
|
---|
1667 | for {
|
---|
1668 | currentTime := time.Now()
|
---|
1669 |
|
---|
1670 | // Determine idle connections to be closed.
|
---|
1671 | c.connsLock.Lock()
|
---|
1672 | conns := c.conns
|
---|
1673 | n := len(conns)
|
---|
1674 | i := 0
|
---|
1675 | for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration {
|
---|
1676 | i++
|
---|
1677 | }
|
---|
1678 | sleepFor := maxIdleConnDuration
|
---|
1679 | if i < n {
|
---|
1680 | // + 1 so we actually sleep past the expiration time and not up to it.
|
---|
1681 | // Otherwise the > check above would still fail.
|
---|
1682 | sleepFor = maxIdleConnDuration - currentTime.Sub(conns[i].lastUseTime) + 1
|
---|
1683 | }
|
---|
1684 | scratch = append(scratch[:0], conns[:i]...)
|
---|
1685 | if i > 0 {
|
---|
1686 | m := copy(conns, conns[i:])
|
---|
1687 | for i = m; i < n; i++ {
|
---|
1688 | conns[i] = nil
|
---|
1689 | }
|
---|
1690 | c.conns = conns[:m]
|
---|
1691 | }
|
---|
1692 | c.connsLock.Unlock()
|
---|
1693 |
|
---|
1694 | // Close idle connections.
|
---|
1695 | for i, cc := range scratch {
|
---|
1696 | c.closeConn(cc)
|
---|
1697 | scratch[i] = nil
|
---|
1698 | }
|
---|
1699 |
|
---|
1700 | // Determine whether to stop the connsCleaner.
|
---|
1701 | c.connsLock.Lock()
|
---|
1702 | mustStop := c.connsCount == 0
|
---|
1703 | if mustStop {
|
---|
1704 | c.connsCleanerRun = false
|
---|
1705 | }
|
---|
1706 | c.connsLock.Unlock()
|
---|
1707 | if mustStop {
|
---|
1708 | break
|
---|
1709 | }
|
---|
1710 |
|
---|
1711 | time.Sleep(sleepFor)
|
---|
1712 | }
|
---|
1713 | }
|
---|
1714 |
|
---|
1715 | func (c *HostClient) closeConn(cc *clientConn) {
|
---|
1716 | c.decConnsCount()
|
---|
1717 | cc.c.Close()
|
---|
1718 | releaseClientConn(cc)
|
---|
1719 | }
|
---|
1720 |
|
---|
1721 | func (c *HostClient) decConnsCount() {
|
---|
1722 | if c.MaxConnWaitTimeout <= 0 {
|
---|
1723 | c.connsLock.Lock()
|
---|
1724 | c.connsCount--
|
---|
1725 | c.connsLock.Unlock()
|
---|
1726 | return
|
---|
1727 | }
|
---|
1728 |
|
---|
1729 | c.connsLock.Lock()
|
---|
1730 | defer c.connsLock.Unlock()
|
---|
1731 | dialed := false
|
---|
1732 | if q := c.connsWait; q != nil && q.len() > 0 {
|
---|
1733 | for q.len() > 0 {
|
---|
1734 | w := q.popFront()
|
---|
1735 | if w.waiting() {
|
---|
1736 | go c.dialConnFor(w)
|
---|
1737 | dialed = true
|
---|
1738 | break
|
---|
1739 | }
|
---|
1740 | }
|
---|
1741 | }
|
---|
1742 | if !dialed {
|
---|
1743 | c.connsCount--
|
---|
1744 | }
|
---|
1745 | }
|
---|
1746 |
|
---|
1747 | // ConnsCount returns connection count of HostClient
|
---|
1748 | func (c *HostClient) ConnsCount() int {
|
---|
1749 | c.connsLock.Lock()
|
---|
1750 | defer c.connsLock.Unlock()
|
---|
1751 |
|
---|
1752 | return c.connsCount
|
---|
1753 | }
|
---|
1754 |
|
---|
1755 | func acquireClientConn(conn net.Conn) *clientConn {
|
---|
1756 | v := clientConnPool.Get()
|
---|
1757 | if v == nil {
|
---|
1758 | v = &clientConn{}
|
---|
1759 | }
|
---|
1760 | cc := v.(*clientConn)
|
---|
1761 | cc.c = conn
|
---|
1762 | cc.createdTime = time.Now()
|
---|
1763 | return cc
|
---|
1764 | }
|
---|
1765 |
|
---|
1766 | func releaseClientConn(cc *clientConn) {
|
---|
1767 | // Reset all fields.
|
---|
1768 | *cc = clientConn{}
|
---|
1769 | clientConnPool.Put(cc)
|
---|
1770 | }
|
---|
1771 |
|
---|
1772 | var clientConnPool sync.Pool
|
---|
1773 |
|
---|
1774 | func (c *HostClient) releaseConn(cc *clientConn) {
|
---|
1775 | cc.lastUseTime = time.Now()
|
---|
1776 | if c.MaxConnWaitTimeout <= 0 {
|
---|
1777 | c.connsLock.Lock()
|
---|
1778 | c.conns = append(c.conns, cc)
|
---|
1779 | c.connsLock.Unlock()
|
---|
1780 | return
|
---|
1781 | }
|
---|
1782 |
|
---|
1783 | // try to deliver an idle connection to a *wantConn
|
---|
1784 | c.connsLock.Lock()
|
---|
1785 | defer c.connsLock.Unlock()
|
---|
1786 | delivered := false
|
---|
1787 | if q := c.connsWait; q != nil && q.len() > 0 {
|
---|
1788 | for q.len() > 0 {
|
---|
1789 | w := q.popFront()
|
---|
1790 | if w.waiting() {
|
---|
1791 | delivered = w.tryDeliver(cc, nil)
|
---|
1792 | break
|
---|
1793 | }
|
---|
1794 | }
|
---|
1795 | }
|
---|
1796 | if !delivered {
|
---|
1797 | c.conns = append(c.conns, cc)
|
---|
1798 | }
|
---|
1799 | }
|
---|
1800 |
|
---|
1801 | func (c *HostClient) acquireWriter(conn net.Conn) *bufio.Writer {
|
---|
1802 | var v interface{}
|
---|
1803 | if c.clientWriterPool != nil {
|
---|
1804 | v = c.clientWriterPool.Get()
|
---|
1805 | if v == nil {
|
---|
1806 | n := c.WriteBufferSize
|
---|
1807 | if n <= 0 {
|
---|
1808 | n = defaultWriteBufferSize
|
---|
1809 | }
|
---|
1810 | return bufio.NewWriterSize(conn, n)
|
---|
1811 | }
|
---|
1812 | } else {
|
---|
1813 | v = c.writerPool.Get()
|
---|
1814 | if v == nil {
|
---|
1815 | n := c.WriteBufferSize
|
---|
1816 | if n <= 0 {
|
---|
1817 | n = defaultWriteBufferSize
|
---|
1818 | }
|
---|
1819 | return bufio.NewWriterSize(conn, n)
|
---|
1820 | }
|
---|
1821 | }
|
---|
1822 |
|
---|
1823 | bw := v.(*bufio.Writer)
|
---|
1824 | bw.Reset(conn)
|
---|
1825 | return bw
|
---|
1826 | }
|
---|
1827 |
|
---|
1828 | func (c *HostClient) releaseWriter(bw *bufio.Writer) {
|
---|
1829 | if c.clientWriterPool != nil {
|
---|
1830 | c.clientWriterPool.Put(bw)
|
---|
1831 | } else {
|
---|
1832 | c.writerPool.Put(bw)
|
---|
1833 | }
|
---|
1834 | }
|
---|
1835 |
|
---|
1836 | func (c *HostClient) acquireReader(conn net.Conn) *bufio.Reader {
|
---|
1837 | var v interface{}
|
---|
1838 | if c.clientReaderPool != nil {
|
---|
1839 | v = c.clientReaderPool.Get()
|
---|
1840 | if v == nil {
|
---|
1841 | n := c.ReadBufferSize
|
---|
1842 | if n <= 0 {
|
---|
1843 | n = defaultReadBufferSize
|
---|
1844 | }
|
---|
1845 | return bufio.NewReaderSize(conn, n)
|
---|
1846 | }
|
---|
1847 | } else {
|
---|
1848 | v = c.readerPool.Get()
|
---|
1849 | if v == nil {
|
---|
1850 | n := c.ReadBufferSize
|
---|
1851 | if n <= 0 {
|
---|
1852 | n = defaultReadBufferSize
|
---|
1853 | }
|
---|
1854 | return bufio.NewReaderSize(conn, n)
|
---|
1855 | }
|
---|
1856 | }
|
---|
1857 |
|
---|
1858 | br := v.(*bufio.Reader)
|
---|
1859 | br.Reset(conn)
|
---|
1860 | return br
|
---|
1861 | }
|
---|
1862 |
|
---|
1863 | func (c *HostClient) releaseReader(br *bufio.Reader) {
|
---|
1864 | if c.clientReaderPool != nil {
|
---|
1865 | c.clientReaderPool.Put(br)
|
---|
1866 | } else {
|
---|
1867 | c.readerPool.Put(br)
|
---|
1868 | }
|
---|
1869 | }
|
---|
1870 |
|
---|
1871 | func newClientTLSConfig(c *tls.Config, addr string) *tls.Config {
|
---|
1872 | if c == nil {
|
---|
1873 | c = &tls.Config{}
|
---|
1874 | } else {
|
---|
1875 | c = c.Clone()
|
---|
1876 | }
|
---|
1877 |
|
---|
1878 | if c.ClientSessionCache == nil {
|
---|
1879 | c.ClientSessionCache = tls.NewLRUClientSessionCache(0)
|
---|
1880 | }
|
---|
1881 |
|
---|
1882 | if len(c.ServerName) == 0 {
|
---|
1883 | serverName := tlsServerName(addr)
|
---|
1884 | if serverName == "*" {
|
---|
1885 | c.InsecureSkipVerify = true
|
---|
1886 | } else {
|
---|
1887 | c.ServerName = serverName
|
---|
1888 | }
|
---|
1889 | }
|
---|
1890 | return c
|
---|
1891 | }
|
---|
1892 |
|
---|
// tlsServerName extracts the host part of addr for use as TLS server name.
//
// An addr without a colon is returned unchanged. Otherwise addr is split
// with net.SplitHostPort and the host is returned; "*" is returned when
// the split fails.
//
// NOTE(review): addresses with a colon that are not valid host:port pairs
// (e.g. an IPv6 literal without a port) end up as "*", which makes
// newClientTLSConfig disable certificate verification — confirm this is
// intended.
func tlsServerName(addr string) string {
	if strings.IndexByte(addr, ':') < 0 {
		return addr
	}
	if host, _, err := net.SplitHostPort(addr); err == nil {
		return host
	}
	return "*"
}
1903 |
|
---|
1904 | func (c *HostClient) nextAddr() string {
|
---|
1905 | c.addrsLock.Lock()
|
---|
1906 | if c.addrs == nil {
|
---|
1907 | c.addrs = strings.Split(c.Addr, ",")
|
---|
1908 | }
|
---|
1909 | addr := c.addrs[0]
|
---|
1910 | if len(c.addrs) > 1 {
|
---|
1911 | addr = c.addrs[c.addrIdx%uint32(len(c.addrs))]
|
---|
1912 | c.addrIdx++
|
---|
1913 | }
|
---|
1914 | c.addrsLock.Unlock()
|
---|
1915 | return addr
|
---|
1916 | }
|
---|
1917 |
|
---|
1918 | func (c *HostClient) dialHostHard() (conn net.Conn, err error) {
|
---|
1919 | // attempt to dial all the available hosts before giving up.
|
---|
1920 |
|
---|
1921 | c.addrsLock.Lock()
|
---|
1922 | n := len(c.addrs)
|
---|
1923 | c.addrsLock.Unlock()
|
---|
1924 |
|
---|
1925 | if n == 0 {
|
---|
1926 | // It looks like c.addrs isn't initialized yet.
|
---|
1927 | n = 1
|
---|
1928 | }
|
---|
1929 |
|
---|
1930 | timeout := c.ReadTimeout + c.WriteTimeout
|
---|
1931 | if timeout <= 0 {
|
---|
1932 | timeout = DefaultDialTimeout
|
---|
1933 | }
|
---|
1934 | deadline := time.Now().Add(timeout)
|
---|
1935 | for n > 0 {
|
---|
1936 | addr := c.nextAddr()
|
---|
1937 | tlsConfig := c.cachedTLSConfig(addr)
|
---|
1938 | conn, err = dialAddr(addr, c.Dial, c.DialDualStack, c.IsTLS, tlsConfig, c.WriteTimeout)
|
---|
1939 | if err == nil {
|
---|
1940 | return conn, nil
|
---|
1941 | }
|
---|
1942 | if time.Since(deadline) >= 0 {
|
---|
1943 | break
|
---|
1944 | }
|
---|
1945 | n--
|
---|
1946 | }
|
---|
1947 | return nil, err
|
---|
1948 | }
|
---|
1949 |
|
---|
1950 | func (c *HostClient) cachedTLSConfig(addr string) *tls.Config {
|
---|
1951 | if !c.IsTLS {
|
---|
1952 | return nil
|
---|
1953 | }
|
---|
1954 |
|
---|
1955 | c.tlsConfigMapLock.Lock()
|
---|
1956 | if c.tlsConfigMap == nil {
|
---|
1957 | c.tlsConfigMap = make(map[string]*tls.Config)
|
---|
1958 | }
|
---|
1959 | cfg := c.tlsConfigMap[addr]
|
---|
1960 | if cfg == nil {
|
---|
1961 | cfg = newClientTLSConfig(c.TLSConfig, addr)
|
---|
1962 | c.tlsConfigMap[addr] = cfg
|
---|
1963 | }
|
---|
1964 | c.tlsConfigMapLock.Unlock()
|
---|
1965 |
|
---|
1966 | return cfg
|
---|
1967 | }
|
---|
1968 |
|
---|
var (
	// ErrTLSHandshakeTimeout indicates that the TLS handshake timed out.
	ErrTLSHandshakeTimeout = errors.New("tls handshake timed out")

	// timeoutErrorChPool holds the error channels reused by
	// tlsClientHandshake.
	timeoutErrorChPool sync.Pool
)
1973 |
|
---|
1974 | func tlsClientHandshake(rawConn net.Conn, tlsConfig *tls.Config, timeout time.Duration) (net.Conn, error) {
|
---|
1975 | tc := AcquireTimer(timeout)
|
---|
1976 | defer ReleaseTimer(tc)
|
---|
1977 |
|
---|
1978 | var ch chan error
|
---|
1979 | chv := timeoutErrorChPool.Get()
|
---|
1980 | if chv == nil {
|
---|
1981 | chv = make(chan error)
|
---|
1982 | }
|
---|
1983 | ch = chv.(chan error)
|
---|
1984 | defer timeoutErrorChPool.Put(chv)
|
---|
1985 |
|
---|
1986 | conn := tls.Client(rawConn, tlsConfig)
|
---|
1987 |
|
---|
1988 | go func() {
|
---|
1989 | ch <- conn.Handshake()
|
---|
1990 | }()
|
---|
1991 |
|
---|
1992 | select {
|
---|
1993 | case <-tc.C:
|
---|
1994 | rawConn.Close()
|
---|
1995 | <-ch
|
---|
1996 | return nil, ErrTLSHandshakeTimeout
|
---|
1997 | case err := <-ch:
|
---|
1998 | if err != nil {
|
---|
1999 | rawConn.Close()
|
---|
2000 | return nil, err
|
---|
2001 | }
|
---|
2002 | return conn, nil
|
---|
2003 | }
|
---|
2004 | }
|
---|
2005 |
|
---|
2006 | func dialAddr(addr string, dial DialFunc, dialDualStack, isTLS bool, tlsConfig *tls.Config, timeout time.Duration) (net.Conn, error) {
|
---|
2007 | if dial == nil {
|
---|
2008 | if dialDualStack {
|
---|
2009 | dial = DialDualStack
|
---|
2010 | } else {
|
---|
2011 | dial = Dial
|
---|
2012 | }
|
---|
2013 | addr = addMissingPort(addr, isTLS)
|
---|
2014 | }
|
---|
2015 | conn, err := dial(addr)
|
---|
2016 | if err != nil {
|
---|
2017 | return nil, err
|
---|
2018 | }
|
---|
2019 | if conn == nil {
|
---|
2020 | panic("BUG: DialFunc returned (nil, nil)")
|
---|
2021 | }
|
---|
2022 | _, isTLSAlready := conn.(*tls.Conn)
|
---|
2023 | if isTLS && !isTLSAlready {
|
---|
2024 | if timeout == 0 {
|
---|
2025 | return tls.Client(conn, tlsConfig), nil
|
---|
2026 | }
|
---|
2027 | return tlsClientHandshake(conn, tlsConfig, timeout)
|
---|
2028 | }
|
---|
2029 | return conn, nil
|
---|
2030 | }
|
---|
2031 |
|
---|
2032 | func (c *HostClient) getClientName() []byte {
|
---|
2033 | v := c.clientName.Load()
|
---|
2034 | var clientName []byte
|
---|
2035 | if v == nil {
|
---|
2036 | clientName = []byte(c.Name)
|
---|
2037 | if len(clientName) == 0 && !c.NoDefaultUserAgentHeader {
|
---|
2038 | clientName = defaultUserAgent
|
---|
2039 | }
|
---|
2040 | c.clientName.Store(clientName)
|
---|
2041 | } else {
|
---|
2042 | clientName = v.([]byte)
|
---|
2043 | }
|
---|
2044 | return clientName
|
---|
2045 | }
|
---|
2046 |
|
---|
// addMissingPort appends the default port to addr if addr has no port.
//
// The default port is 443 when isTLS is set and 80 otherwise.
//
// The following forms are considered to lack a port:
//
//   - addresses without any colon (host names, IPv4 literals);
//   - bracketed IPv6 literals such as "[::1]";
//   - bare IPv6 literals such as "::1".
//
// Any other addr is returned unchanged.
func addMissingPort(addr string, isTLS bool) string {
	port := 80
	if isTLS {
		port = 443
	}

	switch {
	case !strings.Contains(addr, ":"):
		// Host name or IPv4 literal without a port.
	case len(addr) > 2 && addr[0] == '[' && addr[len(addr)-1] == ']':
		// Bracketed IPv6 literal without a port. Strip the brackets,
		// net.JoinHostPort adds them back.
		addr = addr[1 : len(addr)-1]
	case net.ParseIP(addr) != nil:
		// Bare IPv6 literal: it contains colons but can never be a
		// host:port pair.
	default:
		// The address carries a port already (or is not understood).
		return addr
	}
	return net.JoinHostPort(addr, strconv.Itoa(port))
}
2058 |
|
---|
2059 | // A wantConn records state about a wanted connection
|
---|
2060 | // (that is, an active call to getConn).
|
---|
2061 | // The conn may be gotten by dialing or by finding an idle connection,
|
---|
2062 | // or a cancellation may make the conn no longer wanted.
|
---|
2063 | // These three options are racing against each other and use
|
---|
2064 | // wantConn to coordinate and agree about the winning outcome.
|
---|
2065 | //
|
---|
2066 | // inspired by net/http/transport.go
|
---|
2067 | type wantConn struct {
|
---|
2068 | ready chan struct{}
|
---|
2069 | mu sync.Mutex // protects conn, err, close(ready)
|
---|
2070 | conn *clientConn
|
---|
2071 | err error
|
---|
2072 | }
|
---|
2073 |
|
---|
2074 | // waiting reports whether w is still waiting for an answer (connection or error).
|
---|
2075 | func (w *wantConn) waiting() bool {
|
---|
2076 | select {
|
---|
2077 | case <-w.ready:
|
---|
2078 | return false
|
---|
2079 | default:
|
---|
2080 | return true
|
---|
2081 | }
|
---|
2082 | }
|
---|
2083 |
|
---|
2084 | // tryDeliver attempts to deliver conn, err to w and reports whether it succeeded.
|
---|
2085 | func (w *wantConn) tryDeliver(conn *clientConn, err error) bool {
|
---|
2086 | w.mu.Lock()
|
---|
2087 | defer w.mu.Unlock()
|
---|
2088 |
|
---|
2089 | if w.conn != nil || w.err != nil {
|
---|
2090 | return false
|
---|
2091 | }
|
---|
2092 | w.conn = conn
|
---|
2093 | w.err = err
|
---|
2094 | if w.conn == nil && w.err == nil {
|
---|
2095 | panic("fasthttp: internal error: misuse of tryDeliver")
|
---|
2096 | }
|
---|
2097 | close(w.ready)
|
---|
2098 | return true
|
---|
2099 | }
|
---|
2100 |
|
---|
2101 | // cancel marks w as no longer wanting a result (for example, due to cancellation).
|
---|
2102 | // If a connection has been delivered already, cancel returns it with c.releaseConn.
|
---|
2103 | func (w *wantConn) cancel(c *HostClient, err error) {
|
---|
2104 | w.mu.Lock()
|
---|
2105 | if w.conn == nil && w.err == nil {
|
---|
2106 | close(w.ready) // catch misbehavior in future delivery
|
---|
2107 | }
|
---|
2108 |
|
---|
2109 | conn := w.conn
|
---|
2110 | w.conn = nil
|
---|
2111 | w.err = err
|
---|
2112 | w.mu.Unlock()
|
---|
2113 |
|
---|
2114 | if conn != nil {
|
---|
2115 | c.releaseConn(conn)
|
---|
2116 | }
|
---|
2117 | }
|
---|
2118 |
|
---|
2119 | // A wantConnQueue is a queue of wantConns.
|
---|
2120 | //
|
---|
2121 | // inspired by net/http/transport.go
|
---|
2122 | type wantConnQueue struct {
|
---|
2123 | // This is a queue, not a deque.
|
---|
2124 | // It is split into two stages - head[headPos:] and tail.
|
---|
2125 | // popFront is trivial (headPos++) on the first stage, and
|
---|
2126 | // pushBack is trivial (append) on the second stage.
|
---|
2127 | // If the first stage is empty, popFront can swap the
|
---|
2128 | // first and second stages to remedy the situation.
|
---|
2129 | //
|
---|
2130 | // This two-stage split is analogous to the use of two lists
|
---|
2131 | // in Okasaki's purely functional queue but without the
|
---|
2132 | // overhead of reversing the list when swapping stages.
|
---|
2133 | head []*wantConn
|
---|
2134 | headPos int
|
---|
2135 | tail []*wantConn
|
---|
2136 | }
|
---|
2137 |
|
---|
2138 | // len returns the number of items in the queue.
|
---|
2139 | func (q *wantConnQueue) len() int {
|
---|
2140 | return len(q.head) - q.headPos + len(q.tail)
|
---|
2141 | }
|
---|
2142 |
|
---|
2143 | // pushBack adds w to the back of the queue.
|
---|
2144 | func (q *wantConnQueue) pushBack(w *wantConn) {
|
---|
2145 | q.tail = append(q.tail, w)
|
---|
2146 | }
|
---|
2147 |
|
---|
2148 | // popFront removes and returns the wantConn at the front of the queue.
|
---|
2149 | func (q *wantConnQueue) popFront() *wantConn {
|
---|
2150 | if q.headPos >= len(q.head) {
|
---|
2151 | if len(q.tail) == 0 {
|
---|
2152 | return nil
|
---|
2153 | }
|
---|
2154 | // Pick up tail as new head, clear tail.
|
---|
2155 | q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
|
---|
2156 | }
|
---|
2157 |
|
---|
2158 | w := q.head[q.headPos]
|
---|
2159 | q.head[q.headPos] = nil
|
---|
2160 | q.headPos++
|
---|
2161 | return w
|
---|
2162 | }
|
---|
2163 |
|
---|
2164 | // peekFront returns the wantConn at the front of the queue without removing it.
|
---|
2165 | func (q *wantConnQueue) peekFront() *wantConn {
|
---|
2166 | if q.headPos < len(q.head) {
|
---|
2167 | return q.head[q.headPos]
|
---|
2168 | }
|
---|
2169 | if len(q.tail) > 0 {
|
---|
2170 | return q.tail[0]
|
---|
2171 | }
|
---|
2172 | return nil
|
---|
2173 | }
|
---|
2174 |
|
---|
2175 | // cleanFront pops any wantConns that are no longer waiting from the head of the
|
---|
2176 | // queue, reporting whether any were popped.
|
---|
2177 | func (q *wantConnQueue) clearFront() (cleaned bool) {
|
---|
2178 | for {
|
---|
2179 | w := q.peekFront()
|
---|
2180 | if w == nil || w.waiting() {
|
---|
2181 | return cleaned
|
---|
2182 | }
|
---|
2183 | q.popFront()
|
---|
2184 | cleaned = true
|
---|
2185 | }
|
---|
2186 | }
|
---|
2187 |
|
---|
// PipelineClient pipelines requests over a limited set of concurrent
// connections to the given Addr.
//
// This client may be used in highly loaded HTTP-based RPC systems for reducing
// context switches and network level overhead.
// See https://en.wikipedia.org/wiki/HTTP_pipelining for details.
//
// Copying PipelineClient instances is forbidden. Create new instances
// instead.
//
// It is safe to call PipelineClient methods from concurrently running
// goroutines.
type PipelineClient struct {
	noCopy noCopy //nolint:unused,structcheck

	// Address of the host to connect to.
	Addr string

	// PipelineClient name. Used in User-Agent request header.
	Name string

	// NoDefaultUserAgentHeader, when set to true, causes the default
	// User-Agent header to be excluded from the Request.
	NoDefaultUserAgentHeader bool

	// The maximum number of concurrent connections to the Addr.
	//
	// A single connection is used by default.
	MaxConns int

	// The maximum number of pending pipelined requests over
	// a single connection to Addr.
	//
	// DefaultMaxPendingRequests is used by default.
	MaxPendingRequests int

	// The maximum delay before sending pipelined requests as a batch
	// to the server.
	//
	// By default requests are sent immediately to the server.
	MaxBatchDelay time.Duration

	// Callback for establishing a connection to the host.
	//
	// Default Dial is used if not set.
	Dial DialFunc

	// Attempt to connect to both ipv4 and ipv6 host addresses
	// if set to true.
	//
	// This option is used only if the default TCP dialer is used,
	// i.e. if Dial is blank.
	//
	// By default the client connects only to ipv4 addresses,
	// since unfortunately ipv6 remains broken in many networks worldwide :)
	DialDualStack bool

	// Response header names are passed as-is without normalization
	// if this option is set.
	//
	// Disabling header name normalization may be useful only for proxying
	// responses to other clients expecting case-sensitive
	// header names. See https://github.com/valyala/fasthttp/issues/57
	// for details.
	//
	// By default request and response header names are normalized, i.e.
	// the first letter and the first letters following dashes
	// are uppercased, while all the other letters are lowercased.
	// Examples:
	//
	//     * HOST -> Host
	//     * content-type -> Content-Type
	//     * cONTENT-lenGTH -> Content-Length
	DisableHeaderNamesNormalizing bool

	// Path values are sent as-is without normalization if this option
	// is set.
	//
	// Disabling path normalization may be useful for proxying incoming
	// requests to servers that are expecting paths to be forwarded as-is.
	//
	// By default path values are normalized, i.e.
	// extra slashes are removed, special characters are encoded.
	DisablePathNormalizing bool

	// Whether to use TLS (aka SSL or HTTPS) for host connections.
	IsTLS bool

	// Optional TLS config.
	TLSConfig *tls.Config

	// Idle connection to the host is closed after this duration.
	//
	// By default an idle connection is closed after
	// DefaultMaxIdleConnDuration.
	MaxIdleConnDuration time.Duration

	// Buffer size for reading responses.
	// This also limits the maximum header size.
	//
	// Default buffer size is used if 0.
	ReadBufferSize int

	// Buffer size for writing requests.
	//
	// Default buffer size is used if 0.
	WriteBufferSize int

	// Maximum duration for full response reading (including body).
	//
	// By default response read timeout is unlimited.
	ReadTimeout time.Duration

	// Maximum duration for full request writing (including body).
	//
	// By default request write timeout is unlimited.
	WriteTimeout time.Duration

	// Logger for logging client errors.
	//
	// By default the standard logger from the log package is used.
	Logger Logger

	// connClients holds the per-connection clients; connClientsLock is
	// declared next to it.
	// NOTE(review): presumably connClientsLock guards connClients — the
	// code using these fields is outside this section, confirm there.
	connClients     []*pipelineConnClient
	connClientsLock sync.Mutex
}
2313 |
|
---|
// pipelineConnClient carries the same exported configuration fields as
// PipelineClient, except MaxConns, plus private per-client state.
//
// NOTE(review): presumably each pipelineConnClient serves one pipelined
// connection and its exported fields are copied from the owning
// PipelineClient — the code establishing this is outside this section.
type pipelineConnClient struct {
	noCopy noCopy //nolint:unused,structcheck

	Addr                          string
	Name                          string
	NoDefaultUserAgentHeader      bool
	MaxPendingRequests            int
	MaxBatchDelay                 time.Duration
	Dial                          DialFunc
	DialDualStack                 bool
	DisableHeaderNamesNormalizing bool
	DisablePathNormalizing        bool
	IsTLS                         bool
	TLSConfig                     *tls.Config
	MaxIdleConnDuration           time.Duration
	ReadBufferSize                int
	WriteBufferSize               int
	ReadTimeout                   time.Duration
	WriteTimeout                  time.Duration
	Logger                        Logger

	// workPool is the pool handed to acquirePipelineWork and
	// releasePipelineWork by DoDeadline.
	workPool sync.Pool

	// chW is the outgoing queue: DoDeadline sends pipelineWork items to it.
	// NOTE(review): chLock and chR are not used in the visible code;
	// presumably chLock guards channel setup and chR carries work awaiting
	// a response — confirm against the worker code.
	chLock sync.Mutex
	chW    chan *pipelineWork
	chR    chan *pipelineWork

	// NOTE(review): tlsConfigLock/tlsConfig look like a lazily built TLS
	// config and clientName like a cached User-Agent value — confirm
	// against their users, which are outside this section.
	tlsConfigLock sync.Mutex
	tlsConfig     *tls.Config
	clientName    atomic.Value
}
2345 |
|
---|
// pipelineWork is a single request/response exchange queued on a
// pipelineConnClient.
type pipelineWork struct {
	// reqCopy and respCopy are the private copies the exchange operates on;
	// DoDeadline points req and resp at them.
	reqCopy  Request
	respCopy Response
	req      *Request
	resp     *Response
	// t is the timer whose channel DoDeadline selects on to detect a timeout.
	t *time.Timer
	// NOTE(review): deadline is not referenced in the visible code;
	// presumably it is the absolute form of the timeout — confirm.
	deadline time.Time
	// err is read by DoDeadline after done has been signalled.
	err error
	// done is received from by DoDeadline to learn that the exchange finished.
	done chan struct{}
}
2356 |
|
---|
2357 | // DoTimeout performs the given request and waits for response during
|
---|
2358 | // the given timeout duration.
|
---|
2359 | //
|
---|
2360 | // Request must contain at least non-zero RequestURI with full url (including
|
---|
2361 | // scheme and host) or non-zero Host header + RequestURI.
|
---|
2362 | //
|
---|
2363 | // The function doesn't follow redirects.
|
---|
2364 | //
|
---|
2365 | // Response is ignored if resp is nil.
|
---|
2366 | //
|
---|
2367 | // ErrTimeout is returned if the response wasn't returned during
|
---|
2368 | // the given timeout.
|
---|
2369 | //
|
---|
2370 | // It is recommended obtaining req and resp via AcquireRequest
|
---|
2371 | // and AcquireResponse in performance-critical code.
|
---|
2372 | //
|
---|
2373 | // Warning: DoTimeout does not terminate the request itself. The request will
|
---|
2374 | // continue in the background and the response will be discarded.
|
---|
2375 | // If requests take too long and the connection pool gets filled up please
|
---|
2376 | // try setting a ReadTimeout.
|
---|
2377 | func (c *PipelineClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
|
---|
2378 | return c.DoDeadline(req, resp, time.Now().Add(timeout))
|
---|
2379 | }
|
---|
2380 |
|
---|
2381 | // DoDeadline performs the given request and waits for response until
|
---|
2382 | // the given deadline.
|
---|
2383 | //
|
---|
2384 | // Request must contain at least non-zero RequestURI with full url (including
|
---|
2385 | // scheme and host) or non-zero Host header + RequestURI.
|
---|
2386 | //
|
---|
2387 | // The function doesn't follow redirects.
|
---|
2388 | //
|
---|
2389 | // Response is ignored if resp is nil.
|
---|
2390 | //
|
---|
2391 | // ErrTimeout is returned if the response wasn't returned until
|
---|
2392 | // the given deadline.
|
---|
2393 | //
|
---|
2394 | // It is recommended obtaining req and resp via AcquireRequest
|
---|
2395 | // and AcquireResponse in performance-critical code.
|
---|
2396 | func (c *PipelineClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
|
---|
2397 | return c.getConnClient().DoDeadline(req, resp, deadline)
|
---|
2398 | }
|
---|
2399 |
|
---|
2400 | func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
|
---|
2401 | c.init()
|
---|
2402 |
|
---|
2403 | timeout := -time.Since(deadline)
|
---|
2404 | if timeout < 0 {
|
---|
2405 | return ErrTimeout
|
---|
2406 | }
|
---|
2407 |
|
---|
2408 | if c.DisablePathNormalizing {
|
---|
2409 | req.URI().DisablePathNormalizing = true
|
---|
2410 | }
|
---|
2411 |
|
---|
2412 | userAgentOld := req.Header.UserAgent()
|
---|
2413 | if len(userAgentOld) == 0 {
|
---|
2414 | req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...)
|
---|
2415 | }
|
---|
2416 |
|
---|
2417 | w := acquirePipelineWork(&c.workPool, timeout)
|
---|
2418 | w.respCopy.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
|
---|
2419 | w.req = &w.reqCopy
|
---|
2420 | w.resp = &w.respCopy
|
---|
2421 |
|
---|
2422 | // Make a copy of the request in order to avoid data races on timeouts
|
---|
2423 | req.copyToSkipBody(&w.reqCopy)
|
---|
2424 | swapRequestBody(req, &w.reqCopy)
|
---|
2425 |
|
---|
2426 | // Put the request to outgoing queue
|
---|
2427 | select {
|
---|
2428 | case c.chW <- w:
|
---|
2429 | // Fast path: len(c.ch) < cap(c.ch)
|
---|
2430 | default:
|
---|
2431 | // Slow path
|
---|
2432 | select {
|
---|
2433 | case c.chW <- w:
|
---|
2434 | case <-w.t.C:
|
---|
2435 | releasePipelineWork(&c.workPool, w)
|
---|
2436 | return ErrTimeout
|
---|
2437 | }
|
---|
2438 | }
|
---|
2439 |
|
---|
2440 | // Wait for the response
|
---|
2441 | var err error
|
---|
2442 | select {
|
---|
2443 | case <-w.done:
|
---|
2444 | if resp != nil {
|
---|
2445 | w.respCopy.copyToSkipBody(resp)
|
---|
2446 | swapResponseBody(resp, &w.respCopy)
|
---|
2447 | }
|
---|
2448 | err = w.err
|
---|
2449 | releasePipelineWork(&c.workPool, w)
|
---|
2450 | case <-w.t.C:
|
---|
2451 | err = ErrTimeout
|
---|
2452 | }
|
---|
2453 |
|
---|
2454 | return err
|
---|
2455 | }
|
---|
2456 |
|
---|
2457 | // Do performs the given http request and sets the corresponding response.
|
---|
2458 | //
|
---|
2459 | // Request must contain at least non-zero RequestURI with full url (including
|
---|
2460 | // scheme and host) or non-zero Host header + RequestURI.
|
---|
2461 | //
|
---|
2462 | // The function doesn't follow redirects. Use Get* for following redirects.
|
---|
2463 | //
|
---|
2464 | // Response is ignored if resp is nil.
|
---|
2465 | //
|
---|
2466 | // It is recommended obtaining req and resp via AcquireRequest
|
---|
2467 | // and AcquireResponse in performance-critical code.
|
---|
2468 | func (c *PipelineClient) Do(req *Request, resp *Response) error {
|
---|
2469 | return c.getConnClient().Do(req, resp)
|
---|
2470 | }
|
---|
2471 |
|
---|
2472 | func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
|
---|
2473 | c.init()
|
---|
2474 |
|
---|
2475 | if c.DisablePathNormalizing {
|
---|
2476 | req.URI().DisablePathNormalizing = true
|
---|
2477 | }
|
---|
2478 |
|
---|
2479 | userAgentOld := req.Header.UserAgent()
|
---|
2480 | if len(userAgentOld) == 0 {
|
---|
2481 | req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...)
|
---|
2482 | }
|
---|
2483 |
|
---|
2484 | w := acquirePipelineWork(&c.workPool, 0)
|
---|
2485 | w.req = req
|
---|
2486 | if resp != nil {
|
---|
2487 | resp.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
|
---|
2488 | w.resp = resp
|
---|
2489 | } else {
|
---|
2490 | w.resp = &w.respCopy
|
---|
2491 | }
|
---|
2492 |
|
---|
2493 | // Put the request to outgoing queue
|
---|
2494 | select {
|
---|
2495 | case c.chW <- w:
|
---|
2496 | default:
|
---|
2497 | // Try substituting the oldest w with the current one.
|
---|
2498 | select {
|
---|
2499 | case wOld := <-c.chW:
|
---|
2500 | wOld.err = ErrPipelineOverflow
|
---|
2501 | wOld.done <- struct{}{}
|
---|
2502 | default:
|
---|
2503 | }
|
---|
2504 | select {
|
---|
2505 | case c.chW <- w:
|
---|
2506 | default:
|
---|
2507 | releasePipelineWork(&c.workPool, w)
|
---|
2508 | return ErrPipelineOverflow
|
---|
2509 | }
|
---|
2510 | }
|
---|
2511 |
|
---|
2512 | // Wait for the response
|
---|
2513 | <-w.done
|
---|
2514 | err := w.err
|
---|
2515 |
|
---|
2516 | releasePipelineWork(&c.workPool, w)
|
---|
2517 |
|
---|
2518 | return err
|
---|
2519 | }
|
---|
2520 |
|
---|
2521 | func (c *PipelineClient) getConnClient() *pipelineConnClient {
|
---|
2522 | c.connClientsLock.Lock()
|
---|
2523 | cc := c.getConnClientUnlocked()
|
---|
2524 | c.connClientsLock.Unlock()
|
---|
2525 | return cc
|
---|
2526 | }
|
---|
2527 |
|
---|
2528 | func (c *PipelineClient) getConnClientUnlocked() *pipelineConnClient {
|
---|
2529 | if len(c.connClients) == 0 {
|
---|
2530 | return c.newConnClient()
|
---|
2531 | }
|
---|
2532 |
|
---|
2533 | // Return the client with the minimum number of pending requests.
|
---|
2534 | minCC := c.connClients[0]
|
---|
2535 | minReqs := minCC.PendingRequests()
|
---|
2536 | if minReqs == 0 {
|
---|
2537 | return minCC
|
---|
2538 | }
|
---|
2539 | for i := 1; i < len(c.connClients); i++ {
|
---|
2540 | cc := c.connClients[i]
|
---|
2541 | reqs := cc.PendingRequests()
|
---|
2542 | if reqs == 0 {
|
---|
2543 | return cc
|
---|
2544 | }
|
---|
2545 | if reqs < minReqs {
|
---|
2546 | minCC = cc
|
---|
2547 | minReqs = reqs
|
---|
2548 | }
|
---|
2549 | }
|
---|
2550 |
|
---|
2551 | maxConns := c.MaxConns
|
---|
2552 | if maxConns <= 0 {
|
---|
2553 | maxConns = 1
|
---|
2554 | }
|
---|
2555 | if len(c.connClients) < maxConns {
|
---|
2556 | return c.newConnClient()
|
---|
2557 | }
|
---|
2558 | return minCC
|
---|
2559 | }
|
---|
2560 |
|
---|
2561 | func (c *PipelineClient) newConnClient() *pipelineConnClient {
|
---|
2562 | cc := &pipelineConnClient{
|
---|
2563 | Addr: c.Addr,
|
---|
2564 | Name: c.Name,
|
---|
2565 | NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
|
---|
2566 | MaxPendingRequests: c.MaxPendingRequests,
|
---|
2567 | MaxBatchDelay: c.MaxBatchDelay,
|
---|
2568 | Dial: c.Dial,
|
---|
2569 | DialDualStack: c.DialDualStack,
|
---|
2570 | DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
|
---|
2571 | DisablePathNormalizing: c.DisablePathNormalizing,
|
---|
2572 | IsTLS: c.IsTLS,
|
---|
2573 | TLSConfig: c.TLSConfig,
|
---|
2574 | MaxIdleConnDuration: c.MaxIdleConnDuration,
|
---|
2575 | ReadBufferSize: c.ReadBufferSize,
|
---|
2576 | WriteBufferSize: c.WriteBufferSize,
|
---|
2577 | ReadTimeout: c.ReadTimeout,
|
---|
2578 | WriteTimeout: c.WriteTimeout,
|
---|
2579 | Logger: c.Logger,
|
---|
2580 | }
|
---|
2581 | c.connClients = append(c.connClients, cc)
|
---|
2582 | return cc
|
---|
2583 | }
|
---|
2584 |
|
---|
// ErrPipelineOverflow may be returned from PipelineClient.Do*
// if the queue of pending requests has overflowed.
var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflown. Increase MaxConns and/or MaxPendingRequests")
|
---|
2588 |
|
---|
// DefaultMaxPendingRequests is the default value
// for PipelineClient.MaxPendingRequests. It is used as the queue capacity
// when MaxPendingRequests is not positive.
const DefaultMaxPendingRequests = 1024
|
---|
2592 |
|
---|
2593 | func (c *pipelineConnClient) init() {
|
---|
2594 | c.chLock.Lock()
|
---|
2595 | if c.chR == nil {
|
---|
2596 | maxPendingRequests := c.MaxPendingRequests
|
---|
2597 | if maxPendingRequests <= 0 {
|
---|
2598 | maxPendingRequests = DefaultMaxPendingRequests
|
---|
2599 | }
|
---|
2600 | c.chR = make(chan *pipelineWork, maxPendingRequests)
|
---|
2601 | if c.chW == nil {
|
---|
2602 | c.chW = make(chan *pipelineWork, maxPendingRequests)
|
---|
2603 | }
|
---|
2604 | go func() {
|
---|
2605 | // Keep restarting the worker if it fails (connection errors for example).
|
---|
2606 | for {
|
---|
2607 | if err := c.worker(); err != nil {
|
---|
2608 | c.logger().Printf("error in PipelineClient(%q): %s", c.Addr, err)
|
---|
2609 | if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
|
---|
2610 | // Throttle client reconnections on temporary errors
|
---|
2611 | time.Sleep(time.Second)
|
---|
2612 | }
|
---|
2613 | } else {
|
---|
2614 | c.chLock.Lock()
|
---|
2615 | stop := len(c.chR) == 0 && len(c.chW) == 0
|
---|
2616 | if !stop {
|
---|
2617 | c.chR = nil
|
---|
2618 | c.chW = nil
|
---|
2619 | }
|
---|
2620 | c.chLock.Unlock()
|
---|
2621 |
|
---|
2622 | if stop {
|
---|
2623 | break
|
---|
2624 | }
|
---|
2625 | }
|
---|
2626 | }
|
---|
2627 | }()
|
---|
2628 | }
|
---|
2629 | c.chLock.Unlock()
|
---|
2630 | }
|
---|
2631 |
|
---|
2632 | func (c *pipelineConnClient) worker() error {
|
---|
2633 | tlsConfig := c.cachedTLSConfig()
|
---|
2634 | conn, err := dialAddr(c.Addr, c.Dial, c.DialDualStack, c.IsTLS, tlsConfig, c.WriteTimeout)
|
---|
2635 | if err != nil {
|
---|
2636 | return err
|
---|
2637 | }
|
---|
2638 |
|
---|
2639 | // Start reader and writer
|
---|
2640 | stopW := make(chan struct{})
|
---|
2641 | doneW := make(chan error)
|
---|
2642 | go func() {
|
---|
2643 | doneW <- c.writer(conn, stopW)
|
---|
2644 | }()
|
---|
2645 | stopR := make(chan struct{})
|
---|
2646 | doneR := make(chan error)
|
---|
2647 | go func() {
|
---|
2648 | doneR <- c.reader(conn, stopR)
|
---|
2649 | }()
|
---|
2650 |
|
---|
2651 | // Wait until reader and writer are stopped
|
---|
2652 | select {
|
---|
2653 | case err = <-doneW:
|
---|
2654 | conn.Close()
|
---|
2655 | close(stopR)
|
---|
2656 | <-doneR
|
---|
2657 | case err = <-doneR:
|
---|
2658 | conn.Close()
|
---|
2659 | close(stopW)
|
---|
2660 | <-doneW
|
---|
2661 | }
|
---|
2662 |
|
---|
2663 | // Notify pending readers
|
---|
2664 | for len(c.chR) > 0 {
|
---|
2665 | w := <-c.chR
|
---|
2666 | w.err = errPipelineConnStopped
|
---|
2667 | w.done <- struct{}{}
|
---|
2668 | }
|
---|
2669 |
|
---|
2670 | return err
|
---|
2671 | }
|
---|
2672 |
|
---|
2673 | func (c *pipelineConnClient) cachedTLSConfig() *tls.Config {
|
---|
2674 | if !c.IsTLS {
|
---|
2675 | return nil
|
---|
2676 | }
|
---|
2677 |
|
---|
2678 | c.tlsConfigLock.Lock()
|
---|
2679 | cfg := c.tlsConfig
|
---|
2680 | if cfg == nil {
|
---|
2681 | cfg = newClientTLSConfig(c.TLSConfig, c.Addr)
|
---|
2682 | c.tlsConfig = cfg
|
---|
2683 | }
|
---|
2684 | c.tlsConfigLock.Unlock()
|
---|
2685 |
|
---|
2686 | return cfg
|
---|
2687 | }
|
---|
2688 |
|
---|
2689 | func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
|
---|
2690 | writeBufferSize := c.WriteBufferSize
|
---|
2691 | if writeBufferSize <= 0 {
|
---|
2692 | writeBufferSize = defaultWriteBufferSize
|
---|
2693 | }
|
---|
2694 | bw := bufio.NewWriterSize(conn, writeBufferSize)
|
---|
2695 | defer bw.Flush()
|
---|
2696 | chR := c.chR
|
---|
2697 | chW := c.chW
|
---|
2698 | writeTimeout := c.WriteTimeout
|
---|
2699 |
|
---|
2700 | maxIdleConnDuration := c.MaxIdleConnDuration
|
---|
2701 | if maxIdleConnDuration <= 0 {
|
---|
2702 | maxIdleConnDuration = DefaultMaxIdleConnDuration
|
---|
2703 | }
|
---|
2704 | maxBatchDelay := c.MaxBatchDelay
|
---|
2705 |
|
---|
2706 | var (
|
---|
2707 | stopTimer = time.NewTimer(time.Hour)
|
---|
2708 | flushTimer = time.NewTimer(time.Hour)
|
---|
2709 | flushTimerCh <-chan time.Time
|
---|
2710 | instantTimerCh = make(chan time.Time)
|
---|
2711 |
|
---|
2712 | w *pipelineWork
|
---|
2713 | err error
|
---|
2714 | )
|
---|
2715 | close(instantTimerCh)
|
---|
2716 | for {
|
---|
2717 | againChW:
|
---|
2718 | select {
|
---|
2719 | case w = <-chW:
|
---|
2720 | // Fast path: len(chW) > 0
|
---|
2721 | default:
|
---|
2722 | // Slow path
|
---|
2723 | stopTimer.Reset(maxIdleConnDuration)
|
---|
2724 | select {
|
---|
2725 | case w = <-chW:
|
---|
2726 | case <-stopTimer.C:
|
---|
2727 | return nil
|
---|
2728 | case <-stopCh:
|
---|
2729 | return nil
|
---|
2730 | case <-flushTimerCh:
|
---|
2731 | if err = bw.Flush(); err != nil {
|
---|
2732 | return err
|
---|
2733 | }
|
---|
2734 | flushTimerCh = nil
|
---|
2735 | goto againChW
|
---|
2736 | }
|
---|
2737 | }
|
---|
2738 |
|
---|
2739 | if !w.deadline.IsZero() && time.Since(w.deadline) >= 0 {
|
---|
2740 | w.err = ErrTimeout
|
---|
2741 | w.done <- struct{}{}
|
---|
2742 | continue
|
---|
2743 | }
|
---|
2744 |
|
---|
2745 | w.resp.parseNetConn(conn)
|
---|
2746 |
|
---|
2747 | if writeTimeout > 0 {
|
---|
2748 | // Set Deadline every time, since golang has fixed the performance issue
|
---|
2749 | // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
|
---|
2750 | currentTime := time.Now()
|
---|
2751 | if err = conn.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil {
|
---|
2752 | w.err = err
|
---|
2753 | w.done <- struct{}{}
|
---|
2754 | return err
|
---|
2755 | }
|
---|
2756 | }
|
---|
2757 | if err = w.req.Write(bw); err != nil {
|
---|
2758 | w.err = err
|
---|
2759 | w.done <- struct{}{}
|
---|
2760 | return err
|
---|
2761 | }
|
---|
2762 | if flushTimerCh == nil && (len(chW) == 0 || len(chR) == cap(chR)) {
|
---|
2763 | if maxBatchDelay > 0 {
|
---|
2764 | flushTimer.Reset(maxBatchDelay)
|
---|
2765 | flushTimerCh = flushTimer.C
|
---|
2766 | } else {
|
---|
2767 | flushTimerCh = instantTimerCh
|
---|
2768 | }
|
---|
2769 | }
|
---|
2770 |
|
---|
2771 | againChR:
|
---|
2772 | select {
|
---|
2773 | case chR <- w:
|
---|
2774 | // Fast path: len(chR) < cap(chR)
|
---|
2775 | default:
|
---|
2776 | // Slow path
|
---|
2777 | select {
|
---|
2778 | case chR <- w:
|
---|
2779 | case <-stopCh:
|
---|
2780 | w.err = errPipelineConnStopped
|
---|
2781 | w.done <- struct{}{}
|
---|
2782 | return nil
|
---|
2783 | case <-flushTimerCh:
|
---|
2784 | if err = bw.Flush(); err != nil {
|
---|
2785 | w.err = err
|
---|
2786 | w.done <- struct{}{}
|
---|
2787 | return err
|
---|
2788 | }
|
---|
2789 | flushTimerCh = nil
|
---|
2790 | goto againChR
|
---|
2791 | }
|
---|
2792 | }
|
---|
2793 | }
|
---|
2794 | }
|
---|
2795 |
|
---|
2796 | func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}) error {
|
---|
2797 | readBufferSize := c.ReadBufferSize
|
---|
2798 | if readBufferSize <= 0 {
|
---|
2799 | readBufferSize = defaultReadBufferSize
|
---|
2800 | }
|
---|
2801 | br := bufio.NewReaderSize(conn, readBufferSize)
|
---|
2802 | chR := c.chR
|
---|
2803 | readTimeout := c.ReadTimeout
|
---|
2804 |
|
---|
2805 | var (
|
---|
2806 | w *pipelineWork
|
---|
2807 | err error
|
---|
2808 | )
|
---|
2809 | for {
|
---|
2810 | select {
|
---|
2811 | case w = <-chR:
|
---|
2812 | // Fast path: len(chR) > 0
|
---|
2813 | default:
|
---|
2814 | // Slow path
|
---|
2815 | select {
|
---|
2816 | case w = <-chR:
|
---|
2817 | case <-stopCh:
|
---|
2818 | return nil
|
---|
2819 | }
|
---|
2820 | }
|
---|
2821 |
|
---|
2822 | if readTimeout > 0 {
|
---|
2823 | // Set Deadline every time, since golang has fixed the performance issue
|
---|
2824 | // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
|
---|
2825 | currentTime := time.Now()
|
---|
2826 | if err = conn.SetReadDeadline(currentTime.Add(readTimeout)); err != nil {
|
---|
2827 | w.err = err
|
---|
2828 | w.done <- struct{}{}
|
---|
2829 | return err
|
---|
2830 | }
|
---|
2831 | }
|
---|
2832 | if err = w.resp.Read(br); err != nil {
|
---|
2833 | w.err = err
|
---|
2834 | w.done <- struct{}{}
|
---|
2835 | return err
|
---|
2836 | }
|
---|
2837 |
|
---|
2838 | w.done <- struct{}{}
|
---|
2839 | }
|
---|
2840 | }
|
---|
2841 |
|
---|
2842 | func (c *pipelineConnClient) logger() Logger {
|
---|
2843 | if c.Logger != nil {
|
---|
2844 | return c.Logger
|
---|
2845 | }
|
---|
2846 | return defaultLogger
|
---|
2847 | }
|
---|
2848 |
|
---|
2849 | // PendingRequests returns the current number of pending requests pipelined
|
---|
2850 | // to the server.
|
---|
2851 | //
|
---|
2852 | // This number may exceed MaxPendingRequests*MaxConns by up to two times, since
|
---|
2853 | // each connection to the server may keep up to MaxPendingRequests requests
|
---|
2854 | // in the queue before sending them to the server.
|
---|
2855 | //
|
---|
2856 | // This function may be used for balancing load among multiple PipelineClient
|
---|
2857 | // instances.
|
---|
2858 | func (c *PipelineClient) PendingRequests() int {
|
---|
2859 | c.connClientsLock.Lock()
|
---|
2860 | n := 0
|
---|
2861 | for _, cc := range c.connClients {
|
---|
2862 | n += cc.PendingRequests()
|
---|
2863 | }
|
---|
2864 | c.connClientsLock.Unlock()
|
---|
2865 | return n
|
---|
2866 | }
|
---|
2867 |
|
---|
2868 | func (c *pipelineConnClient) PendingRequests() int {
|
---|
2869 | c.init()
|
---|
2870 |
|
---|
2871 | c.chLock.Lock()
|
---|
2872 | n := len(c.chR) + len(c.chW)
|
---|
2873 | c.chLock.Unlock()
|
---|
2874 | return n
|
---|
2875 | }
|
---|
2876 |
|
---|
2877 | func (c *pipelineConnClient) getClientName() []byte {
|
---|
2878 | v := c.clientName.Load()
|
---|
2879 | var clientName []byte
|
---|
2880 | if v == nil {
|
---|
2881 | clientName = []byte(c.Name)
|
---|
2882 | if len(clientName) == 0 && !c.NoDefaultUserAgentHeader {
|
---|
2883 | clientName = defaultUserAgent
|
---|
2884 | }
|
---|
2885 | c.clientName.Store(clientName)
|
---|
2886 | } else {
|
---|
2887 | clientName = v.([]byte)
|
---|
2888 | }
|
---|
2889 | return clientName
|
---|
2890 | }
|
---|
2891 |
|
---|
// errPipelineConnStopped is set on work items that were still waiting when
// the pipeline connection was stopped.
var errPipelineConnStopped = errors.New("pipeline connection has been stopped")
|
---|
2893 |
|
---|
2894 | func acquirePipelineWork(pool *sync.Pool, timeout time.Duration) *pipelineWork {
|
---|
2895 | v := pool.Get()
|
---|
2896 | if v == nil {
|
---|
2897 | v = &pipelineWork{
|
---|
2898 | done: make(chan struct{}, 1),
|
---|
2899 | }
|
---|
2900 | }
|
---|
2901 | w := v.(*pipelineWork)
|
---|
2902 | if timeout > 0 {
|
---|
2903 | if w.t == nil {
|
---|
2904 | w.t = time.NewTimer(timeout)
|
---|
2905 | } else {
|
---|
2906 | w.t.Reset(timeout)
|
---|
2907 | }
|
---|
2908 | w.deadline = time.Now().Add(timeout)
|
---|
2909 | } else {
|
---|
2910 | w.deadline = zeroTime
|
---|
2911 | }
|
---|
2912 | return w
|
---|
2913 | }
|
---|
2914 |
|
---|
2915 | func releasePipelineWork(pool *sync.Pool, w *pipelineWork) {
|
---|
2916 | if w.t != nil {
|
---|
2917 | w.t.Stop()
|
---|
2918 | }
|
---|
2919 | w.reqCopy.Reset()
|
---|
2920 | w.respCopy.Reset()
|
---|
2921 | w.req = nil
|
---|
2922 | w.resp = nil
|
---|
2923 | w.err = nil
|
---|
2924 | pool.Put(w)
|
---|
2925 | }
|
---|