source: code/trunk/conn.go@ 355

Last change on this file since 355 was 347, checked in by contact, 5 years ago

Add RemoteAddr to ircConn interface

File size: 4.0 KB
RevLine 
[210]1package soju
2
3import (
[323]4 "context"
[210]5 "fmt"
[341]6 "io"
[210]7 "net"
[280]8 "sync"
[210]9 "time"
10
11 "gopkg.in/irc.v3"
[323]12 "nhooyr.io/websocket"
[210]13)
14
[315]15// ircConn is a generic IRC connection. It's similar to net.Conn but focuses on
16// reading and writing IRC messages.
17type ircConn interface {
18 ReadMessage() (*irc.Message, error)
19 WriteMessage(*irc.Message) error
20 Close() error
[323]21 SetReadDeadline(time.Time) error
[315]22 SetWriteDeadline(time.Time) error
[347]23 RemoteAddr() net.Addr
[315]24}
25
[323]26func newNetIRCConn(c net.Conn) ircConn {
[315]27 type netConn net.Conn
28 return struct {
29 *irc.Conn
30 netConn
31 }{irc.NewConn(c), c}
32}
33
[323]34type websocketIRCConn struct {
35 conn *websocket.Conn
36 readDeadline, writeDeadline time.Time
[347]37 remoteAddr string
[323]38}
39
[347]40func newWebsocketIRCConn(c *websocket.Conn, remoteAddr string) ircConn {
41 return websocketIRCConn{conn: c, remoteAddr: remoteAddr}
[323]42}
43
44func (wic websocketIRCConn) ReadMessage() (*irc.Message, error) {
45 ctx := context.Background()
46 if !wic.readDeadline.IsZero() {
47 var cancel context.CancelFunc
48 ctx, cancel = context.WithDeadline(ctx, wic.readDeadline)
49 defer cancel()
50 }
51 _, b, err := wic.conn.Read(ctx)
52 if err != nil {
[341]53 switch websocket.CloseStatus(err) {
54 case websocket.StatusNormalClosure, websocket.StatusGoingAway:
55 return nil, io.EOF
56 default:
57 return nil, err
58 }
[323]59 }
60 return irc.ParseMessage(string(b))
61}
62
63func (wic websocketIRCConn) WriteMessage(msg *irc.Message) error {
64 b := []byte(msg.String())
65 ctx := context.Background()
66 if !wic.writeDeadline.IsZero() {
67 var cancel context.CancelFunc
68 ctx, cancel = context.WithDeadline(ctx, wic.writeDeadline)
69 defer cancel()
70 }
71 return wic.conn.Write(ctx, websocket.MessageText, b)
72}
73
74func (wic websocketIRCConn) Close() error {
75 return wic.conn.Close(websocket.StatusNormalClosure, "")
76}
77
78func (wic websocketIRCConn) SetReadDeadline(t time.Time) error {
79 wic.readDeadline = t
80 return nil
81}
82
83func (wic websocketIRCConn) SetWriteDeadline(t time.Time) error {
84 wic.writeDeadline = t
85 return nil
86}
87
[347]88func (wic websocketIRCConn) RemoteAddr() net.Addr {
89 return websocketAddr(wic.remoteAddr)
90}
91
92type websocketAddr string
93
94func (websocketAddr) Network() string {
95 return "ws"
96}
97
98func (wa websocketAddr) String() string {
99 return string(wa)
100}
101
[210]102type conn struct {
[315]103 conn ircConn
[280]104 srv *Server
105 logger Logger
106
107 lock sync.Mutex
[210]108 outgoing chan<- *irc.Message
[280]109 closed bool
[210]110}
111
[315]112func newConn(srv *Server, ic ircConn, logger Logger) *conn {
[210]113 outgoing := make(chan *irc.Message, 64)
114 c := &conn{
[315]115 conn: ic,
[210]116 srv: srv,
117 outgoing: outgoing,
118 logger: logger,
119 }
120
121 go func() {
122 for msg := range outgoing {
123 if c.srv.Debug {
124 c.logger.Printf("sent: %v", msg)
125 }
[315]126 c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
127 if err := c.conn.WriteMessage(msg); err != nil {
[210]128 c.logger.Printf("failed to write message: %v", err)
129 break
130 }
131 }
[315]132 if err := c.conn.Close(); err != nil {
[210]133 c.logger.Printf("failed to close connection: %v", err)
134 } else {
135 c.logger.Printf("connection closed")
136 }
137 // Drain the outgoing channel to prevent SendMessage from blocking
138 for range outgoing {
139 // This space is intentionally left blank
140 }
141 }()
142
143 c.logger.Printf("new connection")
144 return c
145}
146
147func (c *conn) isClosed() bool {
[280]148 c.lock.Lock()
149 defer c.lock.Unlock()
150 return c.closed
[210]151}
152
153// Close closes the connection. It is safe to call from any goroutine.
154func (c *conn) Close() error {
[280]155 c.lock.Lock()
156 defer c.lock.Unlock()
157
158 if c.closed {
[210]159 return fmt.Errorf("connection already closed")
160 }
[280]161
[315]162 err := c.conn.Close()
[280]163 c.closed = true
[210]164 close(c.outgoing)
[312]165 return err
[210]166}
167
168func (c *conn) ReadMessage() (*irc.Message, error) {
[315]169 msg, err := c.conn.ReadMessage()
[210]170 if err != nil {
171 return nil, err
172 }
173
174 if c.srv.Debug {
175 c.logger.Printf("received: %v", msg)
176 }
177
178 return msg, nil
179}
180
181// SendMessage queues a new outgoing message. It is safe to call from any
182// goroutine.
[280]183//
184// If the connection is closed before the message is sent, SendMessage silently
185// drops the message.
[210]186func (c *conn) SendMessage(msg *irc.Message) {
[280]187 c.lock.Lock()
188 defer c.lock.Unlock()
189
190 if c.closed {
[210]191 return
192 }
193 c.outgoing <- msg
194}
Note: See TracBrowser for help on using the repository browser.