1 | package stackless
|
---|
2 |
|
---|
3 | import (
|
---|
4 | "errors"
|
---|
5 | "fmt"
|
---|
6 | "io"
|
---|
7 |
|
---|
8 | "github.com/valyala/bytebufferpool"
|
---|
9 | )
|
---|
10 |
|
---|
11 | // Writer is an interface stackless writer must conform to.
|
---|
12 | //
|
---|
13 | // The interface contains common subset for Writers from compress/* packages.
|
---|
14 | type Writer interface {
|
---|
15 | Write(p []byte) (int, error)
|
---|
16 | Flush() error
|
---|
17 | Close() error
|
---|
18 | Reset(w io.Writer)
|
---|
19 | }
|
---|
20 |
|
---|
21 | // NewWriterFunc must return new writer that will be wrapped into
|
---|
22 | // stackless writer.
|
---|
23 | type NewWriterFunc func(w io.Writer) Writer
|
---|
24 |
|
---|
25 | // NewWriter creates a stackless writer around a writer returned
|
---|
26 | // from newWriter.
|
---|
27 | //
|
---|
28 | // The returned writer writes data to dstW.
|
---|
29 | //
|
---|
30 | // Writers that use a lot of stack space may be wrapped into stackless writer,
|
---|
31 | // thus saving stack space for high number of concurrently running goroutines.
|
---|
32 | func NewWriter(dstW io.Writer, newWriter NewWriterFunc) Writer {
|
---|
33 | w := &writer{
|
---|
34 | dstW: dstW,
|
---|
35 | }
|
---|
36 | w.zw = newWriter(&w.xw)
|
---|
37 | return w
|
---|
38 | }
|
---|
39 |
|
---|
40 | type writer struct {
|
---|
41 | dstW io.Writer
|
---|
42 | zw Writer
|
---|
43 | xw xWriter
|
---|
44 |
|
---|
45 | err error
|
---|
46 | n int
|
---|
47 |
|
---|
48 | p []byte
|
---|
49 | op op
|
---|
50 | }
|
---|
51 |
|
---|
52 | type op int
|
---|
53 |
|
---|
54 | const (
|
---|
55 | opWrite op = iota
|
---|
56 | opFlush
|
---|
57 | opClose
|
---|
58 | opReset
|
---|
59 | )
|
---|
60 |
|
---|
61 | func (w *writer) Write(p []byte) (int, error) {
|
---|
62 | w.p = p
|
---|
63 | err := w.do(opWrite)
|
---|
64 | w.p = nil
|
---|
65 | return w.n, err
|
---|
66 | }
|
---|
67 |
|
---|
68 | func (w *writer) Flush() error {
|
---|
69 | return w.do(opFlush)
|
---|
70 | }
|
---|
71 |
|
---|
72 | func (w *writer) Close() error {
|
---|
73 | return w.do(opClose)
|
---|
74 | }
|
---|
75 |
|
---|
76 | func (w *writer) Reset(dstW io.Writer) {
|
---|
77 | w.xw.Reset()
|
---|
78 | w.do(opReset) //nolint:errcheck
|
---|
79 | w.dstW = dstW
|
---|
80 | }
|
---|
81 |
|
---|
82 | func (w *writer) do(op op) error {
|
---|
83 | w.op = op
|
---|
84 | if !stacklessWriterFunc(w) {
|
---|
85 | return errHighLoad
|
---|
86 | }
|
---|
87 | err := w.err
|
---|
88 | if err != nil {
|
---|
89 | return err
|
---|
90 | }
|
---|
91 | if w.xw.bb != nil && len(w.xw.bb.B) > 0 {
|
---|
92 | _, err = w.dstW.Write(w.xw.bb.B)
|
---|
93 | }
|
---|
94 | w.xw.Reset()
|
---|
95 |
|
---|
96 | return err
|
---|
97 | }
|
---|
98 |
|
---|
99 | var errHighLoad = errors.New("cannot compress data due to high load")
|
---|
100 |
|
---|
101 | var stacklessWriterFunc = NewFunc(writerFunc)
|
---|
102 |
|
---|
103 | func writerFunc(ctx interface{}) {
|
---|
104 | w := ctx.(*writer)
|
---|
105 | switch w.op {
|
---|
106 | case opWrite:
|
---|
107 | w.n, w.err = w.zw.Write(w.p)
|
---|
108 | case opFlush:
|
---|
109 | w.err = w.zw.Flush()
|
---|
110 | case opClose:
|
---|
111 | w.err = w.zw.Close()
|
---|
112 | case opReset:
|
---|
113 | w.zw.Reset(&w.xw)
|
---|
114 | w.err = nil
|
---|
115 | default:
|
---|
116 | panic(fmt.Sprintf("BUG: unexpected op: %d", w.op))
|
---|
117 | }
|
---|
118 | }
|
---|
119 |
|
---|
120 | type xWriter struct {
|
---|
121 | bb *bytebufferpool.ByteBuffer
|
---|
122 | }
|
---|
123 |
|
---|
124 | func (w *xWriter) Write(p []byte) (int, error) {
|
---|
125 | if w.bb == nil {
|
---|
126 | w.bb = bufferPool.Get()
|
---|
127 | }
|
---|
128 | return w.bb.Write(p)
|
---|
129 | }
|
---|
130 |
|
---|
131 | func (w *xWriter) Reset() {
|
---|
132 | if w.bb != nil {
|
---|
133 | bufferPool.Put(w.bb)
|
---|
134 | w.bb = nil
|
---|
135 | }
|
---|
136 | }
|
---|
137 |
|
---|
138 | var bufferPool bytebufferpool.Pool
|
---|