source: code/trunk/conn.go@ 407

Last change on this file since 407 was 402, checked in by contact, 5 years ago

go fmt

File size: 5.3 KB
Line 
1package soju
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "net"
8 "sync"
9 "time"
10
11 "gopkg.in/irc.v3"
12 "nhooyr.io/websocket"
13)
14
15// ircConn is a generic IRC connection. It's similar to net.Conn but focuses on
16// reading and writing IRC messages.
17type ircConn interface {
18 ReadMessage() (*irc.Message, error)
19 WriteMessage(*irc.Message) error
20 Close() error
21 SetReadDeadline(time.Time) error
22 SetWriteDeadline(time.Time) error
23 RemoteAddr() net.Addr
24 LocalAddr() net.Addr
25}
26
27func newNetIRCConn(c net.Conn) ircConn {
28 type netConn net.Conn
29 return struct {
30 *irc.Conn
31 netConn
32 }{irc.NewConn(c), c}
33}
34
35type websocketIRCConn struct {
36 conn *websocket.Conn
37 readDeadline, writeDeadline time.Time
38 remoteAddr string
39}
40
41func newWebsocketIRCConn(c *websocket.Conn, remoteAddr string) ircConn {
42 return websocketIRCConn{conn: c, remoteAddr: remoteAddr}
43}
44
45func (wic websocketIRCConn) ReadMessage() (*irc.Message, error) {
46 ctx := context.Background()
47 if !wic.readDeadline.IsZero() {
48 var cancel context.CancelFunc
49 ctx, cancel = context.WithDeadline(ctx, wic.readDeadline)
50 defer cancel()
51 }
52 _, b, err := wic.conn.Read(ctx)
53 if err != nil {
54 switch websocket.CloseStatus(err) {
55 case websocket.StatusNormalClosure, websocket.StatusGoingAway:
56 return nil, io.EOF
57 default:
58 return nil, err
59 }
60 }
61 return irc.ParseMessage(string(b))
62}
63
64func (wic websocketIRCConn) WriteMessage(msg *irc.Message) error {
65 b := []byte(msg.String())
66 ctx := context.Background()
67 if !wic.writeDeadline.IsZero() {
68 var cancel context.CancelFunc
69 ctx, cancel = context.WithDeadline(ctx, wic.writeDeadline)
70 defer cancel()
71 }
72 return wic.conn.Write(ctx, websocket.MessageText, b)
73}
74
75func (wic websocketIRCConn) Close() error {
76 return wic.conn.Close(websocket.StatusNormalClosure, "")
77}
78
79func (wic websocketIRCConn) SetReadDeadline(t time.Time) error {
80 wic.readDeadline = t
81 return nil
82}
83
84func (wic websocketIRCConn) SetWriteDeadline(t time.Time) error {
85 wic.writeDeadline = t
86 return nil
87}
88
89func (wic websocketIRCConn) RemoteAddr() net.Addr {
90 return websocketAddr(wic.remoteAddr)
91}
92
93func (wic websocketIRCConn) LocalAddr() net.Addr {
94 // Behind a reverse HTTP proxy, we don't have access to the real listening
95 // address
96 return websocketAddr("")
97}
98
// websocketAddr is a net.Addr implementation whose string form is the
// underlying string value itself.
type websocketAddr string

// Network reports the network name, which is "ws" for every websocketAddr.
func (websocketAddr) Network() string {
	const network = "ws"
	return network
}

// String returns the address text unchanged.
func (addr websocketAddr) String() string {
	text := string(addr)
	return text
}
108
// rateLimiter hands out tokens through C. Receiving one value from C
// consumes one token.
type rateLimiter struct {
	// C delivers one value per available token.
	C <-chan struct{}
	// ticker drives the periodic refill.
	ticker *time.Ticker
	// stopped is closed by Stop to terminate the refill goroutine.
	stopped chan struct{}
}

// newRateLimiter returns a limiter that starts with burst tokens available
// and tries to add one more token every delay, never holding more than burst
// at once. A background goroutine performs the refill until Stop is called.
func newRateLimiter(delay time.Duration, burst int) *rateLimiter {
	tokens := make(chan struct{}, burst)
	for n := burst; n > 0; n-- {
		tokens <- struct{}{}
	}

	rl := &rateLimiter{
		C:       tokens,
		ticker:  time.NewTicker(delay),
		stopped: make(chan struct{}),
	}

	go func() {
		for {
			// Wait for the next tick, unless the limiter is stopped first.
			select {
			case <-rl.stopped:
				return
			case <-rl.ticker.C:
			}

			// Hand out one token. When the bucket is full this blocks until
			// a token is consumed or the limiter is stopped.
			select {
			case <-rl.stopped:
				return
			case tokens <- struct{}{}:
			}
		}
	}()

	return rl
}

// Stop halts the ticker and terminates the refill goroutine. It must be
// called at most once: a second call would close an already-closed channel.
func (rl *rateLimiter) Stop() {
	rl.ticker.Stop()
	close(rl.stopped)
}
148
149type connOptions struct {
150 Logger Logger
151 RateLimitDelay time.Duration
152 RateLimitBurst int
153}
154
155type conn struct {
156 conn ircConn
157 srv *Server
158 logger Logger
159
160 lock sync.Mutex
161 outgoing chan<- *irc.Message
162 closed bool
163}
164
165func newConn(srv *Server, ic ircConn, options *connOptions) *conn {
166 outgoing := make(chan *irc.Message, 64)
167 c := &conn{
168 conn: ic,
169 srv: srv,
170 outgoing: outgoing,
171 logger: options.Logger,
172 }
173
174 go func() {
175 var rl *rateLimiter
176 if options.RateLimitDelay > 0 && options.RateLimitBurst > 0 {
177 rl = newRateLimiter(options.RateLimitDelay, options.RateLimitBurst)
178 defer rl.Stop()
179 }
180
181 for msg := range outgoing {
182 if rl != nil {
183 <-rl.C
184 }
185
186 if c.srv.Debug {
187 c.logger.Printf("sent: %v", msg)
188 }
189 c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
190 if err := c.conn.WriteMessage(msg); err != nil {
191 c.logger.Printf("failed to write message: %v", err)
192 break
193 }
194 }
195 if err := c.conn.Close(); err != nil {
196 c.logger.Printf("failed to close connection: %v", err)
197 } else {
198 c.logger.Printf("connection closed")
199 }
200 // Drain the outgoing channel to prevent SendMessage from blocking
201 for range outgoing {
202 // This space is intentionally left blank
203 }
204 }()
205
206 c.logger.Printf("new connection")
207 return c
208}
209
210func (c *conn) isClosed() bool {
211 c.lock.Lock()
212 defer c.lock.Unlock()
213 return c.closed
214}
215
216// Close closes the connection. It is safe to call from any goroutine.
217func (c *conn) Close() error {
218 c.lock.Lock()
219 defer c.lock.Unlock()
220
221 if c.closed {
222 return fmt.Errorf("connection already closed")
223 }
224
225 err := c.conn.Close()
226 c.closed = true
227 close(c.outgoing)
228 return err
229}
230
231func (c *conn) ReadMessage() (*irc.Message, error) {
232 msg, err := c.conn.ReadMessage()
233 if err != nil {
234 return nil, err
235 }
236
237 if c.srv.Debug {
238 c.logger.Printf("received: %v", msg)
239 }
240
241 return msg, nil
242}
243
244// SendMessage queues a new outgoing message. It is safe to call from any
245// goroutine.
246//
247// If the connection is closed before the message is sent, SendMessage silently
248// drops the message.
249func (c *conn) SendMessage(msg *irc.Message) {
250 c.lock.Lock()
251 defer c.lock.Unlock()
252
253 if c.closed {
254 return
255 }
256 c.outgoing <- msg
257}
258
259func (c *conn) RemoteAddr() net.Addr {
260 return c.conn.RemoteAddr()
261}
262
263func (c *conn) LocalAddr() net.Addr {
264 return c.conn.LocalAddr()
265}
Note: See TracBrowser for help on using the repository browser.