source: code/trunk/vendor/github.com/valyala/fasthttp/fasthttputil/pipeconns.go@ 145

Last change on this file since 145 was 145, checked in by Izuru Yakumo, 22 months ago

Updated the Makefile and vendored dependencies

Signed-off-by: Izuru Yakumo <yakumo.izuru@…>

File size: 6.0 KB
Line 
1package fasthttputil
2
3import (
4 "errors"
5 "io"
6 "net"
7 "sync"
8 "time"
9)
10
11// NewPipeConns returns new bi-directional connection pipe.
12//
13// PipeConns is NOT safe for concurrent use by multiple goroutines!
14func NewPipeConns() *PipeConns {
15 ch1 := make(chan *byteBuffer, 4)
16 ch2 := make(chan *byteBuffer, 4)
17
18 pc := &PipeConns{
19 stopCh: make(chan struct{}),
20 }
21 pc.c1.rCh = ch1
22 pc.c1.wCh = ch2
23 pc.c2.rCh = ch2
24 pc.c2.wCh = ch1
25 pc.c1.pc = pc
26 pc.c2.pc = pc
27 return pc
28}
29
30// PipeConns provides bi-directional connection pipe,
31// which use in-process memory as a transport.
32//
33// PipeConns must be created by calling NewPipeConns.
34//
35// PipeConns has the following additional features comparing to connections
36// returned from net.Pipe():
37//
38// * It is faster.
39// * It buffers Write calls, so there is no need to have concurrent goroutine
40// calling Read in order to unblock each Write call.
41// * It supports read and write deadlines.
42//
43// PipeConns is NOT safe for concurrent use by multiple goroutines!
44type PipeConns struct {
45 c1 pipeConn
46 c2 pipeConn
47 stopCh chan struct{}
48 stopChLock sync.Mutex
49}
50
51// Conn1 returns the first end of bi-directional pipe.
52//
53// Data written to Conn1 may be read from Conn2.
54// Data written to Conn2 may be read from Conn1.
55func (pc *PipeConns) Conn1() net.Conn {
56 return &pc.c1
57}
58
59// Conn2 returns the second end of bi-directional pipe.
60//
61// Data written to Conn2 may be read from Conn1.
62// Data written to Conn1 may be read from Conn2.
63func (pc *PipeConns) Conn2() net.Conn {
64 return &pc.c2
65}
66
67// Close closes pipe connections.
68func (pc *PipeConns) Close() error {
69 pc.stopChLock.Lock()
70 select {
71 case <-pc.stopCh:
72 default:
73 close(pc.stopCh)
74 }
75 pc.stopChLock.Unlock()
76
77 return nil
78}
79
80type pipeConn struct {
81 b *byteBuffer
82 bb []byte
83
84 rCh chan *byteBuffer
85 wCh chan *byteBuffer
86 pc *PipeConns
87
88 readDeadlineTimer *time.Timer
89 writeDeadlineTimer *time.Timer
90
91 readDeadlineCh <-chan time.Time
92 writeDeadlineCh <-chan time.Time
93
94 readDeadlineChLock sync.Mutex
95}
96
97func (c *pipeConn) Write(p []byte) (int, error) {
98 b := acquireByteBuffer()
99 b.b = append(b.b[:0], p...)
100
101 select {
102 case <-c.pc.stopCh:
103 releaseByteBuffer(b)
104 return 0, errConnectionClosed
105 default:
106 }
107
108 select {
109 case c.wCh <- b:
110 default:
111 select {
112 case c.wCh <- b:
113 case <-c.writeDeadlineCh:
114 c.writeDeadlineCh = closedDeadlineCh
115 return 0, ErrTimeout
116 case <-c.pc.stopCh:
117 releaseByteBuffer(b)
118 return 0, errConnectionClosed
119 }
120 }
121
122 return len(p), nil
123}
124
125func (c *pipeConn) Read(p []byte) (int, error) {
126 mayBlock := true
127 nn := 0
128 for len(p) > 0 {
129 n, err := c.read(p, mayBlock)
130 nn += n
131 if err != nil {
132 if !mayBlock && err == errWouldBlock {
133 err = nil
134 }
135 return nn, err
136 }
137 p = p[n:]
138 mayBlock = false
139 }
140
141 return nn, nil
142}
143
144func (c *pipeConn) read(p []byte, mayBlock bool) (int, error) {
145 if len(c.bb) == 0 {
146 if err := c.readNextByteBuffer(mayBlock); err != nil {
147 return 0, err
148 }
149 }
150 n := copy(p, c.bb)
151 c.bb = c.bb[n:]
152
153 return n, nil
154}
155
156func (c *pipeConn) readNextByteBuffer(mayBlock bool) error {
157 releaseByteBuffer(c.b)
158 c.b = nil
159
160 select {
161 case c.b = <-c.rCh:
162 default:
163 if !mayBlock {
164 return errWouldBlock
165 }
166 c.readDeadlineChLock.Lock()
167 readDeadlineCh := c.readDeadlineCh
168 c.readDeadlineChLock.Unlock()
169 select {
170 case c.b = <-c.rCh:
171 case <-readDeadlineCh:
172 c.readDeadlineChLock.Lock()
173 c.readDeadlineCh = closedDeadlineCh
174 c.readDeadlineChLock.Unlock()
175 // rCh may contain data when deadline is reached.
176 // Read the data before returning ErrTimeout.
177 select {
178 case c.b = <-c.rCh:
179 default:
180 return ErrTimeout
181 }
182 case <-c.pc.stopCh:
183 // rCh may contain data when stopCh is closed.
184 // Read the data before returning EOF.
185 select {
186 case c.b = <-c.rCh:
187 default:
188 return io.EOF
189 }
190 }
191 }
192
193 c.bb = c.b.b
194 return nil
195}
196
// errWouldBlock is returned internally when a non-blocking read finds
// no buffered data.
var errWouldBlock = errors.New("would block")

// errConnectionClosed is returned from Write once the pipe has been closed.
var errConnectionClosed = errors.New("connection closed")
201
// timeoutError is the concrete type behind ErrTimeout.
type timeoutError struct{}

// Error implements the error interface.
func (*timeoutError) Error() string { return "timeout" }

// Timeout reports that this error is a timeout.
//
// Only the Timeout() part of the net.Error interface is implemented.
// This allows for checks like:
//
//	if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
func (*timeoutError) Timeout() bool { return true }

// ErrTimeout is returned from Read() or Write() on timeout.
var ErrTimeout = &timeoutError{}
221
222func (c *pipeConn) Close() error {
223 return c.pc.Close()
224}
225
226func (c *pipeConn) LocalAddr() net.Addr {
227 return pipeAddr(0)
228}
229
230func (c *pipeConn) RemoteAddr() net.Addr {
231 return pipeAddr(0)
232}
233
234func (c *pipeConn) SetDeadline(deadline time.Time) error {
235 c.SetReadDeadline(deadline) //nolint:errcheck
236 c.SetWriteDeadline(deadline) //nolint:errcheck
237 return nil
238}
239
240func (c *pipeConn) SetReadDeadline(deadline time.Time) error {
241 if c.readDeadlineTimer == nil {
242 c.readDeadlineTimer = time.NewTimer(time.Hour)
243 }
244 readDeadlineCh := updateTimer(c.readDeadlineTimer, deadline)
245 c.readDeadlineChLock.Lock()
246 c.readDeadlineCh = readDeadlineCh
247 c.readDeadlineChLock.Unlock()
248 return nil
249}
250
251func (c *pipeConn) SetWriteDeadline(deadline time.Time) error {
252 if c.writeDeadlineTimer == nil {
253 c.writeDeadlineTimer = time.NewTimer(time.Hour)
254 }
255 c.writeDeadlineCh = updateTimer(c.writeDeadlineTimer, deadline)
256 return nil
257}
258
// updateTimer re-arms t for the given deadline and returns the channel a
// caller should select on to observe that deadline:
//
//   - nil when deadline is the zero time (no deadline; a receive from a
//     nil channel blocks forever),
//   - closedDeadlineCh when deadline is not in the future,
//   - t.C otherwise, with t reset to fire at deadline.
func updateTimer(t *time.Timer, deadline time.Time) <-chan time.Time {
	if !t.Stop() {
		// The timer already fired or was stopped; drain a stale tick, if
		// any, so it is not mistaken for the new deadline.
		select {
		case <-t.C:
		default:
		}
	}
	if deadline.IsZero() {
		return nil
	}
	// time.Until saturates at the maximum Duration for far-future
	// deadlines. Negating time.Since would overflow there and wrongly
	// report the deadline as already expired.
	d := time.Until(deadline)
	if d <= 0 {
		return closedDeadlineCh
	}
	t.Reset(d)
	return t.C
}

// closedDeadlineCh is an already-closed channel, so a receive from it never
// blocks. It represents a deadline that has expired.
var closedDeadlineCh = func() <-chan time.Time {
	ch := make(chan time.Time)
	close(ch)
	return ch
}()
282
// pipeAddr is the placeholder address type used for both ends of a pipe.
// Its Network and String methods both report "pipe", whatever the value.
type pipeAddr int

// Network returns the name of the network, which is always "pipe".
func (a pipeAddr) Network() string { return "pipe" }

// String returns the string form of the address, which is always "pipe".
func (a pipeAddr) String() string { return "pipe" }
292
// byteBuffer wraps a byte slice so it can be recycled through
// byteBufferPool.
type byteBuffer struct {
	b []byte
}

// byteBufferPool recycles byteBuffers; new ones start with a 1024-byte
// slice.
var byteBufferPool = &sync.Pool{
	New: func() interface{} {
		fresh := new(byteBuffer)
		fresh.b = make([]byte, 1024)
		return fresh
	},
}

// acquireByteBuffer takes a byteBuffer from the pool. The contents of the
// returned buffer are unspecified; callers must reset it before use.
func acquireByteBuffer() *byteBuffer {
	v := byteBufferPool.Get()
	return v.(*byteBuffer)
}

// releaseByteBuffer returns b to the pool. A nil b is ignored.
func releaseByteBuffer(b *byteBuffer) {
	if b == nil {
		return
	}
	byteBufferPool.Put(b)
}
Note: See TracBrowser for help on using the repository browser.