source: code/trunk/conn.go@ 746

Last change on this file since 746 was 741, checked in by contact, 4 years ago

Use golang.org/x/time/rate

Instead of hand-rolling our own rate-limiter based on goroutines,
use golang.org/x/time/rate.

File size: 6.0 KB
Line 
1package soju
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "net"
8 "strings"
9 "sync"
10 "time"
11 "unicode"
12
13 "golang.org/x/time/rate"
14 "gopkg.in/irc.v3"
15 "nhooyr.io/websocket"
16)
17
18// ircConn is a generic IRC connection. It's similar to net.Conn but focuses on
19// reading and writing IRC messages.
20type ircConn interface {
21 ReadMessage() (*irc.Message, error)
22 WriteMessage(*irc.Message) error
23 Close() error
24 SetReadDeadline(time.Time) error
25 SetWriteDeadline(time.Time) error
26 RemoteAddr() net.Addr
27 LocalAddr() net.Addr
28}
29
30func newNetIRCConn(c net.Conn) ircConn {
31 type netConn net.Conn
32 return struct {
33 *irc.Conn
34 netConn
35 }{irc.NewConn(c), c}
36}
37
38type websocketIRCConn struct {
39 conn *websocket.Conn
40 readDeadline, writeDeadline time.Time
41 remoteAddr string
42}
43
44func newWebsocketIRCConn(c *websocket.Conn, remoteAddr string) ircConn {
45 return &websocketIRCConn{conn: c, remoteAddr: remoteAddr}
46}
47
48func (wic *websocketIRCConn) ReadMessage() (*irc.Message, error) {
49 ctx := context.Background()
50 if !wic.readDeadline.IsZero() {
51 var cancel context.CancelFunc
52 ctx, cancel = context.WithDeadline(ctx, wic.readDeadline)
53 defer cancel()
54 }
55 _, b, err := wic.conn.Read(ctx)
56 if err != nil {
57 switch websocket.CloseStatus(err) {
58 case websocket.StatusNormalClosure, websocket.StatusGoingAway:
59 return nil, io.EOF
60 default:
61 return nil, err
62 }
63 }
64 return irc.ParseMessage(string(b))
65}
66
67func (wic *websocketIRCConn) WriteMessage(msg *irc.Message) error {
68 b := []byte(strings.ToValidUTF8(msg.String(), string(unicode.ReplacementChar)))
69 ctx := context.Background()
70 if !wic.writeDeadline.IsZero() {
71 var cancel context.CancelFunc
72 ctx, cancel = context.WithDeadline(ctx, wic.writeDeadline)
73 defer cancel()
74 }
75 return wic.conn.Write(ctx, websocket.MessageText, b)
76}
77
78func isErrWebSocketClosed(err error) bool {
79 return err != nil && strings.HasSuffix(err.Error(), "failed to close WebSocket: already wrote close")
80}
81
82func (wic *websocketIRCConn) Close() error {
83 err := wic.conn.Close(websocket.StatusNormalClosure, "")
84 // TODO: remove once this PR is merged:
85 // https://github.com/nhooyr/websocket/pull/303
86 if isErrWebSocketClosed(err) {
87 return nil
88 }
89 return err
90}
91
92func (wic *websocketIRCConn) SetReadDeadline(t time.Time) error {
93 wic.readDeadline = t
94 return nil
95}
96
97func (wic *websocketIRCConn) SetWriteDeadline(t time.Time) error {
98 wic.writeDeadline = t
99 return nil
100}
101
102func (wic *websocketIRCConn) RemoteAddr() net.Addr {
103 return websocketAddr(wic.remoteAddr)
104}
105
106func (wic *websocketIRCConn) LocalAddr() net.Addr {
107 // Behind a reverse HTTP proxy, we don't have access to the real listening
108 // address
109 return websocketAddr("")
110}
111
112type websocketAddr string
113
114func (websocketAddr) Network() string {
115 return "ws"
116}
117
118func (wa websocketAddr) String() string {
119 return string(wa)
120}
121
122type connOptions struct {
123 Logger Logger
124 RateLimitDelay time.Duration
125 RateLimitBurst int
126}
127
128type conn struct {
129 conn ircConn
130 srv *Server
131 logger Logger
132
133 lock sync.Mutex
134 outgoing chan<- *irc.Message
135 closed bool
136 closedCh chan struct{}
137}
138
139func newConn(srv *Server, ic ircConn, options *connOptions) *conn {
140 outgoing := make(chan *irc.Message, 64)
141 c := &conn{
142 conn: ic,
143 srv: srv,
144 outgoing: outgoing,
145 logger: options.Logger,
146 closedCh: make(chan struct{}),
147 }
148
149 go func() {
150 ctx, cancel := c.NewContext(context.Background())
151 defer cancel()
152
153 rl := rate.NewLimiter(rate.Every(options.RateLimitDelay), options.RateLimitBurst)
154 for msg := range outgoing {
155 if err := rl.Wait(ctx); err != nil {
156 break
157 }
158
159 if c.srv.Config().Debug {
160 c.logger.Printf("sent: %v", msg)
161 }
162 c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
163 if err := c.conn.WriteMessage(msg); err != nil {
164 c.logger.Printf("failed to write message: %v", err)
165 break
166 }
167 }
168 if err := c.conn.Close(); err != nil && !isErrClosed(err) {
169 c.logger.Printf("failed to close connection: %v", err)
170 } else {
171 c.logger.Printf("connection closed")
172 }
173 // Drain the outgoing channel to prevent SendMessage from blocking
174 for range outgoing {
175 // This space is intentionally left blank
176 }
177 }()
178
179 c.logger.Printf("new connection")
180 return c
181}
182
183func (c *conn) isClosed() bool {
184 c.lock.Lock()
185 defer c.lock.Unlock()
186 return c.closed
187}
188
189// Close closes the connection. It is safe to call from any goroutine.
190func (c *conn) Close() error {
191 c.lock.Lock()
192 defer c.lock.Unlock()
193
194 if c.closed {
195 return fmt.Errorf("connection already closed")
196 }
197
198 err := c.conn.Close()
199 c.closed = true
200 close(c.outgoing)
201 close(c.closedCh)
202 return err
203}
204
205func (c *conn) ReadMessage() (*irc.Message, error) {
206 msg, err := c.conn.ReadMessage()
207 if isErrClosed(err) {
208 return nil, io.EOF
209 } else if err != nil {
210 return nil, err
211 }
212
213 if c.srv.Config().Debug {
214 c.logger.Printf("received: %v", msg)
215 }
216
217 return msg, nil
218}
219
220// SendMessage queues a new outgoing message. It is safe to call from any
221// goroutine.
222//
223// If the connection is closed before the message is sent, SendMessage silently
224// drops the message.
225func (c *conn) SendMessage(msg *irc.Message) {
226 c.lock.Lock()
227 defer c.lock.Unlock()
228
229 if c.closed {
230 return
231 }
232 c.outgoing <- msg
233}
234
235func (c *conn) RemoteAddr() net.Addr {
236 return c.conn.RemoteAddr()
237}
238
239func (c *conn) LocalAddr() net.Addr {
240 return c.conn.LocalAddr()
241}
242
243// NewContext returns a copy of the parent context with a new Done channel. The
244// returned context's Done channel is closed when the connection is closed,
245// when the returned cancel function is called, or when the parent context's
246// Done channel is closed, whichever happens first.
247//
248// Canceling this context releases resources associated with it, so code should
249// call cancel as soon as the operations running in this Context complete.
250func (c *conn) NewContext(parent context.Context) (context.Context, context.CancelFunc) {
251 ctx, cancel := context.WithCancel(parent)
252
253 go func() {
254 defer cancel()
255
256 select {
257 case <-ctx.Done():
258 // The parent context has been cancelled, or the caller has called
259 // cancel()
260 case <-c.closedCh:
261 // The connection has been closed
262 }
263 }()
264
265 return ctx, cancel
266}
Note: See TracBrowser for help on using the repository browser.