source: code/trunk/conn.go@ 814

Last change on this file since 814 was 804, checked in by koizumi.aoi, 2 years ago

Drunk as I like

Signed-off-by: Aoi K <koizumi.aoi@…>

File size: 6.1 KB
Line 
1package suika
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "net"
8 "strings"
9 "sync"
10 "time"
11 "unicode"
12
13 "golang.org/x/time/rate"
14 "gopkg.in/irc.v3"
15 "nhooyr.io/websocket"
16)
17
18// ircConn is a generic IRC connection. It's similar to net.Conn but focuses on
19// reading and writing IRC messages.
20type ircConn interface {
21 ReadMessage() (*irc.Message, error)
22 WriteMessage(*irc.Message) error
23 Close() error
24 SetReadDeadline(time.Time) error
25 SetWriteDeadline(time.Time) error
26 RemoteAddr() net.Addr
27 LocalAddr() net.Addr
28}
29
30func newNetIRCConn(c net.Conn) ircConn {
31 type netConn net.Conn
32 return struct {
33 *irc.Conn
34 netConn
35 }{irc.NewConn(c), c}
36}
37
38type websocketIRCConn struct {
39 conn *websocket.Conn
40 readDeadline, writeDeadline time.Time
41 remoteAddr string
42}
43
44func newWebsocketIRCConn(c *websocket.Conn, remoteAddr string) ircConn {
45 return &websocketIRCConn{conn: c, remoteAddr: remoteAddr}
46}
47
48func (wic *websocketIRCConn) ReadMessage() (*irc.Message, error) {
49 ctx := context.Background()
50 if !wic.readDeadline.IsZero() {
51 var cancel context.CancelFunc
52 ctx, cancel = context.WithDeadline(ctx, wic.readDeadline)
53 defer cancel()
54 }
55 _, b, err := wic.conn.Read(ctx)
56 if err != nil {
57 switch websocket.CloseStatus(err) {
58 case websocket.StatusNormalClosure, websocket.StatusGoingAway:
59 return nil, io.EOF
60 default:
61 return nil, err
62 }
63 }
64 return irc.ParseMessage(string(b))
65}
66
67func (wic *websocketIRCConn) WriteMessage(msg *irc.Message) error {
68 b := []byte(strings.ToValidUTF8(msg.String(), string(unicode.ReplacementChar)))
69 ctx := context.Background()
70 if !wic.writeDeadline.IsZero() {
71 var cancel context.CancelFunc
72 ctx, cancel = context.WithDeadline(ctx, wic.writeDeadline)
73 defer cancel()
74 }
75 return wic.conn.Write(ctx, websocket.MessageText, b)
76}
77
// isErrWebSocketClosed reports whether err is non-nil and its message ends
// with the text checked below, which indicates the WebSocket close frame was
// already written. A nil error yields false.
func isErrWebSocketClosed(err error) bool {
	if err == nil {
		return false
	}
	const alreadyClosed = "failed to close WebSocket: already wrote close"
	return strings.HasSuffix(err.Error(), alreadyClosed)
}
81
82func (wic *websocketIRCConn) Close() error {
83 err := wic.conn.Close(websocket.StatusNormalClosure, "")
84 // TODO: remove once this PR is merged:
85 // https://github.com/nhooyr/websocket/pull/303
86 if isErrWebSocketClosed(err) {
87 return nil
88 }
89 return err
90}
91
92func (wic *websocketIRCConn) SetReadDeadline(t time.Time) error {
93 wic.readDeadline = t
94 return nil
95}
96
97func (wic *websocketIRCConn) SetWriteDeadline(t time.Time) error {
98 wic.writeDeadline = t
99 return nil
100}
101
102func (wic *websocketIRCConn) RemoteAddr() net.Addr {
103 return websocketAddr(wic.remoteAddr)
104}
105
106func (wic *websocketIRCConn) LocalAddr() net.Addr {
107 // Behind a reverse HTTP proxy, we don't have access to the real listening
108 // address
109 return websocketAddr("")
110}
111
// websocketAddr is a string-backed address, returned as a net.Addr by the
// WebSocket connection's address getters.
type websocketAddr string

// Network returns "ws" for every websocketAddr.
func (websocketAddr) Network() string { return "ws" }

// String returns the address text unchanged.
func (wa websocketAddr) String() string { return string(wa) }
121
122type connOptions struct {
123 Logger Logger
124 RateLimitDelay time.Duration
125 RateLimitBurst int
126}
127
128type conn struct {
129 conn ircConn
130 srv *Server
131 logger Logger
132
133 lock sync.Mutex
134 outgoing chan<- *irc.Message
135 closed bool
136 closedCh chan struct{}
137}
138
139func newConn(srv *Server, ic ircConn, options *connOptions) *conn {
140 outgoing := make(chan *irc.Message, 64)
141 c := &conn{
142 conn: ic,
143 srv: srv,
144 outgoing: outgoing,
145 logger: options.Logger,
146 closedCh: make(chan struct{}),
147 }
148
149 go func() {
150 ctx, cancel := c.NewContext(context.Background())
151 defer cancel()
152
153 rl := rate.NewLimiter(rate.Every(options.RateLimitDelay), options.RateLimitBurst)
154 for msg := range outgoing {
155 if err := rl.Wait(ctx); err != nil {
156 break
157 }
158
159 c.logger.Debugf("sent: %v", msg)
160 c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
161 if err := c.conn.WriteMessage(msg); err != nil {
162 c.logger.Printf("failed to write message: %v", err)
163 break
164 }
165 }
166 if err := c.conn.Close(); err != nil && !isErrClosed(err) {
167 c.logger.Printf("failed to close connection: %v", err)
168 } else {
169 c.logger.Debugf("connection closed")
170 }
171 // Drain the outgoing channel to prevent SendMessage from blocking
172 for range outgoing {
173 // This space is intentionally left blank
174 }
175 }()
176
177 c.logger.Debugf("new connection")
178 return c
179}
180
181func (c *conn) isClosed() bool {
182 c.lock.Lock()
183 defer c.lock.Unlock()
184 return c.closed
185}
186
187// Close closes the connection. It is safe to call from any goroutine.
188func (c *conn) Close() error {
189 c.lock.Lock()
190 defer c.lock.Unlock()
191
192 if c.closed {
193 return fmt.Errorf("connection already closed")
194 }
195
196 err := c.conn.Close()
197 c.closed = true
198 close(c.outgoing)
199 close(c.closedCh)
200 return err
201}
202
203func (c *conn) ReadMessage() (*irc.Message, error) {
204 msg, err := c.conn.ReadMessage()
205 if isErrClosed(err) {
206 return nil, io.EOF
207 } else if err != nil {
208 return nil, err
209 }
210
211 c.logger.Debugf("received: %v", msg)
212 return msg, nil
213}
214
215// SendMessage queues a new outgoing message. It is safe to call from any
216// goroutine.
217//
218// If the connection is closed before the message is sent, SendMessage silently
219// drops the message.
220func (c *conn) SendMessage(ctx context.Context, msg *irc.Message) {
221 c.lock.Lock()
222 defer c.lock.Unlock()
223
224 if c.closed {
225 return
226 }
227
228 select {
229 case c.outgoing <- msg:
230 // Success
231 case <-ctx.Done():
232 c.logger.Printf("failed to send message: %v", ctx.Err())
233 }
234}
235
236func (c *conn) RemoteAddr() net.Addr {
237 return c.conn.RemoteAddr()
238}
239
240func (c *conn) LocalAddr() net.Addr {
241 return c.conn.LocalAddr()
242}
243
244// NewContext returns a copy of the parent context with a new Done channel. The
245// returned context's Done channel is closed when the connection is closed,
246// when the returned cancel function is called, or when the parent context's
247// Done channel is closed, whichever happens first.
248//
249// Canceling this context releases resources associated with it, so code should
250// call cancel as soon as the operations running in this Context complete.
251func (c *conn) NewContext(parent context.Context) (context.Context, context.CancelFunc) {
252 ctx, cancel := context.WithCancel(parent)
253
254 go func() {
255 defer cancel()
256
257 select {
258 case <-ctx.Done():
259 // The parent context has been cancelled, or the caller has called
260 // cancel()
261 case <-c.closedCh:
262 // The connection has been closed
263 }
264 }()
265
266 return ctx, cancel
267}
Note: See TracBrowser for help on using the repository browser.