source: code/trunk/conn.go@ 389

Last change on this file since 389 was 384, checked in by contact, 5 years ago

Add conn.{Local,Remote}Addr

File size: 4.3 KB
RevLine 
[210]1package soju
2
3import (
[323]4 "context"
[210]5 "fmt"
[341]6 "io"
[210]7 "net"
[280]8 "sync"
[210]9 "time"
10
11 "gopkg.in/irc.v3"
[323]12 "nhooyr.io/websocket"
[210]13)
14
[315]15// ircConn is a generic IRC connection. It's similar to net.Conn but focuses on
16// reading and writing IRC messages.
17type ircConn interface {
18 ReadMessage() (*irc.Message, error)
19 WriteMessage(*irc.Message) error
20 Close() error
[323]21 SetReadDeadline(time.Time) error
[315]22 SetWriteDeadline(time.Time) error
[347]23 RemoteAddr() net.Addr
[383]24 LocalAddr() net.Addr
[315]25}
26
[323]27func newNetIRCConn(c net.Conn) ircConn {
[315]28 type netConn net.Conn
29 return struct {
30 *irc.Conn
31 netConn
32 }{irc.NewConn(c), c}
33}
34
[323]35type websocketIRCConn struct {
36 conn *websocket.Conn
37 readDeadline, writeDeadline time.Time
[347]38 remoteAddr string
[323]39}
40
[347]41func newWebsocketIRCConn(c *websocket.Conn, remoteAddr string) ircConn {
42 return websocketIRCConn{conn: c, remoteAddr: remoteAddr}
[323]43}
44
45func (wic websocketIRCConn) ReadMessage() (*irc.Message, error) {
46 ctx := context.Background()
47 if !wic.readDeadline.IsZero() {
48 var cancel context.CancelFunc
49 ctx, cancel = context.WithDeadline(ctx, wic.readDeadline)
50 defer cancel()
51 }
52 _, b, err := wic.conn.Read(ctx)
53 if err != nil {
[341]54 switch websocket.CloseStatus(err) {
55 case websocket.StatusNormalClosure, websocket.StatusGoingAway:
56 return nil, io.EOF
57 default:
58 return nil, err
59 }
[323]60 }
61 return irc.ParseMessage(string(b))
62}
63
64func (wic websocketIRCConn) WriteMessage(msg *irc.Message) error {
65 b := []byte(msg.String())
66 ctx := context.Background()
67 if !wic.writeDeadline.IsZero() {
68 var cancel context.CancelFunc
69 ctx, cancel = context.WithDeadline(ctx, wic.writeDeadline)
70 defer cancel()
71 }
72 return wic.conn.Write(ctx, websocket.MessageText, b)
73}
74
75func (wic websocketIRCConn) Close() error {
76 return wic.conn.Close(websocket.StatusNormalClosure, "")
77}
78
79func (wic websocketIRCConn) SetReadDeadline(t time.Time) error {
80 wic.readDeadline = t
81 return nil
82}
83
84func (wic websocketIRCConn) SetWriteDeadline(t time.Time) error {
85 wic.writeDeadline = t
86 return nil
87}
88
[347]89func (wic websocketIRCConn) RemoteAddr() net.Addr {
90 return websocketAddr(wic.remoteAddr)
91}
92
[383]93func (wic websocketIRCConn) LocalAddr() net.Addr {
94 // Behind a reverse HTTP proxy, we don't have access to the real listening
95 // address
96 return websocketAddr("")
97}
98
// websocketAddr is a string-backed address for WebSocket connections. It
// provides the Network and String methods.
type websocketAddr string

// Network reports the fixed network name "ws".
func (addr websocketAddr) Network() string {
	return "ws"
}

// String returns the address text exactly as it was stored.
func (addr websocketAddr) String() string {
	text := string(addr)
	return text
}
108
[210]109type conn struct {
[315]110 conn ircConn
[280]111 srv *Server
112 logger Logger
113
114 lock sync.Mutex
[210]115 outgoing chan<- *irc.Message
[280]116 closed bool
[210]117}
118
[315]119func newConn(srv *Server, ic ircConn, logger Logger) *conn {
[210]120 outgoing := make(chan *irc.Message, 64)
121 c := &conn{
[315]122 conn: ic,
[210]123 srv: srv,
124 outgoing: outgoing,
125 logger: logger,
126 }
127
128 go func() {
129 for msg := range outgoing {
130 if c.srv.Debug {
131 c.logger.Printf("sent: %v", msg)
132 }
[315]133 c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
134 if err := c.conn.WriteMessage(msg); err != nil {
[210]135 c.logger.Printf("failed to write message: %v", err)
136 break
137 }
138 }
[315]139 if err := c.conn.Close(); err != nil {
[210]140 c.logger.Printf("failed to close connection: %v", err)
141 } else {
142 c.logger.Printf("connection closed")
143 }
144 // Drain the outgoing channel to prevent SendMessage from blocking
145 for range outgoing {
146 // This space is intentionally left blank
147 }
148 }()
149
150 c.logger.Printf("new connection")
151 return c
152}
153
154func (c *conn) isClosed() bool {
[280]155 c.lock.Lock()
156 defer c.lock.Unlock()
157 return c.closed
[210]158}
159
160// Close closes the connection. It is safe to call from any goroutine.
161func (c *conn) Close() error {
[280]162 c.lock.Lock()
163 defer c.lock.Unlock()
164
165 if c.closed {
[210]166 return fmt.Errorf("connection already closed")
167 }
[280]168
[315]169 err := c.conn.Close()
[280]170 c.closed = true
[210]171 close(c.outgoing)
[312]172 return err
[210]173}
174
175func (c *conn) ReadMessage() (*irc.Message, error) {
[315]176 msg, err := c.conn.ReadMessage()
[210]177 if err != nil {
178 return nil, err
179 }
180
181 if c.srv.Debug {
182 c.logger.Printf("received: %v", msg)
183 }
184
185 return msg, nil
186}
187
188// SendMessage queues a new outgoing message. It is safe to call from any
189// goroutine.
[280]190//
191// If the connection is closed before the message is sent, SendMessage silently
192// drops the message.
[210]193func (c *conn) SendMessage(msg *irc.Message) {
[280]194 c.lock.Lock()
195 defer c.lock.Unlock()
196
197 if c.closed {
[210]198 return
199 }
200 c.outgoing <- msg
201}
[384]202
203func (c *conn) RemoteAddr() net.Addr {
204 return c.conn.RemoteAddr()
205}
206
207func (c *conn) LocalAddr() net.Addr {
208 return c.conn.LocalAddr()
209}
Note: See TracBrowser for help on using the repository browser.