source: code/trunk/vendor/github.com/valyala/fasthttp/lbclient.go@ 145

Last change on this file since 145 was 145, checked in by Izuru Yakumo, 22 months ago

Updated the Makefile and vendored depedencies

Signed-off-by: Izuru Yakumo <yakumo.izuru@…>

File size: 4.1 KB
RevLine 
[145]1package fasthttp
2
3import (
4 "sync"
5 "sync/atomic"
6 "time"
7)
8
9// BalancingClient is the interface for clients, which may be passed
10// to LBClient.Clients.
11type BalancingClient interface {
12 DoDeadline(req *Request, resp *Response, deadline time.Time) error
13 PendingRequests() int
14}
15
16// LBClient balances requests among available LBClient.Clients.
17//
18// It has the following features:
19//
20// - Balances load among available clients using 'least loaded' + 'least total'
21// hybrid technique.
22// - Dynamically decreases load on unhealthy clients.
23//
24// It is forbidden copying LBClient instances. Create new instances instead.
25//
26// It is safe calling LBClient methods from concurrently running goroutines.
27type LBClient struct {
28 noCopy noCopy //nolint:unused,structcheck
29
30 // Clients must contain non-zero clients list.
31 // Incoming requests are balanced among these clients.
32 Clients []BalancingClient
33
34 // HealthCheck is a callback called after each request.
35 //
36 // The request, response and the error returned by the client
37 // is passed to HealthCheck, so the callback may determine whether
38 // the client is healthy.
39 //
40 // Load on the current client is decreased if HealthCheck returns false.
41 //
42 // By default HealthCheck returns false if err != nil.
43 HealthCheck func(req *Request, resp *Response, err error) bool
44
45 // Timeout is the request timeout used when calling LBClient.Do.
46 //
47 // DefaultLBClientTimeout is used by default.
48 Timeout time.Duration
49
50 cs []*lbClient
51
52 once sync.Once
53}
54
55// DefaultLBClientTimeout is the default request timeout used by LBClient
56// when calling LBClient.Do.
57//
58// The timeout may be overridden via LBClient.Timeout.
59const DefaultLBClientTimeout = time.Second
60
61// DoDeadline calls DoDeadline on the least loaded client
62func (cc *LBClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
63 return cc.get().DoDeadline(req, resp, deadline)
64}
65
66// DoTimeout calculates deadline and calls DoDeadline on the least loaded client
67func (cc *LBClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
68 deadline := time.Now().Add(timeout)
69 return cc.get().DoDeadline(req, resp, deadline)
70}
71
72// Do calls calculates deadline using LBClient.Timeout and calls DoDeadline
73// on the least loaded client.
74func (cc *LBClient) Do(req *Request, resp *Response) error {
75 timeout := cc.Timeout
76 if timeout <= 0 {
77 timeout = DefaultLBClientTimeout
78 }
79 return cc.DoTimeout(req, resp, timeout)
80}
81
82func (cc *LBClient) init() {
83 if len(cc.Clients) == 0 {
84 panic("BUG: LBClient.Clients cannot be empty")
85 }
86 for _, c := range cc.Clients {
87 cc.cs = append(cc.cs, &lbClient{
88 c: c,
89 healthCheck: cc.HealthCheck,
90 })
91 }
92}
93
94func (cc *LBClient) get() *lbClient {
95 cc.once.Do(cc.init)
96
97 cs := cc.cs
98
99 minC := cs[0]
100 minN := minC.PendingRequests()
101 minT := atomic.LoadUint64(&minC.total)
102 for _, c := range cs[1:] {
103 n := c.PendingRequests()
104 t := atomic.LoadUint64(&c.total)
105 if n < minN || (n == minN && t < minT) {
106 minC = c
107 minN = n
108 minT = t
109 }
110 }
111 return minC
112}
113
114type lbClient struct {
115 c BalancingClient
116 healthCheck func(req *Request, resp *Response, err error) bool
117 penalty uint32
118
119 // total amount of requests handled.
120 total uint64
121}
122
123func (c *lbClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
124 err := c.c.DoDeadline(req, resp, deadline)
125 if !c.isHealthy(req, resp, err) && c.incPenalty() {
126 // Penalize the client returning error, so the next requests
127 // are routed to another clients.
128 time.AfterFunc(penaltyDuration, c.decPenalty)
129 } else {
130 atomic.AddUint64(&c.total, 1)
131 }
132 return err
133}
134
135func (c *lbClient) PendingRequests() int {
136 n := c.c.PendingRequests()
137 m := atomic.LoadUint32(&c.penalty)
138 return n + int(m)
139}
140
141func (c *lbClient) isHealthy(req *Request, resp *Response, err error) bool {
142 if c.healthCheck == nil {
143 return err == nil
144 }
145 return c.healthCheck(req, resp, err)
146}
147
148func (c *lbClient) incPenalty() bool {
149 m := atomic.AddUint32(&c.penalty, 1)
150 if m > maxPenalty {
151 c.decPenalty()
152 return false
153 }
154 return true
155}
156
157func (c *lbClient) decPenalty() {
158 atomic.AddUint32(&c.penalty, ^uint32(0))
159}
160
161const (
162 maxPenalty = 300
163
164 penaltyDuration = 3 * time.Second
165)
Note: See TracBrowser for help on using the repository browser.