source: code/trunk/vendor/github.com/valyala/fasthttp/stackless/writer.go@ 145

Last change on this file since 145 was 145, checked in by Izuru Yakumo, 22 months ago

Updated the Makefile and vendored depedencies

Signed-off-by: Izuru Yakumo <yakumo.izuru@…>

File size: 2.3 KB
Line 
1package stackless
2
3import (
4 "errors"
5 "fmt"
6 "io"
7
8 "github.com/valyala/bytebufferpool"
9)
10
11// Writer is an interface stackless writer must conform to.
12//
13// The interface contains common subset for Writers from compress/* packages.
14type Writer interface {
15 Write(p []byte) (int, error)
16 Flush() error
17 Close() error
18 Reset(w io.Writer)
19}
20
21// NewWriterFunc must return new writer that will be wrapped into
22// stackless writer.
23type NewWriterFunc func(w io.Writer) Writer
24
25// NewWriter creates a stackless writer around a writer returned
26// from newWriter.
27//
28// The returned writer writes data to dstW.
29//
30// Writers that use a lot of stack space may be wrapped into stackless writer,
31// thus saving stack space for high number of concurrently running goroutines.
32func NewWriter(dstW io.Writer, newWriter NewWriterFunc) Writer {
33 w := &writer{
34 dstW: dstW,
35 }
36 w.zw = newWriter(&w.xw)
37 return w
38}
39
40type writer struct {
41 dstW io.Writer
42 zw Writer
43 xw xWriter
44
45 err error
46 n int
47
48 p []byte
49 op op
50}
51
52type op int
53
54const (
55 opWrite op = iota
56 opFlush
57 opClose
58 opReset
59)
60
61func (w *writer) Write(p []byte) (int, error) {
62 w.p = p
63 err := w.do(opWrite)
64 w.p = nil
65 return w.n, err
66}
67
68func (w *writer) Flush() error {
69 return w.do(opFlush)
70}
71
72func (w *writer) Close() error {
73 return w.do(opClose)
74}
75
76func (w *writer) Reset(dstW io.Writer) {
77 w.xw.Reset()
78 w.do(opReset) //nolint:errcheck
79 w.dstW = dstW
80}
81
82func (w *writer) do(op op) error {
83 w.op = op
84 if !stacklessWriterFunc(w) {
85 return errHighLoad
86 }
87 err := w.err
88 if err != nil {
89 return err
90 }
91 if w.xw.bb != nil && len(w.xw.bb.B) > 0 {
92 _, err = w.dstW.Write(w.xw.bb.B)
93 }
94 w.xw.Reset()
95
96 return err
97}
98
99var errHighLoad = errors.New("cannot compress data due to high load")
100
101var stacklessWriterFunc = NewFunc(writerFunc)
102
103func writerFunc(ctx interface{}) {
104 w := ctx.(*writer)
105 switch w.op {
106 case opWrite:
107 w.n, w.err = w.zw.Write(w.p)
108 case opFlush:
109 w.err = w.zw.Flush()
110 case opClose:
111 w.err = w.zw.Close()
112 case opReset:
113 w.zw.Reset(&w.xw)
114 w.err = nil
115 default:
116 panic(fmt.Sprintf("BUG: unexpected op: %d", w.op))
117 }
118}
119
120type xWriter struct {
121 bb *bytebufferpool.ByteBuffer
122}
123
124func (w *xWriter) Write(p []byte) (int, error) {
125 if w.bb == nil {
126 w.bb = bufferPool.Get()
127 }
128 return w.bb.Write(p)
129}
130
131func (w *xWriter) Reset() {
132 if w.bb != nil {
133 bufferPool.Put(w.bb)
134 w.bb = nil
135 }
136}
137
138var bufferPool bytebufferpool.Pool
Note: See TracBrowser for help on using the repository browser.