source: code/trunk/vendor/github.com/valyala/fasthttp/workerpool.go@ 145

Last change on this file since 145 was 145, checked in by Izuru Yakumo, 22 months ago

Updated the Makefile and vendored dependencies

Signed-off-by: Izuru Yakumo <yakumo.izuru@…>

File size: 5.0 KB
Line 
1package fasthttp
2
3import (
4 "errors"
5 "net"
6 "runtime"
7 "strings"
8 "sync"
9 "time"
10)
11
12// workerPool serves incoming connections via a pool of workers
13// in FILO order, i.e. the most recently stopped worker will serve the next
14// incoming connection.
15//
16// Such a scheme keeps CPU caches hot (in theory).
17type workerPool struct {
18 // Function for serving server connections.
19 // It must leave c unclosed.
20 WorkerFunc ServeHandler
21
22 MaxWorkersCount int
23
24 LogAllErrors bool
25
26 MaxIdleWorkerDuration time.Duration
27
28 Logger Logger
29
30 lock sync.Mutex
31 workersCount int
32 mustStop bool
33
34 ready []*workerChan
35
36 stopCh chan struct{}
37
38 workerChanPool sync.Pool
39
40 connState func(net.Conn, ConnState)
41}
42
// workerChan is the mailbox of a single worker goroutine.
type workerChan struct {
	// lastUseTime is the moment the worker was last released back
	// to the pool; the cleaner compares it against the idle limit.
	lastUseTime time.Time

	// ch delivers connections to the worker. A nil connection tells
	// the worker to stop.
	ch chan net.Conn
}
47
48func (wp *workerPool) Start() {
49 if wp.stopCh != nil {
50 panic("BUG: workerPool already started")
51 }
52 wp.stopCh = make(chan struct{})
53 stopCh := wp.stopCh
54 wp.workerChanPool.New = func() interface{} {
55 return &workerChan{
56 ch: make(chan net.Conn, workerChanCap),
57 }
58 }
59 go func() {
60 var scratch []*workerChan
61 for {
62 wp.clean(&scratch)
63 select {
64 case <-stopCh:
65 return
66 default:
67 time.Sleep(wp.getMaxIdleWorkerDuration())
68 }
69 }
70 }()
71}
72
73func (wp *workerPool) Stop() {
74 if wp.stopCh == nil {
75 panic("BUG: workerPool wasn't started")
76 }
77 close(wp.stopCh)
78 wp.stopCh = nil
79
80 // Stop all the workers waiting for incoming connections.
81 // Do not wait for busy workers - they will stop after
82 // serving the connection and noticing wp.mustStop = true.
83 wp.lock.Lock()
84 ready := wp.ready
85 for i := range ready {
86 ready[i].ch <- nil
87 ready[i] = nil
88 }
89 wp.ready = ready[:0]
90 wp.mustStop = true
91 wp.lock.Unlock()
92}
93
94func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
95 if wp.MaxIdleWorkerDuration <= 0 {
96 return 10 * time.Second
97 }
98 return wp.MaxIdleWorkerDuration
99}
100
101func (wp *workerPool) clean(scratch *[]*workerChan) {
102 maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
103
104 // Clean least recently used workers if they didn't serve connections
105 // for more than maxIdleWorkerDuration.
106 criticalTime := time.Now().Add(-maxIdleWorkerDuration)
107
108 wp.lock.Lock()
109 ready := wp.ready
110 n := len(ready)
111
112 // Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
113 l, r, mid := 0, n-1, 0
114 for l <= r {
115 mid = (l + r) / 2
116 if criticalTime.After(wp.ready[mid].lastUseTime) {
117 l = mid + 1
118 } else {
119 r = mid - 1
120 }
121 }
122 i := r
123 if i == -1 {
124 wp.lock.Unlock()
125 return
126 }
127
128 *scratch = append((*scratch)[:0], ready[:i+1]...)
129 m := copy(ready, ready[i+1:])
130 for i = m; i < n; i++ {
131 ready[i] = nil
132 }
133 wp.ready = ready[:m]
134 wp.lock.Unlock()
135
136 // Notify obsolete workers to stop.
137 // This notification must be outside the wp.lock, since ch.ch
138 // may be blocking and may consume a lot of time if many workers
139 // are located on non-local CPUs.
140 tmp := *scratch
141 for i := range tmp {
142 tmp[i].ch <- nil
143 tmp[i] = nil
144 }
145}
146
147func (wp *workerPool) Serve(c net.Conn) bool {
148 ch := wp.getCh()
149 if ch == nil {
150 return false
151 }
152 ch.ch <- c
153 return true
154}
155
// workerChanCap is the buffer size of every worker channel, computed once
// at package initialization from GOMAXPROCS.
var workerChanCap = func() int {
	procs := runtime.GOMAXPROCS(0)
	if procs != 1 {
		// With several Ps use a buffered channel, since otherwise the
		// Serve caller (Acceptor) may lag accepting new connections
		// if WorkerFunc is CPU-bound.
		return 1
	}

	// With a single P use an unbuffered channel. This immediately
	// switches Serve to WorkerFunc, which results in higher performance
	// (under go1.5 at least).
	return 0
}()
169
170func (wp *workerPool) getCh() *workerChan {
171 var ch *workerChan
172 createWorker := false
173
174 wp.lock.Lock()
175 ready := wp.ready
176 n := len(ready) - 1
177 if n < 0 {
178 if wp.workersCount < wp.MaxWorkersCount {
179 createWorker = true
180 wp.workersCount++
181 }
182 } else {
183 ch = ready[n]
184 ready[n] = nil
185 wp.ready = ready[:n]
186 }
187 wp.lock.Unlock()
188
189 if ch == nil {
190 if !createWorker {
191 return nil
192 }
193 vch := wp.workerChanPool.Get()
194 ch = vch.(*workerChan)
195 go func() {
196 wp.workerFunc(ch)
197 wp.workerChanPool.Put(vch)
198 }()
199 }
200 return ch
201}
202
203func (wp *workerPool) release(ch *workerChan) bool {
204 ch.lastUseTime = time.Now()
205 wp.lock.Lock()
206 if wp.mustStop {
207 wp.lock.Unlock()
208 return false
209 }
210 wp.ready = append(wp.ready, ch)
211 wp.lock.Unlock()
212 return true
213}
214
215func (wp *workerPool) workerFunc(ch *workerChan) {
216 var c net.Conn
217
218 var err error
219 for c = range ch.ch {
220 if c == nil {
221 break
222 }
223
224 if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
225 errStr := err.Error()
226 if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") ||
227 strings.Contains(errStr, "reset by peer") ||
228 strings.Contains(errStr, "request headers: small read buffer") ||
229 strings.Contains(errStr, "unexpected EOF") ||
230 strings.Contains(errStr, "i/o timeout") ||
231 errors.Is(err, ErrBadTrailer)) {
232 wp.Logger.Printf("error when serving connection %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err)
233 }
234 }
235 if err == errHijacked {
236 wp.connState(c, StateHijacked)
237 } else {
238 _ = c.Close()
239 wp.connState(c, StateClosed)
240 }
241 c = nil
242
243 if !wp.release(ch) {
244 break
245 }
246 }
247
248 wp.lock.Lock()
249 wp.workersCount--
250 wp.lock.Unlock()
251}
Note: See TracBrowser for help on using the repository browser.