source: code/trunk/conn.go@ 722

Last change on this file since 722 was 703, checked in by contact, 4 years ago

Introduce conn.NewContext

This function wraps a parent context, and returns a new context
cancelled when the connection is closed. This will make it so
operations started from downstreamConn.handleMessage will be
cancelled when the connection is closed.

File size: 6.7 KB
Line 
1package soju
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "net"
8 "strings"
9 "sync"
10 "time"
11 "unicode"
12
13 "gopkg.in/irc.v3"
14 "nhooyr.io/websocket"
15)
16
17// ircConn is a generic IRC connection. It's similar to net.Conn but focuses on
18// reading and writing IRC messages.
19type ircConn interface {
20 ReadMessage() (*irc.Message, error)
21 WriteMessage(*irc.Message) error
22 Close() error
23 SetReadDeadline(time.Time) error
24 SetWriteDeadline(time.Time) error
25 RemoteAddr() net.Addr
26 LocalAddr() net.Addr
27}
28
29func newNetIRCConn(c net.Conn) ircConn {
30 type netConn net.Conn
31 return struct {
32 *irc.Conn
33 netConn
34 }{irc.NewConn(c), c}
35}
36
37type websocketIRCConn struct {
38 conn *websocket.Conn
39 readDeadline, writeDeadline time.Time
40 remoteAddr string
41}
42
43func newWebsocketIRCConn(c *websocket.Conn, remoteAddr string) ircConn {
44 return &websocketIRCConn{conn: c, remoteAddr: remoteAddr}
45}
46
47func (wic *websocketIRCConn) ReadMessage() (*irc.Message, error) {
48 ctx := context.Background()
49 if !wic.readDeadline.IsZero() {
50 var cancel context.CancelFunc
51 ctx, cancel = context.WithDeadline(ctx, wic.readDeadline)
52 defer cancel()
53 }
54 _, b, err := wic.conn.Read(ctx)
55 if err != nil {
56 switch websocket.CloseStatus(err) {
57 case websocket.StatusNormalClosure, websocket.StatusGoingAway:
58 return nil, io.EOF
59 default:
60 return nil, err
61 }
62 }
63 return irc.ParseMessage(string(b))
64}
65
66func (wic *websocketIRCConn) WriteMessage(msg *irc.Message) error {
67 b := []byte(strings.ToValidUTF8(msg.String(), string(unicode.ReplacementChar)))
68 ctx := context.Background()
69 if !wic.writeDeadline.IsZero() {
70 var cancel context.CancelFunc
71 ctx, cancel = context.WithDeadline(ctx, wic.writeDeadline)
72 defer cancel()
73 }
74 return wic.conn.Write(ctx, websocket.MessageText, b)
75}
76
// isErrWebSocketClosed reports whether err is non-nil and its message ends
// with the "already wrote close" text matched below.
func isErrWebSocketClosed(err error) bool {
	if err == nil {
		return false
	}
	const alreadyClosedSuffix = "failed to close WebSocket: already wrote close"
	return strings.HasSuffix(err.Error(), alreadyClosedSuffix)
}
80
81func (wic *websocketIRCConn) Close() error {
82 err := wic.conn.Close(websocket.StatusNormalClosure, "")
83 // TODO: remove once this PR is merged:
84 // https://github.com/nhooyr/websocket/pull/303
85 if isErrWebSocketClosed(err) {
86 return nil
87 }
88 return err
89}
90
91func (wic *websocketIRCConn) SetReadDeadline(t time.Time) error {
92 wic.readDeadline = t
93 return nil
94}
95
96func (wic *websocketIRCConn) SetWriteDeadline(t time.Time) error {
97 wic.writeDeadline = t
98 return nil
99}
100
101func (wic *websocketIRCConn) RemoteAddr() net.Addr {
102 return websocketAddr(wic.remoteAddr)
103}
104
105func (wic *websocketIRCConn) LocalAddr() net.Addr {
106 // Behind a reverse HTTP proxy, we don't have access to the real listening
107 // address
108 return websocketAddr("")
109}
110
// websocketAddr is a string-backed address used for WebSocket connections.
type websocketAddr string

// Network returns the fixed network name "ws".
func (websocketAddr) Network() string { return "ws" }

// String returns the address text unchanged.
func (wa websocketAddr) String() string { return string(wa) }
120
// rateLimiter hands out tokens over C: receiving one value from C consumes
// one token. Stop must be called to release the ticker and the background
// goroutine.
type rateLimiter struct {
	C       <-chan struct{}
	ticker  *time.Ticker
	stopped chan struct{}
}

// newRateLimiter returns a rateLimiter that starts with burst tokens
// available and then adds one token every delay. The token channel holds at
// most burst tokens.
func newRateLimiter(delay time.Duration, burst int) *rateLimiter {
	tokens := make(chan struct{}, burst)
	for n := burst; n > 0; n-- {
		tokens <- struct{}{}
	}

	rl := &rateLimiter{
		C:       tokens,
		ticker:  time.NewTicker(delay),
		stopped: make(chan struct{}),
	}
	tick, done := rl.ticker.C, rl.stopped

	go func() {
		for {
			// Wait for the next tick, unless the limiter is stopped first.
			select {
			case <-done:
				return
			case <-tick:
			}

			// Hand the token over; this blocks while the channel is full,
			// so stopping must also be able to interrupt it.
			select {
			case <-done:
				return
			case tokens <- struct{}{}:
			}
		}
	}()

	return rl
}

// Stop stops the ticker and terminates the refill goroutine. It must be
// called at most once.
func (rl *rateLimiter) Stop() {
	rl.ticker.Stop()
	close(rl.stopped)
}
160
161type connOptions struct {
162 Logger Logger
163 RateLimitDelay time.Duration
164 RateLimitBurst int
165}
166
167type conn struct {
168 conn ircConn
169 srv *Server
170 logger Logger
171
172 lock sync.Mutex
173 outgoing chan<- *irc.Message
174 closed bool
175 closedCh chan struct{}
176}
177
178func newConn(srv *Server, ic ircConn, options *connOptions) *conn {
179 outgoing := make(chan *irc.Message, 64)
180 c := &conn{
181 conn: ic,
182 srv: srv,
183 outgoing: outgoing,
184 logger: options.Logger,
185 closedCh: make(chan struct{}),
186 }
187
188 go func() {
189 var rl *rateLimiter
190 if options.RateLimitDelay > 0 && options.RateLimitBurst > 0 {
191 rl = newRateLimiter(options.RateLimitDelay, options.RateLimitBurst)
192 defer rl.Stop()
193 }
194
195 for msg := range outgoing {
196 if rl != nil {
197 <-rl.C
198 }
199
200 if c.srv.Config().Debug {
201 c.logger.Printf("sent: %v", msg)
202 }
203 c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
204 if err := c.conn.WriteMessage(msg); err != nil {
205 c.logger.Printf("failed to write message: %v", err)
206 break
207 }
208 }
209 if err := c.conn.Close(); err != nil && !isErrClosed(err) {
210 c.logger.Printf("failed to close connection: %v", err)
211 } else {
212 c.logger.Printf("connection closed")
213 }
214 // Drain the outgoing channel to prevent SendMessage from blocking
215 for range outgoing {
216 // This space is intentionally left blank
217 }
218 }()
219
220 c.logger.Printf("new connection")
221 return c
222}
223
224func (c *conn) isClosed() bool {
225 c.lock.Lock()
226 defer c.lock.Unlock()
227 return c.closed
228}
229
230// Close closes the connection. It is safe to call from any goroutine.
231func (c *conn) Close() error {
232 c.lock.Lock()
233 defer c.lock.Unlock()
234
235 if c.closed {
236 return fmt.Errorf("connection already closed")
237 }
238
239 err := c.conn.Close()
240 c.closed = true
241 close(c.outgoing)
242 close(c.closedCh)
243 return err
244}
245
246func (c *conn) ReadMessage() (*irc.Message, error) {
247 msg, err := c.conn.ReadMessage()
248 if isErrClosed(err) {
249 return nil, io.EOF
250 } else if err != nil {
251 return nil, err
252 }
253
254 if c.srv.Config().Debug {
255 c.logger.Printf("received: %v", msg)
256 }
257
258 return msg, nil
259}
260
261// SendMessage queues a new outgoing message. It is safe to call from any
262// goroutine.
263//
264// If the connection is closed before the message is sent, SendMessage silently
265// drops the message.
266func (c *conn) SendMessage(msg *irc.Message) {
267 c.lock.Lock()
268 defer c.lock.Unlock()
269
270 if c.closed {
271 return
272 }
273 c.outgoing <- msg
274}
275
276func (c *conn) RemoteAddr() net.Addr {
277 return c.conn.RemoteAddr()
278}
279
280func (c *conn) LocalAddr() net.Addr {
281 return c.conn.LocalAddr()
282}
283
284// NewContext returns a copy of the parent context with a new Done channel. The
285// returned context's Done channel is closed when the connection is closed,
286// when the returned cancel function is called, or when the parent context's
287// Done channel is closed, whichever happens first.
288//
289// Canceling this context releases resources associated with it, so code should
290// call cancel as soon as the operations running in this Context complete.
291func (c *conn) NewContext(parent context.Context) (context.Context, context.CancelFunc) {
292 ctx, cancel := context.WithCancel(parent)
293
294 go func() {
295 defer cancel()
296
297 select {
298 case <-ctx.Done():
299 // The parent context has been cancelled, or the caller has called
300 // cancel()
301 case <-c.closedCh:
302 // The connection has been closed
303 }
304 }()
305
306 return ctx, cancel
307}
Note: See TracBrowser for help on using the repository browser.