// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4 |
|
---|
5 | // Package rate provides a rate limiter.
|
---|
6 | package rate
|
---|
7 |
|
---|
8 | import (
|
---|
9 | "context"
|
---|
10 | "fmt"
|
---|
11 | "math"
|
---|
12 | "sync"
|
---|
13 | "time"
|
---|
14 | )
|
---|
15 |
|
---|
// Limit is the maximum rate at which events may occur,
// measured in events per second.
// A zero Limit never replenishes tokens.
type Limit float64
|
---|
20 |
|
---|
21 | // Inf is the infinite rate limit; it allows all events (even if burst is zero).
|
---|
22 | const Inf = Limit(math.MaxFloat64)
|
---|
23 |
|
---|
24 | // Every converts a minimum time interval between events to a Limit.
|
---|
25 | func Every(interval time.Duration) Limit {
|
---|
26 | if interval <= 0 {
|
---|
27 | return Inf
|
---|
28 | }
|
---|
29 | return 1 / Limit(interval.Seconds())
|
---|
30 | }
|
---|
31 |
|
---|
32 | // A Limiter controls how frequently events are allowed to happen.
|
---|
33 | // It implements a "token bucket" of size b, initially full and refilled
|
---|
34 | // at rate r tokens per second.
|
---|
35 | // Informally, in any large enough time interval, the Limiter limits the
|
---|
36 | // rate to r tokens per second, with a maximum burst size of b events.
|
---|
37 | // As a special case, if r == Inf (the infinite rate), b is ignored.
|
---|
38 | // See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
|
---|
39 | //
|
---|
40 | // The zero value is a valid Limiter, but it will reject all events.
|
---|
41 | // Use NewLimiter to create non-zero Limiters.
|
---|
42 | //
|
---|
43 | // Limiter has three main methods, Allow, Reserve, and Wait.
|
---|
44 | // Most callers should use Wait.
|
---|
45 | //
|
---|
46 | // Each of the three methods consumes a single token.
|
---|
47 | // They differ in their behavior when no token is available.
|
---|
48 | // If no token is available, Allow returns false.
|
---|
49 | // If no token is available, Reserve returns a reservation for a future token
|
---|
50 | // and the amount of time the caller must wait before using it.
|
---|
51 | // If no token is available, Wait blocks until one can be obtained
|
---|
52 | // or its associated context.Context is canceled.
|
---|
53 | //
|
---|
54 | // The methods AllowN, ReserveN, and WaitN consume n tokens.
|
---|
55 | type Limiter struct {
|
---|
56 | mu sync.Mutex
|
---|
57 | limit Limit
|
---|
58 | burst int
|
---|
59 | tokens float64
|
---|
60 | // last is the last time the limiter's tokens field was updated
|
---|
61 | last time.Time
|
---|
62 | // lastEvent is the latest time of a rate-limited event (past or future)
|
---|
63 | lastEvent time.Time
|
---|
64 | }
|
---|
65 |
|
---|
66 | // Limit returns the maximum overall event rate.
|
---|
67 | func (lim *Limiter) Limit() Limit {
|
---|
68 | lim.mu.Lock()
|
---|
69 | defer lim.mu.Unlock()
|
---|
70 | return lim.limit
|
---|
71 | }
|
---|
72 |
|
---|
73 | // Burst returns the maximum burst size. Burst is the maximum number of tokens
|
---|
74 | // that can be consumed in a single call to Allow, Reserve, or Wait, so higher
|
---|
75 | // Burst values allow more events to happen at once.
|
---|
76 | // A zero Burst allows no events, unless limit == Inf.
|
---|
77 | func (lim *Limiter) Burst() int {
|
---|
78 | lim.mu.Lock()
|
---|
79 | defer lim.mu.Unlock()
|
---|
80 | return lim.burst
|
---|
81 | }
|
---|
82 |
|
---|
83 | // TokensAt returns the number of tokens available at time t.
|
---|
84 | func (lim *Limiter) TokensAt(t time.Time) float64 {
|
---|
85 | lim.mu.Lock()
|
---|
86 | _, tokens := lim.advance(t) // does not mutate lim
|
---|
87 | lim.mu.Unlock()
|
---|
88 | return tokens
|
---|
89 | }
|
---|
90 |
|
---|
91 | // Tokens returns the number of tokens available now.
|
---|
92 | func (lim *Limiter) Tokens() float64 {
|
---|
93 | return lim.TokensAt(time.Now())
|
---|
94 | }
|
---|
95 |
|
---|
96 | // NewLimiter returns a new Limiter that allows events up to rate r and permits
|
---|
97 | // bursts of at most b tokens.
|
---|
98 | func NewLimiter(r Limit, b int) *Limiter {
|
---|
99 | return &Limiter{
|
---|
100 | limit: r,
|
---|
101 | burst: b,
|
---|
102 | }
|
---|
103 | }
|
---|
104 |
|
---|
105 | // Allow reports whether an event may happen now.
|
---|
106 | func (lim *Limiter) Allow() bool {
|
---|
107 | return lim.AllowN(time.Now(), 1)
|
---|
108 | }
|
---|
109 |
|
---|
110 | // AllowN reports whether n events may happen at time t.
|
---|
111 | // Use this method if you intend to drop / skip events that exceed the rate limit.
|
---|
112 | // Otherwise use Reserve or Wait.
|
---|
113 | func (lim *Limiter) AllowN(t time.Time, n int) bool {
|
---|
114 | return lim.reserveN(t, n, 0).ok
|
---|
115 | }
|
---|
116 |
|
---|
117 | // A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
|
---|
118 | // A Reservation may be canceled, which may enable the Limiter to permit additional events.
|
---|
119 | type Reservation struct {
|
---|
120 | ok bool
|
---|
121 | lim *Limiter
|
---|
122 | tokens int
|
---|
123 | timeToAct time.Time
|
---|
124 | // This is the Limit at reservation time, it can change later.
|
---|
125 | limit Limit
|
---|
126 | }
|
---|
127 |
|
---|
128 | // OK returns whether the limiter can provide the requested number of tokens
|
---|
129 | // within the maximum wait time. If OK is false, Delay returns InfDuration, and
|
---|
130 | // Cancel does nothing.
|
---|
131 | func (r *Reservation) OK() bool {
|
---|
132 | return r.ok
|
---|
133 | }
|
---|
134 |
|
---|
135 | // Delay is shorthand for DelayFrom(time.Now()).
|
---|
136 | func (r *Reservation) Delay() time.Duration {
|
---|
137 | return r.DelayFrom(time.Now())
|
---|
138 | }
|
---|
139 |
|
---|
// InfDuration is what Delay returns for a Reservation that is not OK.
// It is the largest representable time.Duration.
const InfDuration time.Duration = math.MaxInt64
|
---|
142 |
|
---|
143 | // DelayFrom returns the duration for which the reservation holder must wait
|
---|
144 | // before taking the reserved action. Zero duration means act immediately.
|
---|
145 | // InfDuration means the limiter cannot grant the tokens requested in this
|
---|
146 | // Reservation within the maximum wait time.
|
---|
147 | func (r *Reservation) DelayFrom(t time.Time) time.Duration {
|
---|
148 | if !r.ok {
|
---|
149 | return InfDuration
|
---|
150 | }
|
---|
151 | delay := r.timeToAct.Sub(t)
|
---|
152 | if delay < 0 {
|
---|
153 | return 0
|
---|
154 | }
|
---|
155 | return delay
|
---|
156 | }
|
---|
157 |
|
---|
158 | // Cancel is shorthand for CancelAt(time.Now()).
|
---|
159 | func (r *Reservation) Cancel() {
|
---|
160 | r.CancelAt(time.Now())
|
---|
161 | }
|
---|
162 |
|
---|
163 | // CancelAt indicates that the reservation holder will not perform the reserved action
|
---|
164 | // and reverses the effects of this Reservation on the rate limit as much as possible,
|
---|
165 | // considering that other reservations may have already been made.
|
---|
166 | func (r *Reservation) CancelAt(t time.Time) {
|
---|
167 | if !r.ok {
|
---|
168 | return
|
---|
169 | }
|
---|
170 |
|
---|
171 | r.lim.mu.Lock()
|
---|
172 | defer r.lim.mu.Unlock()
|
---|
173 |
|
---|
174 | if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) {
|
---|
175 | return
|
---|
176 | }
|
---|
177 |
|
---|
178 | // calculate tokens to restore
|
---|
179 | // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
|
---|
180 | // after r was obtained. These tokens should not be restored.
|
---|
181 | restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
|
---|
182 | if restoreTokens <= 0 {
|
---|
183 | return
|
---|
184 | }
|
---|
185 | // advance time to now
|
---|
186 | t, tokens := r.lim.advance(t)
|
---|
187 | // calculate new number of tokens
|
---|
188 | tokens += restoreTokens
|
---|
189 | if burst := float64(r.lim.burst); tokens > burst {
|
---|
190 | tokens = burst
|
---|
191 | }
|
---|
192 | // update state
|
---|
193 | r.lim.last = t
|
---|
194 | r.lim.tokens = tokens
|
---|
195 | if r.timeToAct == r.lim.lastEvent {
|
---|
196 | prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
|
---|
197 | if !prevEvent.Before(t) {
|
---|
198 | r.lim.lastEvent = prevEvent
|
---|
199 | }
|
---|
200 | }
|
---|
201 | }
|
---|
202 |
|
---|
203 | // Reserve is shorthand for ReserveN(time.Now(), 1).
|
---|
204 | func (lim *Limiter) Reserve() *Reservation {
|
---|
205 | return lim.ReserveN(time.Now(), 1)
|
---|
206 | }
|
---|
207 |
|
---|
208 | // ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
|
---|
209 | // The Limiter takes this Reservation into account when allowing future events.
|
---|
210 | // The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size.
|
---|
211 | // Usage example:
|
---|
212 | //
|
---|
213 | // r := lim.ReserveN(time.Now(), 1)
|
---|
214 | // if !r.OK() {
|
---|
215 | // // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
|
---|
216 | // return
|
---|
217 | // }
|
---|
218 | // time.Sleep(r.Delay())
|
---|
219 | // Act()
|
---|
220 | //
|
---|
221 | // Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
|
---|
222 | // If you need to respect a deadline or cancel the delay, use Wait instead.
|
---|
223 | // To drop or skip events exceeding rate limit, use Allow instead.
|
---|
224 | func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation {
|
---|
225 | r := lim.reserveN(t, n, InfDuration)
|
---|
226 | return &r
|
---|
227 | }
|
---|
228 |
|
---|
229 | // Wait is shorthand for WaitN(ctx, 1).
|
---|
230 | func (lim *Limiter) Wait(ctx context.Context) (err error) {
|
---|
231 | return lim.WaitN(ctx, 1)
|
---|
232 | }
|
---|
233 |
|
---|
234 | // WaitN blocks until lim permits n events to happen.
|
---|
235 | // It returns an error if n exceeds the Limiter's burst size, the Context is
|
---|
236 | // canceled, or the expected wait time exceeds the Context's Deadline.
|
---|
237 | // The burst limit is ignored if the rate limit is Inf.
|
---|
238 | func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
|
---|
239 | // The test code calls lim.wait with a fake timer generator.
|
---|
240 | // This is the real timer generator.
|
---|
241 | newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {
|
---|
242 | timer := time.NewTimer(d)
|
---|
243 | return timer.C, timer.Stop, func() {}
|
---|
244 | }
|
---|
245 |
|
---|
246 | return lim.wait(ctx, n, time.Now(), newTimer)
|
---|
247 | }
|
---|
248 |
|
---|
249 | // wait is the internal implementation of WaitN.
|
---|
250 | func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
|
---|
251 | lim.mu.Lock()
|
---|
252 | burst := lim.burst
|
---|
253 | limit := lim.limit
|
---|
254 | lim.mu.Unlock()
|
---|
255 |
|
---|
256 | if n > burst && limit != Inf {
|
---|
257 | return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
|
---|
258 | }
|
---|
259 | // Check if ctx is already cancelled
|
---|
260 | select {
|
---|
261 | case <-ctx.Done():
|
---|
262 | return ctx.Err()
|
---|
263 | default:
|
---|
264 | }
|
---|
265 | // Determine wait limit
|
---|
266 | waitLimit := InfDuration
|
---|
267 | if deadline, ok := ctx.Deadline(); ok {
|
---|
268 | waitLimit = deadline.Sub(t)
|
---|
269 | }
|
---|
270 | // Reserve
|
---|
271 | r := lim.reserveN(t, n, waitLimit)
|
---|
272 | if !r.ok {
|
---|
273 | return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
|
---|
274 | }
|
---|
275 | // Wait if necessary
|
---|
276 | delay := r.DelayFrom(t)
|
---|
277 | if delay == 0 {
|
---|
278 | return nil
|
---|
279 | }
|
---|
280 | ch, stop, advance := newTimer(delay)
|
---|
281 | defer stop()
|
---|
282 | advance() // only has an effect when testing
|
---|
283 | select {
|
---|
284 | case <-ch:
|
---|
285 | // We can proceed.
|
---|
286 | return nil
|
---|
287 | case <-ctx.Done():
|
---|
288 | // Context was canceled before we could proceed. Cancel the
|
---|
289 | // reservation, which may permit other events to proceed sooner.
|
---|
290 | r.Cancel()
|
---|
291 | return ctx.Err()
|
---|
292 | }
|
---|
293 | }
|
---|
294 |
|
---|
295 | // SetLimit is shorthand for SetLimitAt(time.Now(), newLimit).
|
---|
296 | func (lim *Limiter) SetLimit(newLimit Limit) {
|
---|
297 | lim.SetLimitAt(time.Now(), newLimit)
|
---|
298 | }
|
---|
299 |
|
---|
300 | // SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated
|
---|
301 | // or underutilized by those which reserved (using Reserve or Wait) but did not yet act
|
---|
302 | // before SetLimitAt was called.
|
---|
303 | func (lim *Limiter) SetLimitAt(t time.Time, newLimit Limit) {
|
---|
304 | lim.mu.Lock()
|
---|
305 | defer lim.mu.Unlock()
|
---|
306 |
|
---|
307 | t, tokens := lim.advance(t)
|
---|
308 |
|
---|
309 | lim.last = t
|
---|
310 | lim.tokens = tokens
|
---|
311 | lim.limit = newLimit
|
---|
312 | }
|
---|
313 |
|
---|
314 | // SetBurst is shorthand for SetBurstAt(time.Now(), newBurst).
|
---|
315 | func (lim *Limiter) SetBurst(newBurst int) {
|
---|
316 | lim.SetBurstAt(time.Now(), newBurst)
|
---|
317 | }
|
---|
318 |
|
---|
319 | // SetBurstAt sets a new burst size for the limiter.
|
---|
320 | func (lim *Limiter) SetBurstAt(t time.Time, newBurst int) {
|
---|
321 | lim.mu.Lock()
|
---|
322 | defer lim.mu.Unlock()
|
---|
323 |
|
---|
324 | t, tokens := lim.advance(t)
|
---|
325 |
|
---|
326 | lim.last = t
|
---|
327 | lim.tokens = tokens
|
---|
328 | lim.burst = newBurst
|
---|
329 | }
|
---|
330 |
|
---|
331 | // reserveN is a helper method for AllowN, ReserveN, and WaitN.
|
---|
332 | // maxFutureReserve specifies the maximum reservation wait duration allowed.
|
---|
333 | // reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
|
---|
334 | func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
|
---|
335 | lim.mu.Lock()
|
---|
336 | defer lim.mu.Unlock()
|
---|
337 |
|
---|
338 | if lim.limit == Inf {
|
---|
339 | return Reservation{
|
---|
340 | ok: true,
|
---|
341 | lim: lim,
|
---|
342 | tokens: n,
|
---|
343 | timeToAct: t,
|
---|
344 | }
|
---|
345 | } else if lim.limit == 0 {
|
---|
346 | var ok bool
|
---|
347 | if lim.burst >= n {
|
---|
348 | ok = true
|
---|
349 | lim.burst -= n
|
---|
350 | }
|
---|
351 | return Reservation{
|
---|
352 | ok: ok,
|
---|
353 | lim: lim,
|
---|
354 | tokens: lim.burst,
|
---|
355 | timeToAct: t,
|
---|
356 | }
|
---|
357 | }
|
---|
358 |
|
---|
359 | t, tokens := lim.advance(t)
|
---|
360 |
|
---|
361 | // Calculate the remaining number of tokens resulting from the request.
|
---|
362 | tokens -= float64(n)
|
---|
363 |
|
---|
364 | // Calculate the wait duration
|
---|
365 | var waitDuration time.Duration
|
---|
366 | if tokens < 0 {
|
---|
367 | waitDuration = lim.limit.durationFromTokens(-tokens)
|
---|
368 | }
|
---|
369 |
|
---|
370 | // Decide result
|
---|
371 | ok := n <= lim.burst && waitDuration <= maxFutureReserve
|
---|
372 |
|
---|
373 | // Prepare reservation
|
---|
374 | r := Reservation{
|
---|
375 | ok: ok,
|
---|
376 | lim: lim,
|
---|
377 | limit: lim.limit,
|
---|
378 | }
|
---|
379 | if ok {
|
---|
380 | r.tokens = n
|
---|
381 | r.timeToAct = t.Add(waitDuration)
|
---|
382 |
|
---|
383 | // Update state
|
---|
384 | lim.last = t
|
---|
385 | lim.tokens = tokens
|
---|
386 | lim.lastEvent = r.timeToAct
|
---|
387 | }
|
---|
388 |
|
---|
389 | return r
|
---|
390 | }
|
---|
391 |
|
---|
392 | // advance calculates and returns an updated state for lim resulting from the passage of time.
|
---|
393 | // lim is not changed.
|
---|
394 | // advance requires that lim.mu is held.
|
---|
395 | func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {
|
---|
396 | last := lim.last
|
---|
397 | if t.Before(last) {
|
---|
398 | last = t
|
---|
399 | }
|
---|
400 |
|
---|
401 | // Calculate the new number of tokens, due to time that passed.
|
---|
402 | elapsed := t.Sub(last)
|
---|
403 | delta := lim.limit.tokensFromDuration(elapsed)
|
---|
404 | tokens := lim.tokens + delta
|
---|
405 | if burst := float64(lim.burst); tokens > burst {
|
---|
406 | tokens = burst
|
---|
407 | }
|
---|
408 | return t, tokens
|
---|
409 | }
|
---|
410 |
|
---|
411 | // durationFromTokens is a unit conversion function from the number of tokens to the duration
|
---|
412 | // of time it takes to accumulate them at a rate of limit tokens per second.
|
---|
413 | func (limit Limit) durationFromTokens(tokens float64) time.Duration {
|
---|
414 | if limit <= 0 {
|
---|
415 | return InfDuration
|
---|
416 | }
|
---|
417 | seconds := tokens / float64(limit)
|
---|
418 | return time.Duration(float64(time.Second) * seconds)
|
---|
419 | }
|
---|
420 |
|
---|
421 | // tokensFromDuration is a unit conversion function from a time duration to the number of tokens
|
---|
422 | // which could be accumulated during that duration at a rate of limit tokens per second.
|
---|
423 | func (limit Limit) tokensFromDuration(d time.Duration) float64 {
|
---|
424 | if limit <= 0 {
|
---|
425 | return 0
|
---|
426 | }
|
---|
427 | return d.Seconds() * float64(limit)
|
---|
428 | }
|
---|