source: code/trunk/vendor/github.com/valyala/fasthttp/client.go@ 145

Last change on this file since 145 was 145, checked in by Izuru Yakumo, 22 months ago

Updated the Makefile and vendored depedencies

Signed-off-by: Izuru Yakumo <yakumo.izuru@…>

File size: 77.1 KB
RevLine 
[145]1// go:build !windows || !race
2
3package fasthttp
4
5import (
6 "bufio"
7 "crypto/tls"
8 "errors"
9 "fmt"
10 "io"
11 "net"
12 "strconv"
13 "strings"
14 "sync"
15 "sync/atomic"
16 "time"
17)
18
19// Do performs the given http request and fills the given http response.
20//
21// Request must contain at least non-zero RequestURI with full url (including
22// scheme and host) or non-zero Host header + RequestURI.
23//
24// Client determines the server to be requested in the following order:
25//
26// - from RequestURI if it contains full url with scheme and host;
27// - from Host header otherwise.
28//
29// The function doesn't follow redirects. Use Get* for following redirects.
30//
31// Response is ignored if resp is nil.
32//
33// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
34// to the requested host are busy.
35//
36// It is recommended obtaining req and resp via AcquireRequest
37// and AcquireResponse in performance-critical code.
38func Do(req *Request, resp *Response) error {
39 return defaultClient.Do(req, resp)
40}
41
42// DoTimeout performs the given request and waits for response during
43// the given timeout duration.
44//
45// Request must contain at least non-zero RequestURI with full url (including
46// scheme and host) or non-zero Host header + RequestURI.
47//
48// Client determines the server to be requested in the following order:
49//
50// - from RequestURI if it contains full url with scheme and host;
51// - from Host header otherwise.
52//
53// The function doesn't follow redirects. Use Get* for following redirects.
54//
55// Response is ignored if resp is nil.
56//
57// ErrTimeout is returned if the response wasn't returned during
58// the given timeout.
59//
60// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
61// to the requested host are busy.
62//
63// It is recommended obtaining req and resp via AcquireRequest
64// and AcquireResponse in performance-critical code.
65//
66// Warning: DoTimeout does not terminate the request itself. The request will
67// continue in the background and the response will be discarded.
68// If requests take too long and the connection pool gets filled up please
69// try using a Client and setting a ReadTimeout.
70func DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
71 return defaultClient.DoTimeout(req, resp, timeout)
72}
73
74// DoDeadline performs the given request and waits for response until
75// the given deadline.
76//
77// Request must contain at least non-zero RequestURI with full url (including
78// scheme and host) or non-zero Host header + RequestURI.
79//
80// Client determines the server to be requested in the following order:
81//
82// - from RequestURI if it contains full url with scheme and host;
83// - from Host header otherwise.
84//
85// The function doesn't follow redirects. Use Get* for following redirects.
86//
87// Response is ignored if resp is nil.
88//
89// ErrTimeout is returned if the response wasn't returned until
90// the given deadline.
91//
92// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
93// to the requested host are busy.
94//
95// It is recommended obtaining req and resp via AcquireRequest
96// and AcquireResponse in performance-critical code.
97func DoDeadline(req *Request, resp *Response, deadline time.Time) error {
98 return defaultClient.DoDeadline(req, resp, deadline)
99}
100
101// DoRedirects performs the given http request and fills the given http response,
102// following up to maxRedirectsCount redirects. When the redirect count exceeds
103// maxRedirectsCount, ErrTooManyRedirects is returned.
104//
105// Request must contain at least non-zero RequestURI with full url (including
106// scheme and host) or non-zero Host header + RequestURI.
107//
108// Client determines the server to be requested in the following order:
109//
110// - from RequestURI if it contains full url with scheme and host;
111// - from Host header otherwise.
112//
113// Response is ignored if resp is nil.
114//
115// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
116// to the requested host are busy.
117//
118// It is recommended obtaining req and resp via AcquireRequest
119// and AcquireResponse in performance-critical code.
120func DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
121 _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, &defaultClient)
122 return err
123}
124
125// Get returns the status code and body of url.
126//
127// The contents of dst will be replaced by the body and returned, if the dst
128// is too small a new slice will be allocated.
129//
130// The function follows redirects. Use Do* for manually handling redirects.
131func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
132 return defaultClient.Get(dst, url)
133}
134
135// GetTimeout returns the status code and body of url.
136//
137// The contents of dst will be replaced by the body and returned, if the dst
138// is too small a new slice will be allocated.
139//
140// The function follows redirects. Use Do* for manually handling redirects.
141//
142// ErrTimeout error is returned if url contents couldn't be fetched
143// during the given timeout.
144func GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
145 return defaultClient.GetTimeout(dst, url, timeout)
146}
147
148// GetDeadline returns the status code and body of url.
149//
150// The contents of dst will be replaced by the body and returned, if the dst
151// is too small a new slice will be allocated.
152//
153// The function follows redirects. Use Do* for manually handling redirects.
154//
155// ErrTimeout error is returned if url contents couldn't be fetched
156// until the given deadline.
157func GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
158 return defaultClient.GetDeadline(dst, url, deadline)
159}
160
161// Post sends POST request to the given url with the given POST arguments.
162//
163// The contents of dst will be replaced by the body and returned, if the dst
164// is too small a new slice will be allocated.
165//
166// The function follows redirects. Use Do* for manually handling redirects.
167//
168// Empty POST body is sent if postArgs is nil.
169func Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
170 return defaultClient.Post(dst, url, postArgs)
171}
172
173var defaultClient Client
174
175// Client implements http client.
176//
177// Copying Client by value is prohibited. Create new instance instead.
178//
179// It is safe calling Client methods from concurrently running goroutines.
180//
181// The fields of a Client should not be changed while it is in use.
182type Client struct {
183 noCopy noCopy //nolint:unused,structcheck
184
185 // Client name. Used in User-Agent request header.
186 //
187 // Default client name is used if not set.
188 Name string
189
190 // NoDefaultUserAgentHeader when set to true, causes the default
191 // User-Agent header to be excluded from the Request.
192 NoDefaultUserAgentHeader bool
193
194 // Callback for establishing new connections to hosts.
195 //
196 // Default Dial is used if not set.
197 Dial DialFunc
198
199 // Attempt to connect to both ipv4 and ipv6 addresses if set to true.
200 //
201 // This option is used only if default TCP dialer is used,
202 // i.e. if Dial is blank.
203 //
204 // By default client connects only to ipv4 addresses,
205 // since unfortunately ipv6 remains broken in many networks worldwide :)
206 DialDualStack bool
207
208 // TLS config for https connections.
209 //
210 // Default TLS config is used if not set.
211 TLSConfig *tls.Config
212
213 // Maximum number of connections per each host which may be established.
214 //
215 // DefaultMaxConnsPerHost is used if not set.
216 MaxConnsPerHost int
217
218 // Idle keep-alive connections are closed after this duration.
219 //
220 // By default idle connections are closed
221 // after DefaultMaxIdleConnDuration.
222 MaxIdleConnDuration time.Duration
223
224 // Keep-alive connections are closed after this duration.
225 //
226 // By default connection duration is unlimited.
227 MaxConnDuration time.Duration
228
229 // Maximum number of attempts for idempotent calls
230 //
231 // DefaultMaxIdemponentCallAttempts is used if not set.
232 MaxIdemponentCallAttempts int
233
234 // Per-connection buffer size for responses' reading.
235 // This also limits the maximum header size.
236 //
237 // Default buffer size is used if 0.
238 ReadBufferSize int
239
240 // Per-connection buffer size for requests' writing.
241 //
242 // Default buffer size is used if 0.
243 WriteBufferSize int
244
245 // Maximum duration for full response reading (including body).
246 //
247 // By default response read timeout is unlimited.
248 ReadTimeout time.Duration
249
250 // Maximum duration for full request writing (including body).
251 //
252 // By default request write timeout is unlimited.
253 WriteTimeout time.Duration
254
255 // Maximum response body size.
256 //
257 // The client returns ErrBodyTooLarge if this limit is greater than 0
258 // and response body is greater than the limit.
259 //
260 // By default response body size is unlimited.
261 MaxResponseBodySize int
262
263 // Header names are passed as-is without normalization
264 // if this option is set.
265 //
266 // Disabled header names' normalization may be useful only for proxying
267 // responses to other clients expecting case-sensitive
268 // header names. See https://github.com/valyala/fasthttp/issues/57
269 // for details.
270 //
271 // By default request and response header names are normalized, i.e.
272 // The first letter and the first letters following dashes
273 // are uppercased, while all the other letters are lowercased.
274 // Examples:
275 //
276 // * HOST -> Host
277 // * content-type -> Content-Type
278 // * cONTENT-lenGTH -> Content-Length
279 DisableHeaderNamesNormalizing bool
280
281 // Path values are sent as-is without normalization
282 //
283 // Disabled path normalization may be useful for proxying incoming requests
284 // to servers that are expecting paths to be forwarded as-is.
285 //
286 // By default path values are normalized, i.e.
287 // extra slashes are removed, special characters are encoded.
288 DisablePathNormalizing bool
289
290 // Maximum duration for waiting for a free connection.
291 //
292 // By default will not waiting, return ErrNoFreeConns immediately
293 MaxConnWaitTimeout time.Duration
294
295 // RetryIf controls whether a retry should be attempted after an error.
296 //
297 // By default will use isIdempotent function
298 RetryIf RetryIfFunc
299
300 // ConfigureClient configures the fasthttp.HostClient.
301 ConfigureClient func(hc *HostClient) error
302
303 mLock sync.Mutex
304 m map[string]*HostClient
305 ms map[string]*HostClient
306 readerPool sync.Pool
307 writerPool sync.Pool
308}
309
310// Get returns the status code and body of url.
311//
312// The contents of dst will be replaced by the body and returned, if the dst
313// is too small a new slice will be allocated.
314//
315// The function follows redirects. Use Do* for manually handling redirects.
316func (c *Client) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
317 return clientGetURL(dst, url, c)
318}
319
320// GetTimeout returns the status code and body of url.
321//
322// The contents of dst will be replaced by the body and returned, if the dst
323// is too small a new slice will be allocated.
324//
325// The function follows redirects. Use Do* for manually handling redirects.
326//
327// ErrTimeout error is returned if url contents couldn't be fetched
328// during the given timeout.
329func (c *Client) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
330 return clientGetURLTimeout(dst, url, timeout, c)
331}
332
333// GetDeadline returns the status code and body of url.
334//
335// The contents of dst will be replaced by the body and returned, if the dst
336// is too small a new slice will be allocated.
337//
338// The function follows redirects. Use Do* for manually handling redirects.
339//
340// ErrTimeout error is returned if url contents couldn't be fetched
341// until the given deadline.
342func (c *Client) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
343 return clientGetURLDeadline(dst, url, deadline, c)
344}
345
346// Post sends POST request to the given url with the given POST arguments.
347//
348// The contents of dst will be replaced by the body and returned, if the dst
349// is too small a new slice will be allocated.
350//
351// The function follows redirects. Use Do* for manually handling redirects.
352//
353// Empty POST body is sent if postArgs is nil.
354func (c *Client) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
355 return clientPostURL(dst, url, postArgs, c)
356}
357
358// DoTimeout performs the given request and waits for response during
359// the given timeout duration.
360//
361// Request must contain at least non-zero RequestURI with full url (including
362// scheme and host) or non-zero Host header + RequestURI.
363//
364// Client determines the server to be requested in the following order:
365//
366// - from RequestURI if it contains full url with scheme and host;
367// - from Host header otherwise.
368//
369// The function doesn't follow redirects. Use Get* for following redirects.
370//
371// Response is ignored if resp is nil.
372//
373// ErrTimeout is returned if the response wasn't returned during
374// the given timeout.
375//
376// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
377// to the requested host are busy.
378//
379// It is recommended obtaining req and resp via AcquireRequest
380// and AcquireResponse in performance-critical code.
381//
382// Warning: DoTimeout does not terminate the request itself. The request will
383// continue in the background and the response will be discarded.
384// If requests take too long and the connection pool gets filled up please
385// try setting a ReadTimeout.
386func (c *Client) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
387 return clientDoTimeout(req, resp, timeout, c)
388}
389
390// DoDeadline performs the given request and waits for response until
391// the given deadline.
392//
393// Request must contain at least non-zero RequestURI with full url (including
394// scheme and host) or non-zero Host header + RequestURI.
395//
396// Client determines the server to be requested in the following order:
397//
398// - from RequestURI if it contains full url with scheme and host;
399// - from Host header otherwise.
400//
401// The function doesn't follow redirects. Use Get* for following redirects.
402//
403// Response is ignored if resp is nil.
404//
405// ErrTimeout is returned if the response wasn't returned until
406// the given deadline.
407//
408// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
409// to the requested host are busy.
410//
411// It is recommended obtaining req and resp via AcquireRequest
412// and AcquireResponse in performance-critical code.
413func (c *Client) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
414 return clientDoDeadline(req, resp, deadline, c)
415}
416
417// DoRedirects performs the given http request and fills the given http response,
418// following up to maxRedirectsCount redirects. When the redirect count exceeds
419// maxRedirectsCount, ErrTooManyRedirects is returned.
420//
421// Request must contain at least non-zero RequestURI with full url (including
422// scheme and host) or non-zero Host header + RequestURI.
423//
424// Client determines the server to be requested in the following order:
425//
426// - from RequestURI if it contains full url with scheme and host;
427// - from Host header otherwise.
428//
429// Response is ignored if resp is nil.
430//
431// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
432// to the requested host are busy.
433//
434// It is recommended obtaining req and resp via AcquireRequest
435// and AcquireResponse in performance-critical code.
436func (c *Client) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
437 _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
438 return err
439}
440
441// Do performs the given http request and fills the given http response.
442//
443// Request must contain at least non-zero RequestURI with full url (including
444// scheme and host) or non-zero Host header + RequestURI.
445//
446// Client determines the server to be requested in the following order:
447//
448// - from RequestURI if it contains full url with scheme and host;
449// - from Host header otherwise.
450//
451// Response is ignored if resp is nil.
452//
453// The function doesn't follow redirects. Use Get* for following redirects.
454//
455// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
456// to the requested host are busy.
457//
458// It is recommended obtaining req and resp via AcquireRequest
459// and AcquireResponse in performance-critical code.
460func (c *Client) Do(req *Request, resp *Response) error {
461 uri := req.URI()
462 if uri == nil {
463 return ErrorInvalidURI
464 }
465
466 host := uri.Host()
467
468 isTLS := false
469 if uri.isHttps() {
470 isTLS = true
471 } else if !uri.isHttp() {
472 return fmt.Errorf("unsupported protocol %q. http and https are supported", uri.Scheme())
473 }
474
475 startCleaner := false
476
477 c.mLock.Lock()
478 m := c.m
479 if isTLS {
480 m = c.ms
481 }
482 if m == nil {
483 m = make(map[string]*HostClient)
484 if isTLS {
485 c.ms = m
486 } else {
487 c.m = m
488 }
489 }
490 hc := m[string(host)]
491 if hc == nil {
492 hc = &HostClient{
493 Addr: addMissingPort(string(host), isTLS),
494 Name: c.Name,
495 NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
496 Dial: c.Dial,
497 DialDualStack: c.DialDualStack,
498 IsTLS: isTLS,
499 TLSConfig: c.TLSConfig,
500 MaxConns: c.MaxConnsPerHost,
501 MaxIdleConnDuration: c.MaxIdleConnDuration,
502 MaxConnDuration: c.MaxConnDuration,
503 MaxIdemponentCallAttempts: c.MaxIdemponentCallAttempts,
504 ReadBufferSize: c.ReadBufferSize,
505 WriteBufferSize: c.WriteBufferSize,
506 ReadTimeout: c.ReadTimeout,
507 WriteTimeout: c.WriteTimeout,
508 MaxResponseBodySize: c.MaxResponseBodySize,
509 DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
510 DisablePathNormalizing: c.DisablePathNormalizing,
511 MaxConnWaitTimeout: c.MaxConnWaitTimeout,
512 RetryIf: c.RetryIf,
513 clientReaderPool: &c.readerPool,
514 clientWriterPool: &c.writerPool,
515 }
516
517 if c.ConfigureClient != nil {
518 if err := c.ConfigureClient(hc); err != nil {
519 return err
520 }
521 }
522
523 m[string(host)] = hc
524 if len(m) == 1 {
525 startCleaner = true
526 }
527 }
528
529 atomic.AddInt32(&hc.pendingClientRequests, 1)
530 defer atomic.AddInt32(&hc.pendingClientRequests, -1)
531
532 c.mLock.Unlock()
533
534 if startCleaner {
535 go c.mCleaner(m)
536 }
537
538 return hc.Do(req, resp)
539}
540
541// CloseIdleConnections closes any connections which were previously
542// connected from previous requests but are now sitting idle in a
543// "keep-alive" state. It does not interrupt any connections currently
544// in use.
545func (c *Client) CloseIdleConnections() {
546 c.mLock.Lock()
547 for _, v := range c.m {
548 v.CloseIdleConnections()
549 }
550 for _, v := range c.ms {
551 v.CloseIdleConnections()
552 }
553 c.mLock.Unlock()
554}
555
556func (c *Client) mCleaner(m map[string]*HostClient) {
557 mustStop := false
558
559 sleep := c.MaxIdleConnDuration
560 if sleep < time.Second {
561 sleep = time.Second
562 } else if sleep > 10*time.Second {
563 sleep = 10 * time.Second
564 }
565
566 for {
567 c.mLock.Lock()
568 for k, v := range m {
569 v.connsLock.Lock()
570 if v.connsCount == 0 && atomic.LoadInt32(&v.pendingClientRequests) == 0 {
571 delete(m, k)
572 }
573 v.connsLock.Unlock()
574 }
575 if len(m) == 0 {
576 mustStop = true
577 }
578 c.mLock.Unlock()
579
580 if mustStop {
581 break
582 }
583 time.Sleep(sleep)
584 }
585}
586
587// DefaultMaxConnsPerHost is the maximum number of concurrent connections
588// http client may establish per host by default (i.e. if
589// Client.MaxConnsPerHost isn't set).
590const DefaultMaxConnsPerHost = 512
591
592// DefaultMaxIdleConnDuration is the default duration before idle keep-alive
593// connection is closed.
594const DefaultMaxIdleConnDuration = 10 * time.Second
595
596// DefaultMaxIdemponentCallAttempts is the default idempotent calls attempts count.
597const DefaultMaxIdemponentCallAttempts = 5
598
599// DialFunc must establish connection to addr.
600//
601// There is no need in establishing TLS (SSL) connection for https.
602// The client automatically converts connection to TLS
603// if HostClient.IsTLS is set.
604//
605// TCP address passed to DialFunc always contains host and port.
606// Example TCP addr values:
607//
608// - foobar.com:80
609// - foobar.com:443
610// - foobar.com:8080
611type DialFunc func(addr string) (net.Conn, error)
612
613// RetryIfFunc signature of retry if function
614//
615// Request argument passed to RetryIfFunc, if there are any request errors.
616type RetryIfFunc func(request *Request) bool
617
618// TransportFunc wraps every request/response.
619type TransportFunc func(*Request, *Response) error
620
621// HostClient balances http requests among hosts listed in Addr.
622//
623// HostClient may be used for balancing load among multiple upstream hosts.
624// While multiple addresses passed to HostClient.Addr may be used for balancing
625// load among them, it would be better using LBClient instead, since HostClient
626// may unevenly balance load among upstream hosts.
627//
628// It is forbidden copying HostClient instances. Create new instances instead.
629//
630// It is safe calling HostClient methods from concurrently running goroutines.
631type HostClient struct {
632 noCopy noCopy //nolint:unused,structcheck
633
634 // Comma-separated list of upstream HTTP server host addresses,
635 // which are passed to Dial in a round-robin manner.
636 //
637 // Each address may contain port if default dialer is used.
638 // For example,
639 //
640 // - foobar.com:80
641 // - foobar.com:443
642 // - foobar.com:8080
643 Addr string
644
645 // Client name. Used in User-Agent request header.
646 Name string
647
648 // NoDefaultUserAgentHeader when set to true, causes the default
649 // User-Agent header to be excluded from the Request.
650 NoDefaultUserAgentHeader bool
651
652 // Callback for establishing new connection to the host.
653 //
654 // Default Dial is used if not set.
655 Dial DialFunc
656
657 // Attempt to connect to both ipv4 and ipv6 host addresses
658 // if set to true.
659 //
660 // This option is used only if default TCP dialer is used,
661 // i.e. if Dial is blank.
662 //
663 // By default client connects only to ipv4 addresses,
664 // since unfortunately ipv6 remains broken in many networks worldwide :)
665 DialDualStack bool
666
667 // Whether to use TLS (aka SSL or HTTPS) for host connections.
668 IsTLS bool
669
670 // Optional TLS config.
671 TLSConfig *tls.Config
672
673 // Maximum number of connections which may be established to all hosts
674 // listed in Addr.
675 //
676 // You can change this value while the HostClient is being used
677 // using HostClient.SetMaxConns(value)
678 //
679 // DefaultMaxConnsPerHost is used if not set.
680 MaxConns int
681
682 // Keep-alive connections are closed after this duration.
683 //
684 // By default connection duration is unlimited.
685 MaxConnDuration time.Duration
686
687 // Idle keep-alive connections are closed after this duration.
688 //
689 // By default idle connections are closed
690 // after DefaultMaxIdleConnDuration.
691 MaxIdleConnDuration time.Duration
692
693 // Maximum number of attempts for idempotent calls
694 //
695 // DefaultMaxIdemponentCallAttempts is used if not set.
696 MaxIdemponentCallAttempts int
697
698 // Per-connection buffer size for responses' reading.
699 // This also limits the maximum header size.
700 //
701 // Default buffer size is used if 0.
702 ReadBufferSize int
703
704 // Per-connection buffer size for requests' writing.
705 //
706 // Default buffer size is used if 0.
707 WriteBufferSize int
708
709 // Maximum duration for full response reading (including body).
710 //
711 // By default response read timeout is unlimited.
712 ReadTimeout time.Duration
713
714 // Maximum duration for full request writing (including body).
715 //
716 // By default request write timeout is unlimited.
717 WriteTimeout time.Duration
718
719 // Maximum response body size.
720 //
721 // The client returns ErrBodyTooLarge if this limit is greater than 0
722 // and response body is greater than the limit.
723 //
724 // By default response body size is unlimited.
725 MaxResponseBodySize int
726
727 // Header names are passed as-is without normalization
728 // if this option is set.
729 //
730 // Disabled header names' normalization may be useful only for proxying
731 // responses to other clients expecting case-sensitive
732 // header names. See https://github.com/valyala/fasthttp/issues/57
733 // for details.
734 //
735 // By default request and response header names are normalized, i.e.
736 // The first letter and the first letters following dashes
737 // are uppercased, while all the other letters are lowercased.
738 // Examples:
739 //
740 // * HOST -> Host
741 // * content-type -> Content-Type
742 // * cONTENT-lenGTH -> Content-Length
743 DisableHeaderNamesNormalizing bool
744
745 // Path values are sent as-is without normalization
746 //
747 // Disabled path normalization may be useful for proxying incoming requests
748 // to servers that are expecting paths to be forwarded as-is.
749 //
750 // By default path values are normalized, i.e.
751 // extra slashes are removed, special characters are encoded.
752 DisablePathNormalizing bool
753
754 // Will not log potentially sensitive content in error logs
755 //
756 // This option is useful for servers that handle sensitive data
757 // in the request/response.
758 //
759 // Client logs full errors by default.
760 SecureErrorLogMessage bool
761
762 // Maximum duration for waiting for a free connection.
763 //
764 // By default will not waiting, return ErrNoFreeConns immediately
765 MaxConnWaitTimeout time.Duration
766
767 // RetryIf controls whether a retry should be attempted after an error.
768 //
769 // By default will use isIdempotent function
770 RetryIf RetryIfFunc
771
772 // Transport defines a transport-like mechanism that wraps every request/response.
773 Transport TransportFunc
774
775 clientName atomic.Value
776 lastUseTime uint32
777
778 connsLock sync.Mutex
779 connsCount int
780 conns []*clientConn
781 connsWait *wantConnQueue
782
783 addrsLock sync.Mutex
784 addrs []string
785 addrIdx uint32
786
787 tlsConfigMap map[string]*tls.Config
788 tlsConfigMapLock sync.Mutex
789
790 readerPool sync.Pool
791 writerPool sync.Pool
792
793 clientReaderPool *sync.Pool
794 clientWriterPool *sync.Pool
795
796 pendingRequests int32
797
798 // pendingClientRequests counts the number of requests that a Client is currently running using this HostClient.
799 // It will be incremented ealier than pendingRequests and will be used by Client to see if the HostClient is still in use.
800 pendingClientRequests int32
801
802 connsCleanerRun bool
803}
804
805type clientConn struct {
806 c net.Conn
807
808 createdTime time.Time
809 lastUseTime time.Time
810}
811
812var startTimeUnix = time.Now().Unix()
813
814// LastUseTime returns time the client was last used
815func (c *HostClient) LastUseTime() time.Time {
816 n := atomic.LoadUint32(&c.lastUseTime)
817 return time.Unix(startTimeUnix+int64(n), 0)
818}
819
820// Get returns the status code and body of url.
821//
822// The contents of dst will be replaced by the body and returned, if the dst
823// is too small a new slice will be allocated.
824//
825// The function follows redirects. Use Do* for manually handling redirects.
826func (c *HostClient) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
827 return clientGetURL(dst, url, c)
828}
829
830// GetTimeout returns the status code and body of url.
831//
832// The contents of dst will be replaced by the body and returned, if the dst
833// is too small a new slice will be allocated.
834//
835// The function follows redirects. Use Do* for manually handling redirects.
836//
837// ErrTimeout error is returned if url contents couldn't be fetched
838// during the given timeout.
839func (c *HostClient) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
840 return clientGetURLTimeout(dst, url, timeout, c)
841}
842
843// GetDeadline returns the status code and body of url.
844//
845// The contents of dst will be replaced by the body and returned, if the dst
846// is too small a new slice will be allocated.
847//
848// The function follows redirects. Use Do* for manually handling redirects.
849//
850// ErrTimeout error is returned if url contents couldn't be fetched
851// until the given deadline.
852func (c *HostClient) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
853 return clientGetURLDeadline(dst, url, deadline, c)
854}
855
856// Post sends POST request to the given url with the given POST arguments.
857//
858// The contents of dst will be replaced by the body and returned, if the dst
859// is too small a new slice will be allocated.
860//
861// The function follows redirects. Use Do* for manually handling redirects.
862//
863// Empty POST body is sent if postArgs is nil.
864func (c *HostClient) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
865 return clientPostURL(dst, url, postArgs, c)
866}
867
868type clientDoer interface {
869 Do(req *Request, resp *Response) error
870}
871
872func clientGetURL(dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
873 req := AcquireRequest()
874
875 statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
876
877 ReleaseRequest(req)
878 return statusCode, body, err
879}
880
881func clientGetURLTimeout(dst []byte, url string, timeout time.Duration, c clientDoer) (statusCode int, body []byte, err error) {
882 deadline := time.Now().Add(timeout)
883 return clientGetURLDeadline(dst, url, deadline, c)
884}
885
886type clientURLResponse struct {
887 statusCode int
888 body []byte
889 err error
890}
891
892func clientGetURLDeadline(dst []byte, url string, deadline time.Time, c clientDoer) (statusCode int, body []byte, err error) {
893 timeout := -time.Since(deadline)
894 if timeout <= 0 {
895 return 0, dst, ErrTimeout
896 }
897
898 var ch chan clientURLResponse
899 chv := clientURLResponseChPool.Get()
900 if chv == nil {
901 chv = make(chan clientURLResponse, 1)
902 }
903 ch = chv.(chan clientURLResponse)
904
905 // Note that the request continues execution on ErrTimeout until
906 // client-specific ReadTimeout exceeds. This helps limiting load
907 // on slow hosts by MaxConns* concurrent requests.
908 //
909 // Without this 'hack' the load on slow host could exceed MaxConns*
910 // concurrent requests, since timed out requests on client side
911 // usually continue execution on the host.
912
913 var mu sync.Mutex
914 var timedout, responded bool
915
916 go func() {
917 req := AcquireRequest()
918
919 statusCodeCopy, bodyCopy, errCopy := doRequestFollowRedirectsBuffer(req, dst, url, c)
920 mu.Lock()
921 {
922 if !timedout {
923 ch <- clientURLResponse{
924 statusCode: statusCodeCopy,
925 body: bodyCopy,
926 err: errCopy,
927 }
928 responded = true
929 }
930 }
931 mu.Unlock()
932
933 ReleaseRequest(req)
934 }()
935
936 tc := AcquireTimer(timeout)
937 select {
938 case resp := <-ch:
939 statusCode = resp.statusCode
940 body = resp.body
941 err = resp.err
942 case <-tc.C:
943 mu.Lock()
944 {
945 if responded {
946 resp := <-ch
947 statusCode = resp.statusCode
948 body = resp.body
949 err = resp.err
950 } else {
951 timedout = true
952 err = ErrTimeout
953 body = dst
954 }
955 }
956 mu.Unlock()
957 }
958 ReleaseTimer(tc)
959
960 clientURLResponseChPool.Put(chv)
961
962 return statusCode, body, err
963}
964
965var clientURLResponseChPool sync.Pool
966
967func clientPostURL(dst []byte, url string, postArgs *Args, c clientDoer) (statusCode int, body []byte, err error) {
968 req := AcquireRequest()
969 req.Header.SetMethod(MethodPost)
970 req.Header.SetContentTypeBytes(strPostArgsContentType)
971 if postArgs != nil {
972 if _, err := postArgs.WriteTo(req.BodyWriter()); err != nil {
973 return 0, nil, err
974 }
975 }
976
977 statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
978
979 ReleaseRequest(req)
980 return statusCode, body, err
981}
982
983var (
984 // ErrMissingLocation is returned by clients when the Location header is missing on
985 // an HTTP response with a redirect status code.
986 ErrMissingLocation = errors.New("missing Location header for http redirect")
987 // ErrTooManyRedirects is returned by clients when the number of redirects followed
988 // exceed the max count.
989 ErrTooManyRedirects = errors.New("too many redirects detected when doing the request")
990
991 // HostClients are only able to follow redirects to the same protocol.
992 ErrHostClientRedirectToDifferentScheme = errors.New("HostClient can't follow redirects to a different protocol, please use Client instead")
993)
994
995const defaultMaxRedirectsCount = 16
996
997func doRequestFollowRedirectsBuffer(req *Request, dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
998 resp := AcquireResponse()
999 bodyBuf := resp.bodyBuffer()
1000 resp.keepBodyBuffer = true
1001 oldBody := bodyBuf.B
1002 bodyBuf.B = dst
1003
1004 statusCode, _, err = doRequestFollowRedirects(req, resp, url, defaultMaxRedirectsCount, c)
1005
1006 body = bodyBuf.B
1007 bodyBuf.B = oldBody
1008 resp.keepBodyBuffer = false
1009 ReleaseResponse(resp)
1010
1011 return statusCode, body, err
1012}
1013
1014func doRequestFollowRedirects(req *Request, resp *Response, url string, maxRedirectsCount int, c clientDoer) (statusCode int, body []byte, err error) {
1015 redirectsCount := 0
1016
1017 for {
1018 req.SetRequestURI(url)
1019 if err := req.parseURI(); err != nil {
1020 return 0, nil, err
1021 }
1022
1023 if err = c.Do(req, resp); err != nil {
1024 break
1025 }
1026 statusCode = resp.Header.StatusCode()
1027 if !StatusCodeIsRedirect(statusCode) {
1028 break
1029 }
1030
1031 redirectsCount++
1032 if redirectsCount > maxRedirectsCount {
1033 err = ErrTooManyRedirects
1034 break
1035 }
1036 location := resp.Header.peek(strLocation)
1037 if len(location) == 0 {
1038 err = ErrMissingLocation
1039 break
1040 }
1041 url = getRedirectURL(url, location)
1042 }
1043
1044 return statusCode, body, err
1045}
1046
1047func getRedirectURL(baseURL string, location []byte) string {
1048 u := AcquireURI()
1049 u.Update(baseURL)
1050 u.UpdateBytes(location)
1051 redirectURL := u.String()
1052 ReleaseURI(u)
1053 return redirectURL
1054}
1055
1056// StatusCodeIsRedirect returns true if the status code indicates a redirect.
1057func StatusCodeIsRedirect(statusCode int) bool {
1058 return statusCode == StatusMovedPermanently ||
1059 statusCode == StatusFound ||
1060 statusCode == StatusSeeOther ||
1061 statusCode == StatusTemporaryRedirect ||
1062 statusCode == StatusPermanentRedirect
1063}
1064
1065var (
1066 requestPool sync.Pool
1067 responsePool sync.Pool
1068)
1069
1070// AcquireRequest returns an empty Request instance from request pool.
1071//
1072// The returned Request instance may be passed to ReleaseRequest when it is
1073// no longer needed. This allows Request recycling, reduces GC pressure
1074// and usually improves performance.
1075func AcquireRequest() *Request {
1076 v := requestPool.Get()
1077 if v == nil {
1078 return &Request{}
1079 }
1080 return v.(*Request)
1081}
1082
1083// ReleaseRequest returns req acquired via AcquireRequest to request pool.
1084//
1085// It is forbidden accessing req and/or its' members after returning
1086// it to request pool.
1087func ReleaseRequest(req *Request) {
1088 req.Reset()
1089 requestPool.Put(req)
1090}
1091
1092// AcquireResponse returns an empty Response instance from response pool.
1093//
1094// The returned Response instance may be passed to ReleaseResponse when it is
1095// no longer needed. This allows Response recycling, reduces GC pressure
1096// and usually improves performance.
1097func AcquireResponse() *Response {
1098 v := responsePool.Get()
1099 if v == nil {
1100 return &Response{}
1101 }
1102 return v.(*Response)
1103}
1104
1105// ReleaseResponse return resp acquired via AcquireResponse to response pool.
1106//
1107// It is forbidden accessing resp and/or its' members after returning
1108// it to response pool.
1109func ReleaseResponse(resp *Response) {
1110 resp.Reset()
1111 responsePool.Put(resp)
1112}
1113
1114// DoTimeout performs the given request and waits for response during
1115// the given timeout duration.
1116//
1117// Request must contain at least non-zero RequestURI with full url (including
1118// scheme and host) or non-zero Host header + RequestURI.
1119//
1120// The function doesn't follow redirects. Use Get* for following redirects.
1121//
1122// Response is ignored if resp is nil.
1123//
1124// ErrTimeout is returned if the response wasn't returned during
1125// the given timeout.
1126//
1127// ErrNoFreeConns is returned if all HostClient.MaxConns connections
1128// to the host are busy.
1129//
1130// It is recommended obtaining req and resp via AcquireRequest
1131// and AcquireResponse in performance-critical code.
1132//
1133// Warning: DoTimeout does not terminate the request itself. The request will
1134// continue in the background and the response will be discarded.
1135// If requests take too long and the connection pool gets filled up please
1136// try setting a ReadTimeout.
1137func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
1138 return clientDoTimeout(req, resp, timeout, c)
1139}
1140
1141// DoDeadline performs the given request and waits for response until
1142// the given deadline.
1143//
1144// Request must contain at least non-zero RequestURI with full url (including
1145// scheme and host) or non-zero Host header + RequestURI.
1146//
1147// The function doesn't follow redirects. Use Get* for following redirects.
1148//
1149// Response is ignored if resp is nil.
1150//
1151// ErrTimeout is returned if the response wasn't returned until
1152// the given deadline.
1153//
1154// ErrNoFreeConns is returned if all HostClient.MaxConns connections
1155// to the host are busy.
1156//
1157// It is recommended obtaining req and resp via AcquireRequest
1158// and AcquireResponse in performance-critical code.
1159func (c *HostClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
1160 return clientDoDeadline(req, resp, deadline, c)
1161}
1162
1163// DoRedirects performs the given http request and fills the given http response,
1164// following up to maxRedirectsCount redirects. When the redirect count exceeds
1165// maxRedirectsCount, ErrTooManyRedirects is returned.
1166//
1167// Request must contain at least non-zero RequestURI with full url (including
1168// scheme and host) or non-zero Host header + RequestURI.
1169//
1170// Client determines the server to be requested in the following order:
1171//
1172// - from RequestURI if it contains full url with scheme and host;
1173// - from Host header otherwise.
1174//
1175// Response is ignored if resp is nil.
1176//
1177// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
1178// to the requested host are busy.
1179//
1180// It is recommended obtaining req and resp via AcquireRequest
1181// and AcquireResponse in performance-critical code.
1182func (c *HostClient) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
1183 _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
1184 return err
1185}
1186
1187func clientDoTimeout(req *Request, resp *Response, timeout time.Duration, c clientDoer) error {
1188 deadline := time.Now().Add(timeout)
1189 return clientDoDeadline(req, resp, deadline, c)
1190}
1191
1192func clientDoDeadline(req *Request, resp *Response, deadline time.Time, c clientDoer) error {
1193 timeout := -time.Since(deadline)
1194 if timeout <= 0 {
1195 return ErrTimeout
1196 }
1197
1198 var ch chan error
1199 chv := errorChPool.Get()
1200 if chv == nil {
1201 chv = make(chan error, 1)
1202 }
1203 ch = chv.(chan error)
1204
1205 // Make req and resp copies, since on timeout they no longer
1206 // may be accessed.
1207 reqCopy := AcquireRequest()
1208 req.copyToSkipBody(reqCopy)
1209 swapRequestBody(req, reqCopy)
1210 respCopy := AcquireResponse()
1211 if resp != nil {
1212 // Not calling resp.copyToSkipBody(respCopy) here to avoid
1213 // unexpected messing with headers
1214 respCopy.SkipBody = resp.SkipBody
1215 }
1216
1217 // Note that the request continues execution on ErrTimeout until
1218 // client-specific ReadTimeout exceeds. This helps limiting load
1219 // on slow hosts by MaxConns* concurrent requests.
1220 //
1221 // Without this 'hack' the load on slow host could exceed MaxConns*
1222 // concurrent requests, since timed out requests on client side
1223 // usually continue execution on the host.
1224
1225 var mu sync.Mutex
1226 var timedout, responded bool
1227
1228 go func() {
1229 reqCopy.timeout = timeout
1230 errDo := c.Do(reqCopy, respCopy)
1231 mu.Lock()
1232 {
1233 if !timedout {
1234 if resp != nil {
1235 respCopy.copyToSkipBody(resp)
1236 swapResponseBody(resp, respCopy)
1237 }
1238 swapRequestBody(reqCopy, req)
1239 ch <- errDo
1240 responded = true
1241 }
1242 }
1243 mu.Unlock()
1244
1245 ReleaseResponse(respCopy)
1246 ReleaseRequest(reqCopy)
1247 }()
1248
1249 tc := AcquireTimer(timeout)
1250 var err error
1251 select {
1252 case err = <-ch:
1253 case <-tc.C:
1254 mu.Lock()
1255 {
1256 if responded {
1257 err = <-ch
1258 } else {
1259 timedout = true
1260 err = ErrTimeout
1261 }
1262 }
1263 mu.Unlock()
1264 }
1265 ReleaseTimer(tc)
1266
1267 errorChPool.Put(chv)
1268
1269 return err
1270}
1271
1272var errorChPool sync.Pool
1273
1274// Do performs the given http request and sets the corresponding response.
1275//
1276// Request must contain at least non-zero RequestURI with full url (including
1277// scheme and host) or non-zero Host header + RequestURI.
1278//
1279// The function doesn't follow redirects. Use Get* for following redirects.
1280//
1281// Response is ignored if resp is nil.
1282//
1283// ErrNoFreeConns is returned if all HostClient.MaxConns connections
1284// to the host are busy.
1285//
1286// It is recommended obtaining req and resp via AcquireRequest
1287// and AcquireResponse in performance-critical code.
1288func (c *HostClient) Do(req *Request, resp *Response) error {
1289 var err error
1290 var retry bool
1291 maxAttempts := c.MaxIdemponentCallAttempts
1292 if maxAttempts <= 0 {
1293 maxAttempts = DefaultMaxIdemponentCallAttempts
1294 }
1295 isRequestRetryable := isIdempotent
1296 if c.RetryIf != nil {
1297 isRequestRetryable = c.RetryIf
1298 }
1299 attempts := 0
1300 hasBodyStream := req.IsBodyStream()
1301
1302 atomic.AddInt32(&c.pendingRequests, 1)
1303 for {
1304 retry, err = c.do(req, resp)
1305 if err == nil || !retry {
1306 break
1307 }
1308
1309 if hasBodyStream {
1310 break
1311 }
1312 if !isRequestRetryable(req) {
1313 // Retry non-idempotent requests if the server closes
1314 // the connection before sending the response.
1315 //
1316 // This case is possible if the server closes the idle
1317 // keep-alive connection on timeout.
1318 //
1319 // Apache and nginx usually do this.
1320 if err != io.EOF {
1321 break
1322 }
1323 }
1324 attempts++
1325 if attempts >= maxAttempts {
1326 break
1327 }
1328 }
1329 atomic.AddInt32(&c.pendingRequests, -1)
1330
1331 if err == io.EOF {
1332 err = ErrConnectionClosed
1333 }
1334 return err
1335}
1336
1337// PendingRequests returns the current number of requests the client
1338// is executing.
1339//
1340// This function may be used for balancing load among multiple HostClient
1341// instances.
1342func (c *HostClient) PendingRequests() int {
1343 return int(atomic.LoadInt32(&c.pendingRequests))
1344}
1345
1346func isIdempotent(req *Request) bool {
1347 return req.Header.IsGet() || req.Header.IsHead() || req.Header.IsPut()
1348}
1349
1350func (c *HostClient) do(req *Request, resp *Response) (bool, error) {
1351 nilResp := false
1352 if resp == nil {
1353 nilResp = true
1354 resp = AcquireResponse()
1355 }
1356
1357 ok, err := c.doNonNilReqResp(req, resp)
1358
1359 if nilResp {
1360 ReleaseResponse(resp)
1361 }
1362
1363 return ok, err
1364}
1365
1366func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) {
1367 if req == nil {
1368 panic("BUG: req cannot be nil")
1369 }
1370 if resp == nil {
1371 panic("BUG: resp cannot be nil")
1372 }
1373
1374 // Secure header error logs configuration
1375 resp.secureErrorLogMessage = c.SecureErrorLogMessage
1376 resp.Header.secureErrorLogMessage = c.SecureErrorLogMessage
1377 req.secureErrorLogMessage = c.SecureErrorLogMessage
1378 req.Header.secureErrorLogMessage = c.SecureErrorLogMessage
1379
1380 if c.IsTLS != req.URI().isHttps() {
1381 return false, ErrHostClientRedirectToDifferentScheme
1382 }
1383
1384 atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix))
1385
1386 // Free up resources occupied by response before sending the request,
1387 // so the GC may reclaim these resources (e.g. response body).
1388
1389 // backing up SkipBody in case it was set explicitly
1390 customSkipBody := resp.SkipBody
1391 resp.Reset()
1392 resp.SkipBody = customSkipBody
1393
1394 req.URI().DisablePathNormalizing = c.DisablePathNormalizing
1395
1396 userAgentOld := req.Header.UserAgent()
1397 if len(userAgentOld) == 0 {
1398 req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...)
1399 }
1400
1401 if c.Transport != nil {
1402 err := c.Transport(req, resp)
1403 return err == nil, err
1404 }
1405
1406 cc, err := c.acquireConn(req.timeout, req.ConnectionClose())
1407 if err != nil {
1408 return false, err
1409 }
1410 conn := cc.c
1411
1412 resp.parseNetConn(conn)
1413
1414 if c.WriteTimeout > 0 {
1415 // Set Deadline every time, since golang has fixed the performance issue
1416 // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
1417 currentTime := time.Now()
1418 if err = conn.SetWriteDeadline(currentTime.Add(c.WriteTimeout)); err != nil {
1419 c.closeConn(cc)
1420 return true, err
1421 }
1422 }
1423
1424 resetConnection := false
1425 if c.MaxConnDuration > 0 && time.Since(cc.createdTime) > c.MaxConnDuration && !req.ConnectionClose() {
1426 req.SetConnectionClose()
1427 resetConnection = true
1428 }
1429
1430 bw := c.acquireWriter(conn)
1431 err = req.Write(bw)
1432
1433 if resetConnection {
1434 req.Header.ResetConnectionClose()
1435 }
1436
1437 if err == nil {
1438 err = bw.Flush()
1439 }
1440 if err != nil {
1441 c.releaseWriter(bw)
1442 c.closeConn(cc)
1443 return true, err
1444 }
1445 c.releaseWriter(bw)
1446
1447 if c.ReadTimeout > 0 {
1448 // Set Deadline every time, since golang has fixed the performance issue
1449 // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
1450 currentTime := time.Now()
1451 if err = conn.SetReadDeadline(currentTime.Add(c.ReadTimeout)); err != nil {
1452 c.closeConn(cc)
1453 return true, err
1454 }
1455 }
1456
1457 if customSkipBody || req.Header.IsHead() {
1458 resp.SkipBody = true
1459 }
1460 if c.DisableHeaderNamesNormalizing {
1461 resp.Header.DisableNormalizing()
1462 }
1463
1464 br := c.acquireReader(conn)
1465 if err = resp.ReadLimitBody(br, c.MaxResponseBodySize); err != nil {
1466 c.releaseReader(br)
1467 c.closeConn(cc)
1468 // Don't retry in case of ErrBodyTooLarge since we will just get the same again.
1469 retry := err != ErrBodyTooLarge
1470 return retry, err
1471 }
1472 c.releaseReader(br)
1473
1474 if resetConnection || req.ConnectionClose() || resp.ConnectionClose() {
1475 c.closeConn(cc)
1476 } else {
1477 c.releaseConn(cc)
1478 }
1479
1480 return false, err
1481}
1482
1483var (
1484 // ErrNoFreeConns is returned when no free connections available
1485 // to the given host.
1486 //
1487 // Increase the allowed number of connections per host if you
1488 // see this error.
1489 ErrNoFreeConns = errors.New("no free connections available to host")
1490
1491 // ErrConnectionClosed may be returned from client methods if the server
1492 // closes connection before returning the first response byte.
1493 //
1494 // If you see this error, then either fix the server by returning
1495 // 'Connection: close' response header before closing the connection
1496 // or add 'Connection: close' request header before sending requests
1497 // to broken server.
1498 ErrConnectionClosed = errors.New("the server closed connection before returning the first response byte. " +
1499 "Make sure the server returns 'Connection: close' response header before closing the connection")
1500)
1501
1502type timeoutError struct{}
1503
1504func (e *timeoutError) Error() string {
1505 return "timeout"
1506}
1507
1508// Only implement the Timeout() function of the net.Error interface.
1509// This allows for checks like:
1510//
1511// if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
1512func (e *timeoutError) Timeout() bool {
1513 return true
1514}
1515
1516// ErrTimeout is returned from timed out calls.
1517var ErrTimeout = &timeoutError{}
1518
1519// SetMaxConns sets up the maximum number of connections which may be established to all hosts listed in Addr.
1520func (c *HostClient) SetMaxConns(newMaxConns int) {
1521 c.connsLock.Lock()
1522 c.MaxConns = newMaxConns
1523 c.connsLock.Unlock()
1524}
1525
1526func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool) (cc *clientConn, err error) {
1527 createConn := false
1528 startCleaner := false
1529
1530 var n int
1531 c.connsLock.Lock()
1532 n = len(c.conns)
1533 if n == 0 {
1534 maxConns := c.MaxConns
1535 if maxConns <= 0 {
1536 maxConns = DefaultMaxConnsPerHost
1537 }
1538 if c.connsCount < maxConns {
1539 c.connsCount++
1540 createConn = true
1541 if !c.connsCleanerRun && !connectionClose {
1542 startCleaner = true
1543 c.connsCleanerRun = true
1544 }
1545 }
1546 } else {
1547 n--
1548 cc = c.conns[n]
1549 c.conns[n] = nil
1550 c.conns = c.conns[:n]
1551 }
1552 c.connsLock.Unlock()
1553
1554 if cc != nil {
1555 return cc, nil
1556 }
1557 if !createConn {
1558 if c.MaxConnWaitTimeout <= 0 {
1559 return nil, ErrNoFreeConns
1560 }
1561
1562 // reqTimeout c.MaxConnWaitTimeout wait duration
1563 // d1 d2 min(d1, d2)
1564 // 0(not set) d2 d2
1565 // d1 0(don't wait) 0(don't wait)
1566 // 0(not set) d2 d2
1567 timeout := c.MaxConnWaitTimeout
1568 timeoutOverridden := false
1569 // reqTimeout == 0 means not set
1570 if reqTimeout > 0 && reqTimeout < timeout {
1571 timeout = reqTimeout
1572 timeoutOverridden = true
1573 }
1574
1575 // wait for a free connection
1576 tc := AcquireTimer(timeout)
1577 defer ReleaseTimer(tc)
1578
1579 w := &wantConn{
1580 ready: make(chan struct{}, 1),
1581 }
1582 defer func() {
1583 if err != nil {
1584 w.cancel(c, err)
1585 }
1586 }()
1587
1588 c.queueForIdle(w)
1589
1590 select {
1591 case <-w.ready:
1592 return w.conn, w.err
1593 case <-tc.C:
1594 if timeoutOverridden {
1595 return nil, ErrTimeout
1596 }
1597 return nil, ErrNoFreeConns
1598 }
1599 }
1600
1601 if startCleaner {
1602 go c.connsCleaner()
1603 }
1604
1605 conn, err := c.dialHostHard()
1606 if err != nil {
1607 c.decConnsCount()
1608 return nil, err
1609 }
1610 cc = acquireClientConn(conn)
1611
1612 return cc, nil
1613}
1614
1615func (c *HostClient) queueForIdle(w *wantConn) {
1616 c.connsLock.Lock()
1617 defer c.connsLock.Unlock()
1618 if c.connsWait == nil {
1619 c.connsWait = &wantConnQueue{}
1620 }
1621 c.connsWait.clearFront()
1622 c.connsWait.pushBack(w)
1623}
1624
1625func (c *HostClient) dialConnFor(w *wantConn) {
1626 conn, err := c.dialHostHard()
1627 if err != nil {
1628 w.tryDeliver(nil, err)
1629 c.decConnsCount()
1630 return
1631 }
1632
1633 cc := acquireClientConn(conn)
1634 delivered := w.tryDeliver(cc, nil)
1635 if !delivered {
1636 // not delivered, return idle connection
1637 c.releaseConn(cc)
1638 }
1639}
1640
1641// CloseIdleConnections closes any connections which were previously
1642// connected from previous requests but are now sitting idle in a
1643// "keep-alive" state. It does not interrupt any connections currently
1644// in use.
1645func (c *HostClient) CloseIdleConnections() {
1646 c.connsLock.Lock()
1647 scratch := append([]*clientConn{}, c.conns...)
1648 for i := range c.conns {
1649 c.conns[i] = nil
1650 }
1651 c.conns = c.conns[:0]
1652 c.connsLock.Unlock()
1653
1654 for _, cc := range scratch {
1655 c.closeConn(cc)
1656 }
1657}
1658
1659func (c *HostClient) connsCleaner() {
1660 var (
1661 scratch []*clientConn
1662 maxIdleConnDuration = c.MaxIdleConnDuration
1663 )
1664 if maxIdleConnDuration <= 0 {
1665 maxIdleConnDuration = DefaultMaxIdleConnDuration
1666 }
1667 for {
1668 currentTime := time.Now()
1669
1670 // Determine idle connections to be closed.
1671 c.connsLock.Lock()
1672 conns := c.conns
1673 n := len(conns)
1674 i := 0
1675 for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration {
1676 i++
1677 }
1678 sleepFor := maxIdleConnDuration
1679 if i < n {
1680 // + 1 so we actually sleep past the expiration time and not up to it.
1681 // Otherwise the > check above would still fail.
1682 sleepFor = maxIdleConnDuration - currentTime.Sub(conns[i].lastUseTime) + 1
1683 }
1684 scratch = append(scratch[:0], conns[:i]...)
1685 if i > 0 {
1686 m := copy(conns, conns[i:])
1687 for i = m; i < n; i++ {
1688 conns[i] = nil
1689 }
1690 c.conns = conns[:m]
1691 }
1692 c.connsLock.Unlock()
1693
1694 // Close idle connections.
1695 for i, cc := range scratch {
1696 c.closeConn(cc)
1697 scratch[i] = nil
1698 }
1699
1700 // Determine whether to stop the connsCleaner.
1701 c.connsLock.Lock()
1702 mustStop := c.connsCount == 0
1703 if mustStop {
1704 c.connsCleanerRun = false
1705 }
1706 c.connsLock.Unlock()
1707 if mustStop {
1708 break
1709 }
1710
1711 time.Sleep(sleepFor)
1712 }
1713}
1714
1715func (c *HostClient) closeConn(cc *clientConn) {
1716 c.decConnsCount()
1717 cc.c.Close()
1718 releaseClientConn(cc)
1719}
1720
1721func (c *HostClient) decConnsCount() {
1722 if c.MaxConnWaitTimeout <= 0 {
1723 c.connsLock.Lock()
1724 c.connsCount--
1725 c.connsLock.Unlock()
1726 return
1727 }
1728
1729 c.connsLock.Lock()
1730 defer c.connsLock.Unlock()
1731 dialed := false
1732 if q := c.connsWait; q != nil && q.len() > 0 {
1733 for q.len() > 0 {
1734 w := q.popFront()
1735 if w.waiting() {
1736 go c.dialConnFor(w)
1737 dialed = true
1738 break
1739 }
1740 }
1741 }
1742 if !dialed {
1743 c.connsCount--
1744 }
1745}
1746
1747// ConnsCount returns connection count of HostClient
1748func (c *HostClient) ConnsCount() int {
1749 c.connsLock.Lock()
1750 defer c.connsLock.Unlock()
1751
1752 return c.connsCount
1753}
1754
1755func acquireClientConn(conn net.Conn) *clientConn {
1756 v := clientConnPool.Get()
1757 if v == nil {
1758 v = &clientConn{}
1759 }
1760 cc := v.(*clientConn)
1761 cc.c = conn
1762 cc.createdTime = time.Now()
1763 return cc
1764}
1765
1766func releaseClientConn(cc *clientConn) {
1767 // Reset all fields.
1768 *cc = clientConn{}
1769 clientConnPool.Put(cc)
1770}
1771
1772var clientConnPool sync.Pool
1773
1774func (c *HostClient) releaseConn(cc *clientConn) {
1775 cc.lastUseTime = time.Now()
1776 if c.MaxConnWaitTimeout <= 0 {
1777 c.connsLock.Lock()
1778 c.conns = append(c.conns, cc)
1779 c.connsLock.Unlock()
1780 return
1781 }
1782
1783 // try to deliver an idle connection to a *wantConn
1784 c.connsLock.Lock()
1785 defer c.connsLock.Unlock()
1786 delivered := false
1787 if q := c.connsWait; q != nil && q.len() > 0 {
1788 for q.len() > 0 {
1789 w := q.popFront()
1790 if w.waiting() {
1791 delivered = w.tryDeliver(cc, nil)
1792 break
1793 }
1794 }
1795 }
1796 if !delivered {
1797 c.conns = append(c.conns, cc)
1798 }
1799}
1800
1801func (c *HostClient) acquireWriter(conn net.Conn) *bufio.Writer {
1802 var v interface{}
1803 if c.clientWriterPool != nil {
1804 v = c.clientWriterPool.Get()
1805 if v == nil {
1806 n := c.WriteBufferSize
1807 if n <= 0 {
1808 n = defaultWriteBufferSize
1809 }
1810 return bufio.NewWriterSize(conn, n)
1811 }
1812 } else {
1813 v = c.writerPool.Get()
1814 if v == nil {
1815 n := c.WriteBufferSize
1816 if n <= 0 {
1817 n = defaultWriteBufferSize
1818 }
1819 return bufio.NewWriterSize(conn, n)
1820 }
1821 }
1822
1823 bw := v.(*bufio.Writer)
1824 bw.Reset(conn)
1825 return bw
1826}
1827
1828func (c *HostClient) releaseWriter(bw *bufio.Writer) {
1829 if c.clientWriterPool != nil {
1830 c.clientWriterPool.Put(bw)
1831 } else {
1832 c.writerPool.Put(bw)
1833 }
1834}
1835
1836func (c *HostClient) acquireReader(conn net.Conn) *bufio.Reader {
1837 var v interface{}
1838 if c.clientReaderPool != nil {
1839 v = c.clientReaderPool.Get()
1840 if v == nil {
1841 n := c.ReadBufferSize
1842 if n <= 0 {
1843 n = defaultReadBufferSize
1844 }
1845 return bufio.NewReaderSize(conn, n)
1846 }
1847 } else {
1848 v = c.readerPool.Get()
1849 if v == nil {
1850 n := c.ReadBufferSize
1851 if n <= 0 {
1852 n = defaultReadBufferSize
1853 }
1854 return bufio.NewReaderSize(conn, n)
1855 }
1856 }
1857
1858 br := v.(*bufio.Reader)
1859 br.Reset(conn)
1860 return br
1861}
1862
1863func (c *HostClient) releaseReader(br *bufio.Reader) {
1864 if c.clientReaderPool != nil {
1865 c.clientReaderPool.Put(br)
1866 } else {
1867 c.readerPool.Put(br)
1868 }
1869}
1870
1871func newClientTLSConfig(c *tls.Config, addr string) *tls.Config {
1872 if c == nil {
1873 c = &tls.Config{}
1874 } else {
1875 c = c.Clone()
1876 }
1877
1878 if c.ClientSessionCache == nil {
1879 c.ClientSessionCache = tls.NewLRUClientSessionCache(0)
1880 }
1881
1882 if len(c.ServerName) == 0 {
1883 serverName := tlsServerName(addr)
1884 if serverName == "*" {
1885 c.InsecureSkipVerify = true
1886 } else {
1887 c.ServerName = serverName
1888 }
1889 }
1890 return c
1891}
1892
1893func tlsServerName(addr string) string {
1894 if !strings.Contains(addr, ":") {
1895 return addr
1896 }
1897 host, _, err := net.SplitHostPort(addr)
1898 if err != nil {
1899 return "*"
1900 }
1901 return host
1902}
1903
1904func (c *HostClient) nextAddr() string {
1905 c.addrsLock.Lock()
1906 if c.addrs == nil {
1907 c.addrs = strings.Split(c.Addr, ",")
1908 }
1909 addr := c.addrs[0]
1910 if len(c.addrs) > 1 {
1911 addr = c.addrs[c.addrIdx%uint32(len(c.addrs))]
1912 c.addrIdx++
1913 }
1914 c.addrsLock.Unlock()
1915 return addr
1916}
1917
1918func (c *HostClient) dialHostHard() (conn net.Conn, err error) {
1919 // attempt to dial all the available hosts before giving up.
1920
1921 c.addrsLock.Lock()
1922 n := len(c.addrs)
1923 c.addrsLock.Unlock()
1924
1925 if n == 0 {
1926 // It looks like c.addrs isn't initialized yet.
1927 n = 1
1928 }
1929
1930 timeout := c.ReadTimeout + c.WriteTimeout
1931 if timeout <= 0 {
1932 timeout = DefaultDialTimeout
1933 }
1934 deadline := time.Now().Add(timeout)
1935 for n > 0 {
1936 addr := c.nextAddr()
1937 tlsConfig := c.cachedTLSConfig(addr)
1938 conn, err = dialAddr(addr, c.Dial, c.DialDualStack, c.IsTLS, tlsConfig, c.WriteTimeout)
1939 if err == nil {
1940 return conn, nil
1941 }
1942 if time.Since(deadline) >= 0 {
1943 break
1944 }
1945 n--
1946 }
1947 return nil, err
1948}
1949
1950func (c *HostClient) cachedTLSConfig(addr string) *tls.Config {
1951 if !c.IsTLS {
1952 return nil
1953 }
1954
1955 c.tlsConfigMapLock.Lock()
1956 if c.tlsConfigMap == nil {
1957 c.tlsConfigMap = make(map[string]*tls.Config)
1958 }
1959 cfg := c.tlsConfigMap[addr]
1960 if cfg == nil {
1961 cfg = newClientTLSConfig(c.TLSConfig, addr)
1962 c.tlsConfigMap[addr] = cfg
1963 }
1964 c.tlsConfigMapLock.Unlock()
1965
1966 return cfg
1967}
1968
1969// ErrTLSHandshakeTimeout indicates there is a timeout from tls handshake.
1970var ErrTLSHandshakeTimeout = errors.New("tls handshake timed out")
1971
1972var timeoutErrorChPool sync.Pool
1973
1974func tlsClientHandshake(rawConn net.Conn, tlsConfig *tls.Config, timeout time.Duration) (net.Conn, error) {
1975 tc := AcquireTimer(timeout)
1976 defer ReleaseTimer(tc)
1977
1978 var ch chan error
1979 chv := timeoutErrorChPool.Get()
1980 if chv == nil {
1981 chv = make(chan error)
1982 }
1983 ch = chv.(chan error)
1984 defer timeoutErrorChPool.Put(chv)
1985
1986 conn := tls.Client(rawConn, tlsConfig)
1987
1988 go func() {
1989 ch <- conn.Handshake()
1990 }()
1991
1992 select {
1993 case <-tc.C:
1994 rawConn.Close()
1995 <-ch
1996 return nil, ErrTLSHandshakeTimeout
1997 case err := <-ch:
1998 if err != nil {
1999 rawConn.Close()
2000 return nil, err
2001 }
2002 return conn, nil
2003 }
2004}
2005
2006func dialAddr(addr string, dial DialFunc, dialDualStack, isTLS bool, tlsConfig *tls.Config, timeout time.Duration) (net.Conn, error) {
2007 if dial == nil {
2008 if dialDualStack {
2009 dial = DialDualStack
2010 } else {
2011 dial = Dial
2012 }
2013 addr = addMissingPort(addr, isTLS)
2014 }
2015 conn, err := dial(addr)
2016 if err != nil {
2017 return nil, err
2018 }
2019 if conn == nil {
2020 panic("BUG: DialFunc returned (nil, nil)")
2021 }
2022 _, isTLSAlready := conn.(*tls.Conn)
2023 if isTLS && !isTLSAlready {
2024 if timeout == 0 {
2025 return tls.Client(conn, tlsConfig), nil
2026 }
2027 return tlsClientHandshake(conn, tlsConfig, timeout)
2028 }
2029 return conn, nil
2030}
2031
2032func (c *HostClient) getClientName() []byte {
2033 v := c.clientName.Load()
2034 var clientName []byte
2035 if v == nil {
2036 clientName = []byte(c.Name)
2037 if len(clientName) == 0 && !c.NoDefaultUserAgentHeader {
2038 clientName = defaultUserAgent
2039 }
2040 c.clientName.Store(clientName)
2041 } else {
2042 clientName = v.([]byte)
2043 }
2044 return clientName
2045}
2046
2047func addMissingPort(addr string, isTLS bool) string {
2048 n := strings.Index(addr, ":")
2049 if n >= 0 {
2050 return addr
2051 }
2052 port := 80
2053 if isTLS {
2054 port = 443
2055 }
2056 return net.JoinHostPort(addr, strconv.Itoa(port))
2057}
2058
2059// A wantConn records state about a wanted connection
2060// (that is, an active call to getConn).
2061// The conn may be gotten by dialing or by finding an idle connection,
2062// or a cancellation may make the conn no longer wanted.
2063// These three options are racing against each other and use
2064// wantConn to coordinate and agree about the winning outcome.
2065//
2066// inspired by net/http/transport.go
2067type wantConn struct {
2068 ready chan struct{}
2069 mu sync.Mutex // protects conn, err, close(ready)
2070 conn *clientConn
2071 err error
2072}
2073
2074// waiting reports whether w is still waiting for an answer (connection or error).
2075func (w *wantConn) waiting() bool {
2076 select {
2077 case <-w.ready:
2078 return false
2079 default:
2080 return true
2081 }
2082}
2083
2084// tryDeliver attempts to deliver conn, err to w and reports whether it succeeded.
2085func (w *wantConn) tryDeliver(conn *clientConn, err error) bool {
2086 w.mu.Lock()
2087 defer w.mu.Unlock()
2088
2089 if w.conn != nil || w.err != nil {
2090 return false
2091 }
2092 w.conn = conn
2093 w.err = err
2094 if w.conn == nil && w.err == nil {
2095 panic("fasthttp: internal error: misuse of tryDeliver")
2096 }
2097 close(w.ready)
2098 return true
2099}
2100
2101// cancel marks w as no longer wanting a result (for example, due to cancellation).
2102// If a connection has been delivered already, cancel returns it with c.releaseConn.
2103func (w *wantConn) cancel(c *HostClient, err error) {
2104 w.mu.Lock()
2105 if w.conn == nil && w.err == nil {
2106 close(w.ready) // catch misbehavior in future delivery
2107 }
2108
2109 conn := w.conn
2110 w.conn = nil
2111 w.err = err
2112 w.mu.Unlock()
2113
2114 if conn != nil {
2115 c.releaseConn(conn)
2116 }
2117}
2118
2119// A wantConnQueue is a queue of wantConns.
2120//
2121// inspired by net/http/transport.go
2122type wantConnQueue struct {
2123 // This is a queue, not a deque.
2124 // It is split into two stages - head[headPos:] and tail.
2125 // popFront is trivial (headPos++) on the first stage, and
2126 // pushBack is trivial (append) on the second stage.
2127 // If the first stage is empty, popFront can swap the
2128 // first and second stages to remedy the situation.
2129 //
2130 // This two-stage split is analogous to the use of two lists
2131 // in Okasaki's purely functional queue but without the
2132 // overhead of reversing the list when swapping stages.
2133 head []*wantConn
2134 headPos int
2135 tail []*wantConn
2136}
2137
2138// len returns the number of items in the queue.
2139func (q *wantConnQueue) len() int {
2140 return len(q.head) - q.headPos + len(q.tail)
2141}
2142
2143// pushBack adds w to the back of the queue.
2144func (q *wantConnQueue) pushBack(w *wantConn) {
2145 q.tail = append(q.tail, w)
2146}
2147
2148// popFront removes and returns the wantConn at the front of the queue.
2149func (q *wantConnQueue) popFront() *wantConn {
2150 if q.headPos >= len(q.head) {
2151 if len(q.tail) == 0 {
2152 return nil
2153 }
2154 // Pick up tail as new head, clear tail.
2155 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
2156 }
2157
2158 w := q.head[q.headPos]
2159 q.head[q.headPos] = nil
2160 q.headPos++
2161 return w
2162}
2163
2164// peekFront returns the wantConn at the front of the queue without removing it.
2165func (q *wantConnQueue) peekFront() *wantConn {
2166 if q.headPos < len(q.head) {
2167 return q.head[q.headPos]
2168 }
2169 if len(q.tail) > 0 {
2170 return q.tail[0]
2171 }
2172 return nil
2173}
2174
2175// cleanFront pops any wantConns that are no longer waiting from the head of the
2176// queue, reporting whether any were popped.
2177func (q *wantConnQueue) clearFront() (cleaned bool) {
2178 for {
2179 w := q.peekFront()
2180 if w == nil || w.waiting() {
2181 return cleaned
2182 }
2183 q.popFront()
2184 cleaned = true
2185 }
2186}
2187
2188// PipelineClient pipelines requests over a limited set of concurrent
2189// connections to the given Addr.
2190//
2191// This client may be used in highly loaded HTTP-based RPC systems for reducing
2192// context switches and network level overhead.
2193// See https://en.wikipedia.org/wiki/HTTP_pipelining for details.
2194//
2195// It is forbidden copying PipelineClient instances. Create new instances
2196// instead.
2197//
2198// It is safe calling PipelineClient methods from concurrently running
2199// goroutines.
2200type PipelineClient struct {
2201 noCopy noCopy //nolint:unused,structcheck
2202
2203 // Address of the host to connect to.
2204 Addr string
2205
2206 // PipelineClient name. Used in User-Agent request header.
2207 Name string
2208
2209 // NoDefaultUserAgentHeader when set to true, causes the default
2210 // User-Agent header to be excluded from the Request.
2211 NoDefaultUserAgentHeader bool
2212
2213 // The maximum number of concurrent connections to the Addr.
2214 //
2215 // A single connection is used by default.
2216 MaxConns int
2217
2218 // The maximum number of pending pipelined requests over
2219 // a single connection to Addr.
2220 //
2221 // DefaultMaxPendingRequests is used by default.
2222 MaxPendingRequests int
2223
2224 // The maximum delay before sending pipelined requests as a batch
2225 // to the server.
2226 //
2227 // By default requests are sent immediately to the server.
2228 MaxBatchDelay time.Duration
2229
2230 // Callback for connection establishing to the host.
2231 //
2232 // Default Dial is used if not set.
2233 Dial DialFunc
2234
2235 // Attempt to connect to both ipv4 and ipv6 host addresses
2236 // if set to true.
2237 //
2238 // This option is used only if default TCP dialer is used,
2239 // i.e. if Dial is blank.
2240 //
2241 // By default client connects only to ipv4 addresses,
2242 // since unfortunately ipv6 remains broken in many networks worldwide :)
2243 DialDualStack bool
2244
2245 // Response header names are passed as-is without normalization
2246 // if this option is set.
2247 //
2248 // Disabled header names' normalization may be useful only for proxying
2249 // responses to other clients expecting case-sensitive
2250 // header names. See https://github.com/valyala/fasthttp/issues/57
2251 // for details.
2252 //
2253 // By default request and response header names are normalized, i.e.
2254 // The first letter and the first letters following dashes
2255 // are uppercased, while all the other letters are lowercased.
2256 // Examples:
2257 //
2258 // * HOST -> Host
2259 // * content-type -> Content-Type
2260 // * cONTENT-lenGTH -> Content-Length
2261 DisableHeaderNamesNormalizing bool
2262
2263 // Path values are sent as-is without normalization
2264 //
2265 // Disabled path normalization may be useful for proxying incoming requests
2266 // to servers that are expecting paths to be forwarded as-is.
2267 //
2268 // By default path values are normalized, i.e.
2269 // extra slashes are removed, special characters are encoded.
2270 DisablePathNormalizing bool
2271
2272 // Whether to use TLS (aka SSL or HTTPS) for host connections.
2273 IsTLS bool
2274
2275 // Optional TLS config.
2276 TLSConfig *tls.Config
2277
2278 // Idle connection to the host is closed after this duration.
2279 //
2280 // By default idle connection is closed after
2281 // DefaultMaxIdleConnDuration.
2282 MaxIdleConnDuration time.Duration
2283
2284 // Buffer size for responses' reading.
2285 // This also limits the maximum header size.
2286 //
2287 // Default buffer size is used if 0.
2288 ReadBufferSize int
2289
2290 // Buffer size for requests' writing.
2291 //
2292 // Default buffer size is used if 0.
2293 WriteBufferSize int
2294
2295 // Maximum duration for full response reading (including body).
2296 //
2297 // By default response read timeout is unlimited.
2298 ReadTimeout time.Duration
2299
2300 // Maximum duration for full request writing (including body).
2301 //
2302 // By default request write timeout is unlimited.
2303 WriteTimeout time.Duration
2304
2305 // Logger for logging client errors.
2306 //
2307 // By default standard logger from log package is used.
2308 Logger Logger
2309
2310 connClients []*pipelineConnClient
2311 connClientsLock sync.Mutex
2312}
2313
2314type pipelineConnClient struct {
2315 noCopy noCopy //nolint:unused,structcheck
2316
2317 Addr string
2318 Name string
2319 NoDefaultUserAgentHeader bool
2320 MaxPendingRequests int
2321 MaxBatchDelay time.Duration
2322 Dial DialFunc
2323 DialDualStack bool
2324 DisableHeaderNamesNormalizing bool
2325 DisablePathNormalizing bool
2326 IsTLS bool
2327 TLSConfig *tls.Config
2328 MaxIdleConnDuration time.Duration
2329 ReadBufferSize int
2330 WriteBufferSize int
2331 ReadTimeout time.Duration
2332 WriteTimeout time.Duration
2333 Logger Logger
2334
2335 workPool sync.Pool
2336
2337 chLock sync.Mutex
2338 chW chan *pipelineWork
2339 chR chan *pipelineWork
2340
2341 tlsConfigLock sync.Mutex
2342 tlsConfig *tls.Config
2343 clientName atomic.Value
2344}
2345
2346type pipelineWork struct {
2347 reqCopy Request
2348 respCopy Response
2349 req *Request
2350 resp *Response
2351 t *time.Timer
2352 deadline time.Time
2353 err error
2354 done chan struct{}
2355}
2356
2357// DoTimeout performs the given request and waits for response during
2358// the given timeout duration.
2359//
2360// Request must contain at least non-zero RequestURI with full url (including
2361// scheme and host) or non-zero Host header + RequestURI.
2362//
2363// The function doesn't follow redirects.
2364//
2365// Response is ignored if resp is nil.
2366//
2367// ErrTimeout is returned if the response wasn't returned during
2368// the given timeout.
2369//
2370// It is recommended obtaining req and resp via AcquireRequest
2371// and AcquireResponse in performance-critical code.
2372//
2373// Warning: DoTimeout does not terminate the request itself. The request will
2374// continue in the background and the response will be discarded.
2375// If requests take too long and the connection pool gets filled up please
2376// try setting a ReadTimeout.
2377func (c *PipelineClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
2378 return c.DoDeadline(req, resp, time.Now().Add(timeout))
2379}
2380
2381// DoDeadline performs the given request and waits for response until
2382// the given deadline.
2383//
2384// Request must contain at least non-zero RequestURI with full url (including
2385// scheme and host) or non-zero Host header + RequestURI.
2386//
2387// The function doesn't follow redirects.
2388//
2389// Response is ignored if resp is nil.
2390//
2391// ErrTimeout is returned if the response wasn't returned until
2392// the given deadline.
2393//
2394// It is recommended obtaining req and resp via AcquireRequest
2395// and AcquireResponse in performance-critical code.
2396func (c *PipelineClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
2397 return c.getConnClient().DoDeadline(req, resp, deadline)
2398}
2399
2400func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
2401 c.init()
2402
2403 timeout := -time.Since(deadline)
2404 if timeout < 0 {
2405 return ErrTimeout
2406 }
2407
2408 if c.DisablePathNormalizing {
2409 req.URI().DisablePathNormalizing = true
2410 }
2411
2412 userAgentOld := req.Header.UserAgent()
2413 if len(userAgentOld) == 0 {
2414 req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...)
2415 }
2416
2417 w := acquirePipelineWork(&c.workPool, timeout)
2418 w.respCopy.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
2419 w.req = &w.reqCopy
2420 w.resp = &w.respCopy
2421
2422 // Make a copy of the request in order to avoid data races on timeouts
2423 req.copyToSkipBody(&w.reqCopy)
2424 swapRequestBody(req, &w.reqCopy)
2425
2426 // Put the request to outgoing queue
2427 select {
2428 case c.chW <- w:
2429 // Fast path: len(c.ch) < cap(c.ch)
2430 default:
2431 // Slow path
2432 select {
2433 case c.chW <- w:
2434 case <-w.t.C:
2435 releasePipelineWork(&c.workPool, w)
2436 return ErrTimeout
2437 }
2438 }
2439
2440 // Wait for the response
2441 var err error
2442 select {
2443 case <-w.done:
2444 if resp != nil {
2445 w.respCopy.copyToSkipBody(resp)
2446 swapResponseBody(resp, &w.respCopy)
2447 }
2448 err = w.err
2449 releasePipelineWork(&c.workPool, w)
2450 case <-w.t.C:
2451 err = ErrTimeout
2452 }
2453
2454 return err
2455}
2456
2457// Do performs the given http request and sets the corresponding response.
2458//
2459// Request must contain at least non-zero RequestURI with full url (including
2460// scheme and host) or non-zero Host header + RequestURI.
2461//
2462// The function doesn't follow redirects. Use Get* for following redirects.
2463//
2464// Response is ignored if resp is nil.
2465//
2466// It is recommended obtaining req and resp via AcquireRequest
2467// and AcquireResponse in performance-critical code.
2468func (c *PipelineClient) Do(req *Request, resp *Response) error {
2469 return c.getConnClient().Do(req, resp)
2470}
2471
2472func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
2473 c.init()
2474
2475 if c.DisablePathNormalizing {
2476 req.URI().DisablePathNormalizing = true
2477 }
2478
2479 userAgentOld := req.Header.UserAgent()
2480 if len(userAgentOld) == 0 {
2481 req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...)
2482 }
2483
2484 w := acquirePipelineWork(&c.workPool, 0)
2485 w.req = req
2486 if resp != nil {
2487 resp.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
2488 w.resp = resp
2489 } else {
2490 w.resp = &w.respCopy
2491 }
2492
2493 // Put the request to outgoing queue
2494 select {
2495 case c.chW <- w:
2496 default:
2497 // Try substituting the oldest w with the current one.
2498 select {
2499 case wOld := <-c.chW:
2500 wOld.err = ErrPipelineOverflow
2501 wOld.done <- struct{}{}
2502 default:
2503 }
2504 select {
2505 case c.chW <- w:
2506 default:
2507 releasePipelineWork(&c.workPool, w)
2508 return ErrPipelineOverflow
2509 }
2510 }
2511
2512 // Wait for the response
2513 <-w.done
2514 err := w.err
2515
2516 releasePipelineWork(&c.workPool, w)
2517
2518 return err
2519}
2520
2521func (c *PipelineClient) getConnClient() *pipelineConnClient {
2522 c.connClientsLock.Lock()
2523 cc := c.getConnClientUnlocked()
2524 c.connClientsLock.Unlock()
2525 return cc
2526}
2527
2528func (c *PipelineClient) getConnClientUnlocked() *pipelineConnClient {
2529 if len(c.connClients) == 0 {
2530 return c.newConnClient()
2531 }
2532
2533 // Return the client with the minimum number of pending requests.
2534 minCC := c.connClients[0]
2535 minReqs := minCC.PendingRequests()
2536 if minReqs == 0 {
2537 return minCC
2538 }
2539 for i := 1; i < len(c.connClients); i++ {
2540 cc := c.connClients[i]
2541 reqs := cc.PendingRequests()
2542 if reqs == 0 {
2543 return cc
2544 }
2545 if reqs < minReqs {
2546 minCC = cc
2547 minReqs = reqs
2548 }
2549 }
2550
2551 maxConns := c.MaxConns
2552 if maxConns <= 0 {
2553 maxConns = 1
2554 }
2555 if len(c.connClients) < maxConns {
2556 return c.newConnClient()
2557 }
2558 return minCC
2559}
2560
2561func (c *PipelineClient) newConnClient() *pipelineConnClient {
2562 cc := &pipelineConnClient{
2563 Addr: c.Addr,
2564 Name: c.Name,
2565 NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
2566 MaxPendingRequests: c.MaxPendingRequests,
2567 MaxBatchDelay: c.MaxBatchDelay,
2568 Dial: c.Dial,
2569 DialDualStack: c.DialDualStack,
2570 DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
2571 DisablePathNormalizing: c.DisablePathNormalizing,
2572 IsTLS: c.IsTLS,
2573 TLSConfig: c.TLSConfig,
2574 MaxIdleConnDuration: c.MaxIdleConnDuration,
2575 ReadBufferSize: c.ReadBufferSize,
2576 WriteBufferSize: c.WriteBufferSize,
2577 ReadTimeout: c.ReadTimeout,
2578 WriteTimeout: c.WriteTimeout,
2579 Logger: c.Logger,
2580 }
2581 c.connClients = append(c.connClients, cc)
2582 return cc
2583}
2584
2585// ErrPipelineOverflow may be returned from PipelineClient.Do*
2586// if the requests' queue is overflown.
2587var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflown. Increase MaxConns and/or MaxPendingRequests")
2588
2589// DefaultMaxPendingRequests is the default value
2590// for PipelineClient.MaxPendingRequests.
2591const DefaultMaxPendingRequests = 1024
2592
2593func (c *pipelineConnClient) init() {
2594 c.chLock.Lock()
2595 if c.chR == nil {
2596 maxPendingRequests := c.MaxPendingRequests
2597 if maxPendingRequests <= 0 {
2598 maxPendingRequests = DefaultMaxPendingRequests
2599 }
2600 c.chR = make(chan *pipelineWork, maxPendingRequests)
2601 if c.chW == nil {
2602 c.chW = make(chan *pipelineWork, maxPendingRequests)
2603 }
2604 go func() {
2605 // Keep restarting the worker if it fails (connection errors for example).
2606 for {
2607 if err := c.worker(); err != nil {
2608 c.logger().Printf("error in PipelineClient(%q): %s", c.Addr, err)
2609 if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
2610 // Throttle client reconnections on temporary errors
2611 time.Sleep(time.Second)
2612 }
2613 } else {
2614 c.chLock.Lock()
2615 stop := len(c.chR) == 0 && len(c.chW) == 0
2616 if !stop {
2617 c.chR = nil
2618 c.chW = nil
2619 }
2620 c.chLock.Unlock()
2621
2622 if stop {
2623 break
2624 }
2625 }
2626 }
2627 }()
2628 }
2629 c.chLock.Unlock()
2630}
2631
2632func (c *pipelineConnClient) worker() error {
2633 tlsConfig := c.cachedTLSConfig()
2634 conn, err := dialAddr(c.Addr, c.Dial, c.DialDualStack, c.IsTLS, tlsConfig, c.WriteTimeout)
2635 if err != nil {
2636 return err
2637 }
2638
2639 // Start reader and writer
2640 stopW := make(chan struct{})
2641 doneW := make(chan error)
2642 go func() {
2643 doneW <- c.writer(conn, stopW)
2644 }()
2645 stopR := make(chan struct{})
2646 doneR := make(chan error)
2647 go func() {
2648 doneR <- c.reader(conn, stopR)
2649 }()
2650
2651 // Wait until reader and writer are stopped
2652 select {
2653 case err = <-doneW:
2654 conn.Close()
2655 close(stopR)
2656 <-doneR
2657 case err = <-doneR:
2658 conn.Close()
2659 close(stopW)
2660 <-doneW
2661 }
2662
2663 // Notify pending readers
2664 for len(c.chR) > 0 {
2665 w := <-c.chR
2666 w.err = errPipelineConnStopped
2667 w.done <- struct{}{}
2668 }
2669
2670 return err
2671}
2672
2673func (c *pipelineConnClient) cachedTLSConfig() *tls.Config {
2674 if !c.IsTLS {
2675 return nil
2676 }
2677
2678 c.tlsConfigLock.Lock()
2679 cfg := c.tlsConfig
2680 if cfg == nil {
2681 cfg = newClientTLSConfig(c.TLSConfig, c.Addr)
2682 c.tlsConfig = cfg
2683 }
2684 c.tlsConfigLock.Unlock()
2685
2686 return cfg
2687}
2688
2689func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
2690 writeBufferSize := c.WriteBufferSize
2691 if writeBufferSize <= 0 {
2692 writeBufferSize = defaultWriteBufferSize
2693 }
2694 bw := bufio.NewWriterSize(conn, writeBufferSize)
2695 defer bw.Flush()
2696 chR := c.chR
2697 chW := c.chW
2698 writeTimeout := c.WriteTimeout
2699
2700 maxIdleConnDuration := c.MaxIdleConnDuration
2701 if maxIdleConnDuration <= 0 {
2702 maxIdleConnDuration = DefaultMaxIdleConnDuration
2703 }
2704 maxBatchDelay := c.MaxBatchDelay
2705
2706 var (
2707 stopTimer = time.NewTimer(time.Hour)
2708 flushTimer = time.NewTimer(time.Hour)
2709 flushTimerCh <-chan time.Time
2710 instantTimerCh = make(chan time.Time)
2711
2712 w *pipelineWork
2713 err error
2714 )
2715 close(instantTimerCh)
2716 for {
2717 againChW:
2718 select {
2719 case w = <-chW:
2720 // Fast path: len(chW) > 0
2721 default:
2722 // Slow path
2723 stopTimer.Reset(maxIdleConnDuration)
2724 select {
2725 case w = <-chW:
2726 case <-stopTimer.C:
2727 return nil
2728 case <-stopCh:
2729 return nil
2730 case <-flushTimerCh:
2731 if err = bw.Flush(); err != nil {
2732 return err
2733 }
2734 flushTimerCh = nil
2735 goto againChW
2736 }
2737 }
2738
2739 if !w.deadline.IsZero() && time.Since(w.deadline) >= 0 {
2740 w.err = ErrTimeout
2741 w.done <- struct{}{}
2742 continue
2743 }
2744
2745 w.resp.parseNetConn(conn)
2746
2747 if writeTimeout > 0 {
2748 // Set Deadline every time, since golang has fixed the performance issue
2749 // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
2750 currentTime := time.Now()
2751 if err = conn.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil {
2752 w.err = err
2753 w.done <- struct{}{}
2754 return err
2755 }
2756 }
2757 if err = w.req.Write(bw); err != nil {
2758 w.err = err
2759 w.done <- struct{}{}
2760 return err
2761 }
2762 if flushTimerCh == nil && (len(chW) == 0 || len(chR) == cap(chR)) {
2763 if maxBatchDelay > 0 {
2764 flushTimer.Reset(maxBatchDelay)
2765 flushTimerCh = flushTimer.C
2766 } else {
2767 flushTimerCh = instantTimerCh
2768 }
2769 }
2770
2771 againChR:
2772 select {
2773 case chR <- w:
2774 // Fast path: len(chR) < cap(chR)
2775 default:
2776 // Slow path
2777 select {
2778 case chR <- w:
2779 case <-stopCh:
2780 w.err = errPipelineConnStopped
2781 w.done <- struct{}{}
2782 return nil
2783 case <-flushTimerCh:
2784 if err = bw.Flush(); err != nil {
2785 w.err = err
2786 w.done <- struct{}{}
2787 return err
2788 }
2789 flushTimerCh = nil
2790 goto againChR
2791 }
2792 }
2793 }
2794}
2795
2796func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}) error {
2797 readBufferSize := c.ReadBufferSize
2798 if readBufferSize <= 0 {
2799 readBufferSize = defaultReadBufferSize
2800 }
2801 br := bufio.NewReaderSize(conn, readBufferSize)
2802 chR := c.chR
2803 readTimeout := c.ReadTimeout
2804
2805 var (
2806 w *pipelineWork
2807 err error
2808 )
2809 for {
2810 select {
2811 case w = <-chR:
2812 // Fast path: len(chR) > 0
2813 default:
2814 // Slow path
2815 select {
2816 case w = <-chR:
2817 case <-stopCh:
2818 return nil
2819 }
2820 }
2821
2822 if readTimeout > 0 {
2823 // Set Deadline every time, since golang has fixed the performance issue
2824 // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
2825 currentTime := time.Now()
2826 if err = conn.SetReadDeadline(currentTime.Add(readTimeout)); err != nil {
2827 w.err = err
2828 w.done <- struct{}{}
2829 return err
2830 }
2831 }
2832 if err = w.resp.Read(br); err != nil {
2833 w.err = err
2834 w.done <- struct{}{}
2835 return err
2836 }
2837
2838 w.done <- struct{}{}
2839 }
2840}
2841
2842func (c *pipelineConnClient) logger() Logger {
2843 if c.Logger != nil {
2844 return c.Logger
2845 }
2846 return defaultLogger
2847}
2848
2849// PendingRequests returns the current number of pending requests pipelined
2850// to the server.
2851//
2852// This number may exceed MaxPendingRequests*MaxConns by up to two times, since
2853// each connection to the server may keep up to MaxPendingRequests requests
2854// in the queue before sending them to the server.
2855//
2856// This function may be used for balancing load among multiple PipelineClient
2857// instances.
2858func (c *PipelineClient) PendingRequests() int {
2859 c.connClientsLock.Lock()
2860 n := 0
2861 for _, cc := range c.connClients {
2862 n += cc.PendingRequests()
2863 }
2864 c.connClientsLock.Unlock()
2865 return n
2866}
2867
2868func (c *pipelineConnClient) PendingRequests() int {
2869 c.init()
2870
2871 c.chLock.Lock()
2872 n := len(c.chR) + len(c.chW)
2873 c.chLock.Unlock()
2874 return n
2875}
2876
2877func (c *pipelineConnClient) getClientName() []byte {
2878 v := c.clientName.Load()
2879 var clientName []byte
2880 if v == nil {
2881 clientName = []byte(c.Name)
2882 if len(clientName) == 0 && !c.NoDefaultUserAgentHeader {
2883 clientName = defaultUserAgent
2884 }
2885 c.clientName.Store(clientName)
2886 } else {
2887 clientName = v.([]byte)
2888 }
2889 return clientName
2890}
2891
2892var errPipelineConnStopped = errors.New("pipeline connection has been stopped")
2893
2894func acquirePipelineWork(pool *sync.Pool, timeout time.Duration) *pipelineWork {
2895 v := pool.Get()
2896 if v == nil {
2897 v = &pipelineWork{
2898 done: make(chan struct{}, 1),
2899 }
2900 }
2901 w := v.(*pipelineWork)
2902 if timeout > 0 {
2903 if w.t == nil {
2904 w.t = time.NewTimer(timeout)
2905 } else {
2906 w.t.Reset(timeout)
2907 }
2908 w.deadline = time.Now().Add(timeout)
2909 } else {
2910 w.deadline = zeroTime
2911 }
2912 return w
2913}
2914
2915func releasePipelineWork(pool *sync.Pool, w *pipelineWork) {
2916 if w.t != nil {
2917 w.t.Stop()
2918 }
2919 w.reqCopy.Reset()
2920 w.respCopy.Reset()
2921 w.req = nil
2922 w.resp = nil
2923 w.err = nil
2924 pool.Put(w)
2925}
Note: See TracBrowser for help on using the repository browser.