[145] | 1 | package fasthttp
|
---|
| 2 |
|
---|
| 3 | import (
|
---|
| 4 | "sync"
|
---|
| 5 | "sync/atomic"
|
---|
| 6 | "time"
|
---|
| 7 | )
|
---|
| 8 |
|
---|
| 9 | // BalancingClient is the interface for clients, which may be passed
|
---|
| 10 | // to LBClient.Clients.
|
---|
| 11 | type BalancingClient interface {
|
---|
| 12 | DoDeadline(req *Request, resp *Response, deadline time.Time) error
|
---|
| 13 | PendingRequests() int
|
---|
| 14 | }
|
---|
| 15 |
|
---|
| 16 | // LBClient balances requests among available LBClient.Clients.
|
---|
| 17 | //
|
---|
| 18 | // It has the following features:
|
---|
| 19 | //
|
---|
| 20 | // - Balances load among available clients using 'least loaded' + 'least total'
|
---|
| 21 | // hybrid technique.
|
---|
| 22 | // - Dynamically decreases load on unhealthy clients.
|
---|
| 23 | //
|
---|
| 24 | // It is forbidden copying LBClient instances. Create new instances instead.
|
---|
| 25 | //
|
---|
| 26 | // It is safe calling LBClient methods from concurrently running goroutines.
|
---|
| 27 | type LBClient struct {
|
---|
| 28 | noCopy noCopy //nolint:unused,structcheck
|
---|
| 29 |
|
---|
| 30 | // Clients must contain non-zero clients list.
|
---|
| 31 | // Incoming requests are balanced among these clients.
|
---|
| 32 | Clients []BalancingClient
|
---|
| 33 |
|
---|
| 34 | // HealthCheck is a callback called after each request.
|
---|
| 35 | //
|
---|
| 36 | // The request, response and the error returned by the client
|
---|
| 37 | // is passed to HealthCheck, so the callback may determine whether
|
---|
| 38 | // the client is healthy.
|
---|
| 39 | //
|
---|
| 40 | // Load on the current client is decreased if HealthCheck returns false.
|
---|
| 41 | //
|
---|
| 42 | // By default HealthCheck returns false if err != nil.
|
---|
| 43 | HealthCheck func(req *Request, resp *Response, err error) bool
|
---|
| 44 |
|
---|
| 45 | // Timeout is the request timeout used when calling LBClient.Do.
|
---|
| 46 | //
|
---|
| 47 | // DefaultLBClientTimeout is used by default.
|
---|
| 48 | Timeout time.Duration
|
---|
| 49 |
|
---|
| 50 | cs []*lbClient
|
---|
| 51 |
|
---|
| 52 | once sync.Once
|
---|
| 53 | }
|
---|
| 54 |
|
---|
| 55 | // DefaultLBClientTimeout is the default request timeout used by LBClient
|
---|
| 56 | // when calling LBClient.Do.
|
---|
| 57 | //
|
---|
| 58 | // The timeout may be overridden via LBClient.Timeout.
|
---|
| 59 | const DefaultLBClientTimeout = time.Second
|
---|
| 60 |
|
---|
| 61 | // DoDeadline calls DoDeadline on the least loaded client
|
---|
| 62 | func (cc *LBClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
|
---|
| 63 | return cc.get().DoDeadline(req, resp, deadline)
|
---|
| 64 | }
|
---|
| 65 |
|
---|
| 66 | // DoTimeout calculates deadline and calls DoDeadline on the least loaded client
|
---|
| 67 | func (cc *LBClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
|
---|
| 68 | deadline := time.Now().Add(timeout)
|
---|
| 69 | return cc.get().DoDeadline(req, resp, deadline)
|
---|
| 70 | }
|
---|
| 71 |
|
---|
| 72 | // Do calls calculates deadline using LBClient.Timeout and calls DoDeadline
|
---|
| 73 | // on the least loaded client.
|
---|
| 74 | func (cc *LBClient) Do(req *Request, resp *Response) error {
|
---|
| 75 | timeout := cc.Timeout
|
---|
| 76 | if timeout <= 0 {
|
---|
| 77 | timeout = DefaultLBClientTimeout
|
---|
| 78 | }
|
---|
| 79 | return cc.DoTimeout(req, resp, timeout)
|
---|
| 80 | }
|
---|
| 81 |
|
---|
| 82 | func (cc *LBClient) init() {
|
---|
| 83 | if len(cc.Clients) == 0 {
|
---|
| 84 | panic("BUG: LBClient.Clients cannot be empty")
|
---|
| 85 | }
|
---|
| 86 | for _, c := range cc.Clients {
|
---|
| 87 | cc.cs = append(cc.cs, &lbClient{
|
---|
| 88 | c: c,
|
---|
| 89 | healthCheck: cc.HealthCheck,
|
---|
| 90 | })
|
---|
| 91 | }
|
---|
| 92 | }
|
---|
| 93 |
|
---|
| 94 | func (cc *LBClient) get() *lbClient {
|
---|
| 95 | cc.once.Do(cc.init)
|
---|
| 96 |
|
---|
| 97 | cs := cc.cs
|
---|
| 98 |
|
---|
| 99 | minC := cs[0]
|
---|
| 100 | minN := minC.PendingRequests()
|
---|
| 101 | minT := atomic.LoadUint64(&minC.total)
|
---|
| 102 | for _, c := range cs[1:] {
|
---|
| 103 | n := c.PendingRequests()
|
---|
| 104 | t := atomic.LoadUint64(&c.total)
|
---|
| 105 | if n < minN || (n == minN && t < minT) {
|
---|
| 106 | minC = c
|
---|
| 107 | minN = n
|
---|
| 108 | minT = t
|
---|
| 109 | }
|
---|
| 110 | }
|
---|
| 111 | return minC
|
---|
| 112 | }
|
---|
| 113 |
|
---|
| 114 | type lbClient struct {
|
---|
| 115 | c BalancingClient
|
---|
| 116 | healthCheck func(req *Request, resp *Response, err error) bool
|
---|
| 117 | penalty uint32
|
---|
| 118 |
|
---|
| 119 | // total amount of requests handled.
|
---|
| 120 | total uint64
|
---|
| 121 | }
|
---|
| 122 |
|
---|
| 123 | func (c *lbClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
|
---|
| 124 | err := c.c.DoDeadline(req, resp, deadline)
|
---|
| 125 | if !c.isHealthy(req, resp, err) && c.incPenalty() {
|
---|
| 126 | // Penalize the client returning error, so the next requests
|
---|
| 127 | // are routed to another clients.
|
---|
| 128 | time.AfterFunc(penaltyDuration, c.decPenalty)
|
---|
| 129 | } else {
|
---|
| 130 | atomic.AddUint64(&c.total, 1)
|
---|
| 131 | }
|
---|
| 132 | return err
|
---|
| 133 | }
|
---|
| 134 |
|
---|
| 135 | func (c *lbClient) PendingRequests() int {
|
---|
| 136 | n := c.c.PendingRequests()
|
---|
| 137 | m := atomic.LoadUint32(&c.penalty)
|
---|
| 138 | return n + int(m)
|
---|
| 139 | }
|
---|
| 140 |
|
---|
| 141 | func (c *lbClient) isHealthy(req *Request, resp *Response, err error) bool {
|
---|
| 142 | if c.healthCheck == nil {
|
---|
| 143 | return err == nil
|
---|
| 144 | }
|
---|
| 145 | return c.healthCheck(req, resp, err)
|
---|
| 146 | }
|
---|
| 147 |
|
---|
| 148 | func (c *lbClient) incPenalty() bool {
|
---|
| 149 | m := atomic.AddUint32(&c.penalty, 1)
|
---|
| 150 | if m > maxPenalty {
|
---|
| 151 | c.decPenalty()
|
---|
| 152 | return false
|
---|
| 153 | }
|
---|
| 154 | return true
|
---|
| 155 | }
|
---|
| 156 |
|
---|
| 157 | func (c *lbClient) decPenalty() {
|
---|
| 158 | atomic.AddUint32(&c.penalty, ^uint32(0))
|
---|
| 159 | }
|
---|
| 160 |
|
---|
| 161 | const (
|
---|
| 162 | maxPenalty = 300
|
---|
| 163 |
|
---|
| 164 | penaltyDuration = 3 * time.Second
|
---|
| 165 | )
|
---|