source: code/trunk/conn.go@ 346

Last change on this file since 346 was 341, checked in by contact, 5 years ago

Return io.EOF on websocket connection closure

File size: 3.7 KB
Line 
1package soju
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "net"
8 "sync"
9 "time"
10
11 "gopkg.in/irc.v3"
12 "nhooyr.io/websocket"
13)
14
15// ircConn is a generic IRC connection. It's similar to net.Conn but focuses on
16// reading and writing IRC messages.
17type ircConn interface {
18 ReadMessage() (*irc.Message, error)
19 WriteMessage(*irc.Message) error
20 Close() error
21 SetReadDeadline(time.Time) error
22 SetWriteDeadline(time.Time) error
23}
24
25func newNetIRCConn(c net.Conn) ircConn {
26 type netConn net.Conn
27 return struct {
28 *irc.Conn
29 netConn
30 }{irc.NewConn(c), c}
31}
32
33type websocketIRCConn struct {
34 conn *websocket.Conn
35 readDeadline, writeDeadline time.Time
36}
37
38func newWebsocketIRCConn(c *websocket.Conn) ircConn {
39 return websocketIRCConn{conn: c}
40}
41
42func (wic websocketIRCConn) ReadMessage() (*irc.Message, error) {
43 ctx := context.Background()
44 if !wic.readDeadline.IsZero() {
45 var cancel context.CancelFunc
46 ctx, cancel = context.WithDeadline(ctx, wic.readDeadline)
47 defer cancel()
48 }
49 _, b, err := wic.conn.Read(ctx)
50 if err != nil {
51 switch websocket.CloseStatus(err) {
52 case websocket.StatusNormalClosure, websocket.StatusGoingAway:
53 return nil, io.EOF
54 default:
55 return nil, err
56 }
57 }
58 return irc.ParseMessage(string(b))
59}
60
61func (wic websocketIRCConn) WriteMessage(msg *irc.Message) error {
62 b := []byte(msg.String())
63 ctx := context.Background()
64 if !wic.writeDeadline.IsZero() {
65 var cancel context.CancelFunc
66 ctx, cancel = context.WithDeadline(ctx, wic.writeDeadline)
67 defer cancel()
68 }
69 return wic.conn.Write(ctx, websocket.MessageText, b)
70}
71
72func (wic websocketIRCConn) Close() error {
73 return wic.conn.Close(websocket.StatusNormalClosure, "")
74}
75
76func (wic websocketIRCConn) SetReadDeadline(t time.Time) error {
77 wic.readDeadline = t
78 return nil
79}
80
81func (wic websocketIRCConn) SetWriteDeadline(t time.Time) error {
82 wic.writeDeadline = t
83 return nil
84}
85
86type conn struct {
87 conn ircConn
88 srv *Server
89 logger Logger
90
91 lock sync.Mutex
92 outgoing chan<- *irc.Message
93 closed bool
94}
95
96func newConn(srv *Server, ic ircConn, logger Logger) *conn {
97 outgoing := make(chan *irc.Message, 64)
98 c := &conn{
99 conn: ic,
100 srv: srv,
101 outgoing: outgoing,
102 logger: logger,
103 }
104
105 go func() {
106 for msg := range outgoing {
107 if c.srv.Debug {
108 c.logger.Printf("sent: %v", msg)
109 }
110 c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
111 if err := c.conn.WriteMessage(msg); err != nil {
112 c.logger.Printf("failed to write message: %v", err)
113 break
114 }
115 }
116 if err := c.conn.Close(); err != nil {
117 c.logger.Printf("failed to close connection: %v", err)
118 } else {
119 c.logger.Printf("connection closed")
120 }
121 // Drain the outgoing channel to prevent SendMessage from blocking
122 for range outgoing {
123 // This space is intentionally left blank
124 }
125 }()
126
127 c.logger.Printf("new connection")
128 return c
129}
130
131func (c *conn) isClosed() bool {
132 c.lock.Lock()
133 defer c.lock.Unlock()
134 return c.closed
135}
136
137// Close closes the connection. It is safe to call from any goroutine.
138func (c *conn) Close() error {
139 c.lock.Lock()
140 defer c.lock.Unlock()
141
142 if c.closed {
143 return fmt.Errorf("connection already closed")
144 }
145
146 err := c.conn.Close()
147 c.closed = true
148 close(c.outgoing)
149 return err
150}
151
152func (c *conn) ReadMessage() (*irc.Message, error) {
153 msg, err := c.conn.ReadMessage()
154 if err != nil {
155 return nil, err
156 }
157
158 if c.srv.Debug {
159 c.logger.Printf("received: %v", msg)
160 }
161
162 return msg, nil
163}
164
165// SendMessage queues a new outgoing message. It is safe to call from any
166// goroutine.
167//
168// If the connection is closed before the message is sent, SendMessage silently
169// drops the message.
170func (c *conn) SendMessage(msg *irc.Message) {
171 c.lock.Lock()
172 defer c.lock.Unlock()
173
174 if c.closed {
175 return
176 }
177 c.outgoing <- msg
178}
Note: See TracBrowser for help on using the repository browser.