// Package quantile computes approximate quantiles over an unbounded data
// stream within low memory and CPU bounds.
//
// A small amount of accuracy is traded to achieve the above properties.
//
// Multiple streams can be merged before calling Query to generate a single set
// of results. This is meaningful when the streams represent the same type of
// data. See Merge and Samples (and note that Merge is currently documented as
// inaccurate).
//
// For more detailed information about the algorithm used, see:
//
// Effective Computation of Biased Quantiles over Data Streams
//
// http://www.cs.rutgers.edu/~muthu/bquant.pdf
15 | package quantile
|
---|
16 |
|
---|
17 | import (
|
---|
18 | "math"
|
---|
19 | "sort"
|
---|
20 | )
|
---|
21 |
|
---|
// Sample holds an observed value and meta information for compression. JSON
// tags have been added for convenience; the ",string" option makes
// encoding/json encode each float64 as a quoted string.
type Sample struct {
	// Value is the observed value.
	Value float64 `json:",string"`
	// Width is the weight this sample contributes to rank computations:
	// Insert uses 1, and compression folds a neighbour's Width into the
	// surviving sample.
	Width float64 `json:",string"`
	// Delta is extra rank slack that query and compress add on top of
	// Width (presumably the paper's Δ — confirm against bquant.pdf).
	Delta float64 `json:",string"`
}

// Samples represents a slice of samples. It implements sort.Interface,
// ordering samples by ascending Value.
type Samples []Sample

// Len returns the number of samples.
func (ss Samples) Len() int { return len(ss) }

// Less reports whether sample i has a smaller Value than sample j.
func (ss Samples) Less(i, j int) bool { return ss[j].Value > ss[i].Value }

// Swap exchanges samples i and j.
func (ss Samples) Swap(i, j int) {
	tmp := ss[i]
	ss[i] = ss[j]
	ss[j] = tmp
}

37 | type invariant func(s *stream, r float64) float64
|
---|
38 |
|
---|
39 | // NewLowBiased returns an initialized Stream for low-biased quantiles
|
---|
40 | // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
|
---|
41 | // error guarantees can still be given even for the lower ranks of the data
|
---|
42 | // distribution.
|
---|
43 | //
|
---|
44 | // The provided epsilon is a relative error, i.e. the true quantile of a value
|
---|
45 | // returned by a query is guaranteed to be within (1±Epsilon)*Quantile.
|
---|
46 | //
|
---|
47 | // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
|
---|
48 | // properties.
|
---|
49 | func NewLowBiased(epsilon float64) *Stream {
|
---|
50 | ƒ := func(s *stream, r float64) float64 {
|
---|
51 | return 2 * epsilon * r
|
---|
52 | }
|
---|
53 | return newStream(ƒ)
|
---|
54 | }
|
---|
55 |
|
---|
56 | // NewHighBiased returns an initialized Stream for high-biased quantiles
|
---|
57 | // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
|
---|
58 | // error guarantees can still be given even for the higher ranks of the data
|
---|
59 | // distribution.
|
---|
60 | //
|
---|
61 | // The provided epsilon is a relative error, i.e. the true quantile of a value
|
---|
62 | // returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile).
|
---|
63 | //
|
---|
64 | // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
|
---|
65 | // properties.
|
---|
66 | func NewHighBiased(epsilon float64) *Stream {
|
---|
67 | ƒ := func(s *stream, r float64) float64 {
|
---|
68 | return 2 * epsilon * (s.n - r)
|
---|
69 | }
|
---|
70 | return newStream(ƒ)
|
---|
71 | }
|
---|
72 |
|
---|
73 | // NewTargeted returns an initialized Stream concerned with a particular set of
|
---|
74 | // quantile values that are supplied a priori. Knowing these a priori reduces
|
---|
75 | // space and computation time. The targets map maps the desired quantiles to
|
---|
76 | // their absolute errors, i.e. the true quantile of a value returned by a query
|
---|
77 | // is guaranteed to be within (Quantile±Epsilon).
|
---|
78 | //
|
---|
79 | // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
|
---|
80 | func NewTargeted(targetMap map[float64]float64) *Stream {
|
---|
81 | // Convert map to slice to avoid slow iterations on a map.
|
---|
82 | // ƒ is called on the hot path, so converting the map to a slice
|
---|
83 | // beforehand results in significant CPU savings.
|
---|
84 | targets := targetMapToSlice(targetMap)
|
---|
85 |
|
---|
86 | ƒ := func(s *stream, r float64) float64 {
|
---|
87 | var m = math.MaxFloat64
|
---|
88 | var f float64
|
---|
89 | for _, t := range targets {
|
---|
90 | if t.quantile*s.n <= r {
|
---|
91 | f = (2 * t.epsilon * r) / t.quantile
|
---|
92 | } else {
|
---|
93 | f = (2 * t.epsilon * (s.n - r)) / (1 - t.quantile)
|
---|
94 | }
|
---|
95 | if f < m {
|
---|
96 | m = f
|
---|
97 | }
|
---|
98 | }
|
---|
99 | return m
|
---|
100 | }
|
---|
101 | return newStream(ƒ)
|
---|
102 | }
|
---|
103 |
|
---|
// target pairs a quantile requested via NewTargeted with its allowed
// absolute error.
type target struct {
	quantile float64
	epsilon  float64
}

// targetMapToSlice flattens targetMap into a slice of targets. The order of
// the result follows map iteration and is therefore unspecified. The result
// is never nil, even for an empty or nil map.
func targetMapToSlice(targetMap map[float64]float64) []target {
	out := make([]target, len(targetMap))
	i := 0
	for q, eps := range targetMap {
		out[i] = target{quantile: q, epsilon: eps}
		i++
	}
	return out
}

123 | // Stream computes quantiles for a stream of float64s. It is not thread-safe by
|
---|
124 | // design. Take care when using across multiple goroutines.
|
---|
125 | type Stream struct {
|
---|
126 | *stream
|
---|
127 | b Samples
|
---|
128 | sorted bool
|
---|
129 | }
|
---|
130 |
|
---|
131 | func newStream(ƒ invariant) *Stream {
|
---|
132 | x := &stream{ƒ: ƒ}
|
---|
133 | return &Stream{x, make(Samples, 0, 500), true}
|
---|
134 | }
|
---|
135 |
|
---|
136 | // Insert inserts v into the stream.
|
---|
137 | func (s *Stream) Insert(v float64) {
|
---|
138 | s.insert(Sample{Value: v, Width: 1})
|
---|
139 | }
|
---|
140 |
|
---|
141 | func (s *Stream) insert(sample Sample) {
|
---|
142 | s.b = append(s.b, sample)
|
---|
143 | s.sorted = false
|
---|
144 | if len(s.b) == cap(s.b) {
|
---|
145 | s.flush()
|
---|
146 | }
|
---|
147 | }
|
---|
148 |
|
---|
149 | // Query returns the computed qth percentiles value. If s was created with
|
---|
150 | // NewTargeted, and q is not in the set of quantiles provided a priori, Query
|
---|
151 | // will return an unspecified result.
|
---|
152 | func (s *Stream) Query(q float64) float64 {
|
---|
153 | if !s.flushed() {
|
---|
154 | // Fast path when there hasn't been enough data for a flush;
|
---|
155 | // this also yields better accuracy for small sets of data.
|
---|
156 | l := len(s.b)
|
---|
157 | if l == 0 {
|
---|
158 | return 0
|
---|
159 | }
|
---|
160 | i := int(math.Ceil(float64(l) * q))
|
---|
161 | if i > 0 {
|
---|
162 | i -= 1
|
---|
163 | }
|
---|
164 | s.maybeSort()
|
---|
165 | return s.b[i].Value
|
---|
166 | }
|
---|
167 | s.flush()
|
---|
168 | return s.stream.query(q)
|
---|
169 | }
|
---|
170 |
|
---|
171 | // Merge merges samples into the underlying streams samples. This is handy when
|
---|
172 | // merging multiple streams from separate threads, database shards, etc.
|
---|
173 | //
|
---|
174 | // ATTENTION: This method is broken and does not yield correct results. The
|
---|
175 | // underlying algorithm is not capable of merging streams correctly.
|
---|
176 | func (s *Stream) Merge(samples Samples) {
|
---|
177 | sort.Sort(samples)
|
---|
178 | s.stream.merge(samples)
|
---|
179 | }
|
---|
180 |
|
---|
181 | // Reset reinitializes and clears the list reusing the samples buffer memory.
|
---|
182 | func (s *Stream) Reset() {
|
---|
183 | s.stream.reset()
|
---|
184 | s.b = s.b[:0]
|
---|
185 | }
|
---|
186 |
|
---|
187 | // Samples returns stream samples held by s.
|
---|
188 | func (s *Stream) Samples() Samples {
|
---|
189 | if !s.flushed() {
|
---|
190 | return s.b
|
---|
191 | }
|
---|
192 | s.flush()
|
---|
193 | return s.stream.samples()
|
---|
194 | }
|
---|
195 |
|
---|
196 | // Count returns the total number of samples observed in the stream
|
---|
197 | // since initialization.
|
---|
198 | func (s *Stream) Count() int {
|
---|
199 | return len(s.b) + s.stream.count()
|
---|
200 | }
|
---|
201 |
|
---|
202 | func (s *Stream) flush() {
|
---|
203 | s.maybeSort()
|
---|
204 | s.stream.merge(s.b)
|
---|
205 | s.b = s.b[:0]
|
---|
206 | }
|
---|
207 |
|
---|
208 | func (s *Stream) maybeSort() {
|
---|
209 | if !s.sorted {
|
---|
210 | s.sorted = true
|
---|
211 | sort.Sort(s.b)
|
---|
212 | }
|
---|
213 | }
|
---|
214 |
|
---|
215 | func (s *Stream) flushed() bool {
|
---|
216 | return len(s.stream.l) > 0
|
---|
217 | }
|
---|
218 |
|
---|
219 | type stream struct {
|
---|
220 | n float64
|
---|
221 | l []Sample
|
---|
222 | ƒ invariant
|
---|
223 | }
|
---|
224 |
|
---|
225 | func (s *stream) reset() {
|
---|
226 | s.l = s.l[:0]
|
---|
227 | s.n = 0
|
---|
228 | }
|
---|
229 |
|
---|
230 | func (s *stream) insert(v float64) {
|
---|
231 | s.merge(Samples{{v, 1, 0}})
|
---|
232 | }
|
---|
233 |
|
---|
234 | func (s *stream) merge(samples Samples) {
|
---|
235 | // TODO(beorn7): This tries to merge not only individual samples, but
|
---|
236 | // whole summaries. The paper doesn't mention merging summaries at
|
---|
237 | // all. Unittests show that the merging is inaccurate. Find out how to
|
---|
238 | // do merges properly.
|
---|
239 | var r float64
|
---|
240 | i := 0
|
---|
241 | for _, sample := range samples {
|
---|
242 | for ; i < len(s.l); i++ {
|
---|
243 | c := s.l[i]
|
---|
244 | if c.Value > sample.Value {
|
---|
245 | // Insert at position i.
|
---|
246 | s.l = append(s.l, Sample{})
|
---|
247 | copy(s.l[i+1:], s.l[i:])
|
---|
248 | s.l[i] = Sample{
|
---|
249 | sample.Value,
|
---|
250 | sample.Width,
|
---|
251 | math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1),
|
---|
252 | // TODO(beorn7): How to calculate delta correctly?
|
---|
253 | }
|
---|
254 | i++
|
---|
255 | goto inserted
|
---|
256 | }
|
---|
257 | r += c.Width
|
---|
258 | }
|
---|
259 | s.l = append(s.l, Sample{sample.Value, sample.Width, 0})
|
---|
260 | i++
|
---|
261 | inserted:
|
---|
262 | s.n += sample.Width
|
---|
263 | r += sample.Width
|
---|
264 | }
|
---|
265 | s.compress()
|
---|
266 | }
|
---|
267 |
|
---|
268 | func (s *stream) count() int {
|
---|
269 | return int(s.n)
|
---|
270 | }
|
---|
271 |
|
---|
272 | func (s *stream) query(q float64) float64 {
|
---|
273 | t := math.Ceil(q * s.n)
|
---|
274 | t += math.Ceil(s.ƒ(s, t) / 2)
|
---|
275 | p := s.l[0]
|
---|
276 | var r float64
|
---|
277 | for _, c := range s.l[1:] {
|
---|
278 | r += p.Width
|
---|
279 | if r+c.Width+c.Delta > t {
|
---|
280 | return p.Value
|
---|
281 | }
|
---|
282 | p = c
|
---|
283 | }
|
---|
284 | return p.Value
|
---|
285 | }
|
---|
286 |
|
---|
287 | func (s *stream) compress() {
|
---|
288 | if len(s.l) < 2 {
|
---|
289 | return
|
---|
290 | }
|
---|
291 | x := s.l[len(s.l)-1]
|
---|
292 | xi := len(s.l) - 1
|
---|
293 | r := s.n - 1 - x.Width
|
---|
294 |
|
---|
295 | for i := len(s.l) - 2; i >= 0; i-- {
|
---|
296 | c := s.l[i]
|
---|
297 | if c.Width+x.Width+x.Delta <= s.ƒ(s, r) {
|
---|
298 | x.Width += c.Width
|
---|
299 | s.l[xi] = x
|
---|
300 | // Remove element at i.
|
---|
301 | copy(s.l[i:], s.l[i+1:])
|
---|
302 | s.l = s.l[:len(s.l)-1]
|
---|
303 | xi -= 1
|
---|
304 | } else {
|
---|
305 | x = c
|
---|
306 | xi = i
|
---|
307 | }
|
---|
308 | r -= c.Width
|
---|
309 | }
|
---|
310 | }
|
---|
311 |
|
---|
312 | func (s *stream) samples() Samples {
|
---|
313 | samples := make(Samples, len(s.l))
|
---|
314 | copy(samples, s.l)
|
---|
315 | return samples
|
---|
316 | }
|
---|