1 | package fasthttp
|
---|
2 |
|
---|
3 | import (
|
---|
4 | "sync"
|
---|
5 | "sync/atomic"
|
---|
6 | "time"
|
---|
7 | )
|
---|
8 |
|
---|
9 | // BalancingClient is the interface for clients, which may be passed
|
---|
10 | // to LBClient.Clients.
|
---|
11 | type BalancingClient interface {
|
---|
12 | DoDeadline(req *Request, resp *Response, deadline time.Time) error
|
---|
13 | PendingRequests() int
|
---|
14 | }
|
---|
15 |
|
---|
16 | // LBClient balances requests among available LBClient.Clients.
|
---|
17 | //
|
---|
18 | // It has the following features:
|
---|
19 | //
|
---|
20 | // - Balances load among available clients using 'least loaded' + 'least total'
|
---|
21 | // hybrid technique.
|
---|
22 | // - Dynamically decreases load on unhealthy clients.
|
---|
23 | //
|
---|
24 | // It is forbidden copying LBClient instances. Create new instances instead.
|
---|
25 | //
|
---|
26 | // It is safe calling LBClient methods from concurrently running goroutines.
|
---|
27 | type LBClient struct {
|
---|
28 | noCopy noCopy //nolint:unused,structcheck
|
---|
29 |
|
---|
30 | // Clients must contain non-zero clients list.
|
---|
31 | // Incoming requests are balanced among these clients.
|
---|
32 | Clients []BalancingClient
|
---|
33 |
|
---|
34 | // HealthCheck is a callback called after each request.
|
---|
35 | //
|
---|
36 | // The request, response and the error returned by the client
|
---|
37 | // is passed to HealthCheck, so the callback may determine whether
|
---|
38 | // the client is healthy.
|
---|
39 | //
|
---|
40 | // Load on the current client is decreased if HealthCheck returns false.
|
---|
41 | //
|
---|
42 | // By default HealthCheck returns false if err != nil.
|
---|
43 | HealthCheck func(req *Request, resp *Response, err error) bool
|
---|
44 |
|
---|
45 | // Timeout is the request timeout used when calling LBClient.Do.
|
---|
46 | //
|
---|
47 | // DefaultLBClientTimeout is used by default.
|
---|
48 | Timeout time.Duration
|
---|
49 |
|
---|
50 | cs []*lbClient
|
---|
51 |
|
---|
52 | once sync.Once
|
---|
53 | }
|
---|
54 |
|
---|
55 | // DefaultLBClientTimeout is the default request timeout used by LBClient
|
---|
56 | // when calling LBClient.Do.
|
---|
57 | //
|
---|
58 | // The timeout may be overridden via LBClient.Timeout.
|
---|
59 | const DefaultLBClientTimeout = time.Second
|
---|
60 |
|
---|
61 | // DoDeadline calls DoDeadline on the least loaded client
|
---|
62 | func (cc *LBClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
|
---|
63 | return cc.get().DoDeadline(req, resp, deadline)
|
---|
64 | }
|
---|
65 |
|
---|
66 | // DoTimeout calculates deadline and calls DoDeadline on the least loaded client
|
---|
67 | func (cc *LBClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
|
---|
68 | deadline := time.Now().Add(timeout)
|
---|
69 | return cc.get().DoDeadline(req, resp, deadline)
|
---|
70 | }
|
---|
71 |
|
---|
72 | // Do calls calculates deadline using LBClient.Timeout and calls DoDeadline
|
---|
73 | // on the least loaded client.
|
---|
74 | func (cc *LBClient) Do(req *Request, resp *Response) error {
|
---|
75 | timeout := cc.Timeout
|
---|
76 | if timeout <= 0 {
|
---|
77 | timeout = DefaultLBClientTimeout
|
---|
78 | }
|
---|
79 | return cc.DoTimeout(req, resp, timeout)
|
---|
80 | }
|
---|
81 |
|
---|
82 | func (cc *LBClient) init() {
|
---|
83 | if len(cc.Clients) == 0 {
|
---|
84 | panic("BUG: LBClient.Clients cannot be empty")
|
---|
85 | }
|
---|
86 | for _, c := range cc.Clients {
|
---|
87 | cc.cs = append(cc.cs, &lbClient{
|
---|
88 | c: c,
|
---|
89 | healthCheck: cc.HealthCheck,
|
---|
90 | })
|
---|
91 | }
|
---|
92 | }
|
---|
93 |
|
---|
94 | func (cc *LBClient) get() *lbClient {
|
---|
95 | cc.once.Do(cc.init)
|
---|
96 |
|
---|
97 | cs := cc.cs
|
---|
98 |
|
---|
99 | minC := cs[0]
|
---|
100 | minN := minC.PendingRequests()
|
---|
101 | minT := atomic.LoadUint64(&minC.total)
|
---|
102 | for _, c := range cs[1:] {
|
---|
103 | n := c.PendingRequests()
|
---|
104 | t := atomic.LoadUint64(&c.total)
|
---|
105 | if n < minN || (n == minN && t < minT) {
|
---|
106 | minC = c
|
---|
107 | minN = n
|
---|
108 | minT = t
|
---|
109 | }
|
---|
110 | }
|
---|
111 | return minC
|
---|
112 | }
|
---|
113 |
|
---|
114 | type lbClient struct {
|
---|
115 | c BalancingClient
|
---|
116 | healthCheck func(req *Request, resp *Response, err error) bool
|
---|
117 | penalty uint32
|
---|
118 |
|
---|
119 | // total amount of requests handled.
|
---|
120 | total uint64
|
---|
121 | }
|
---|
122 |
|
---|
123 | func (c *lbClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
|
---|
124 | err := c.c.DoDeadline(req, resp, deadline)
|
---|
125 | if !c.isHealthy(req, resp, err) && c.incPenalty() {
|
---|
126 | // Penalize the client returning error, so the next requests
|
---|
127 | // are routed to another clients.
|
---|
128 | time.AfterFunc(penaltyDuration, c.decPenalty)
|
---|
129 | } else {
|
---|
130 | atomic.AddUint64(&c.total, 1)
|
---|
131 | }
|
---|
132 | return err
|
---|
133 | }
|
---|
134 |
|
---|
135 | func (c *lbClient) PendingRequests() int {
|
---|
136 | n := c.c.PendingRequests()
|
---|
137 | m := atomic.LoadUint32(&c.penalty)
|
---|
138 | return n + int(m)
|
---|
139 | }
|
---|
140 |
|
---|
141 | func (c *lbClient) isHealthy(req *Request, resp *Response, err error) bool {
|
---|
142 | if c.healthCheck == nil {
|
---|
143 | return err == nil
|
---|
144 | }
|
---|
145 | return c.healthCheck(req, resp, err)
|
---|
146 | }
|
---|
147 |
|
---|
148 | func (c *lbClient) incPenalty() bool {
|
---|
149 | m := atomic.AddUint32(&c.penalty, 1)
|
---|
150 | if m > maxPenalty {
|
---|
151 | c.decPenalty()
|
---|
152 | return false
|
---|
153 | }
|
---|
154 | return true
|
---|
155 | }
|
---|
156 |
|
---|
157 | func (c *lbClient) decPenalty() {
|
---|
158 | atomic.AddUint32(&c.penalty, ^uint32(0))
|
---|
159 | }
|
---|
160 |
|
---|
161 | const (
|
---|
162 | maxPenalty = 300
|
---|
163 |
|
---|
164 | penaltyDuration = 3 * time.Second
|
---|
165 | )
|
---|