source: code/trunk/upstream.go@ 45

Last change on this file since 45 was 45, checked in by contact, 5 years ago

Close connection from writer goroutine

Connections were being closed from the reader goroutine, causing issues
when sending messages and immediately closing the connection.

File size: 7.7 KB
Line 
1package jounce
2
3import (
4 "crypto/tls"
5 "fmt"
6 "io"
7 "net"
8 "strconv"
9 "strings"
10 "time"
11
12 "gopkg.in/irc.v3"
13)
14
15type upstreamChannel struct {
16 Name string
17 Topic string
18 TopicWho string
19 TopicTime time.Time
20 Status channelStatus
21 modes modeSet
22 Members map[string]membership
23 complete bool
24}
25
26type upstreamConn struct {
27 upstream *Upstream
28 logger Logger
29 net net.Conn
30 irc *irc.Conn
31 srv *Server
32 user *user
33 messages chan<- *irc.Message
34
35 serverName string
36 availableUserModes string
37 availableChannelModes string
38 channelModesWithParam string
39
40 registered bool
41 nick string
42 closed bool
43 modes modeSet
44 channels map[string]*upstreamChannel
45}
46
47func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) {
48 logger := &prefixLogger{u.srv.Logger, fmt.Sprintf("upstream %q: ", upstream.Addr)}
49 logger.Printf("connecting to server")
50
51 netConn, err := tls.Dial("tcp", upstream.Addr, nil)
52 if err != nil {
53 return nil, fmt.Errorf("failed to dial %q: %v", upstream.Addr, err)
54 }
55
56 msgs := make(chan *irc.Message, 64)
57 conn := &upstreamConn{
58 upstream: upstream,
59 logger: logger,
60 net: netConn,
61 irc: irc.NewConn(netConn),
62 srv: u.srv,
63 user: u,
64 messages: msgs,
65 channels: make(map[string]*upstreamChannel),
66 }
67
68 go func() {
69 for msg := range msgs {
70 if err := conn.irc.WriteMessage(msg); err != nil {
71 conn.logger.Printf("failed to write message: %v", err)
72 }
73 }
74 if err := conn.net.Close(); err != nil {
75 conn.logger.Printf("failed to close connection: %v", err)
76 } else {
77 conn.logger.Printf("connection closed")
78 }
79 }()
80
81 return conn, nil
82}
83
84func (c *upstreamConn) Close() error {
85 if c.closed {
86 return fmt.Errorf("upstream connection already closed")
87 }
88 close(c.messages)
89 c.closed = true
90 return nil
91}
92
93func (c *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
94 ch, ok := c.channels[name]
95 if !ok {
96 return nil, fmt.Errorf("unknown channel %q", name)
97 }
98 return ch, nil
99}
100
101func (c *upstreamConn) handleMessage(msg *irc.Message) error {
102 switch msg.Command {
103 case "PING":
104 // TODO: handle params
105 c.messages <- &irc.Message{
106 Command: "PONG",
107 Params: []string{c.srv.Hostname},
108 }
109 return nil
110 case "MODE":
111 var name, modeStr string
112 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
113 return err
114 }
115
116 if name == msg.Prefix.Name { // user mode change
117 if name != c.nick {
118 return fmt.Errorf("received MODE message for unknow nick %q", name)
119 }
120 return c.modes.Apply(modeStr)
121 } else { // channel mode change
122 ch, err := c.getChannel(name)
123 if err != nil {
124 return err
125 }
126 if err := ch.modes.Apply(modeStr); err != nil {
127 return err
128 }
129
130 c.user.forEachDownstream(func(dc *downstreamConn) {
131 dc.messages <- msg
132 })
133 }
134 case "NOTICE":
135 c.logger.Print(msg)
136 case irc.RPL_WELCOME:
137 c.registered = true
138 c.logger.Printf("connection registered")
139
140 for _, ch := range c.upstream.Channels {
141 c.messages <- &irc.Message{
142 Command: "JOIN",
143 Params: []string{ch},
144 }
145 }
146 case irc.RPL_MYINFO:
147 if err := parseMessageParams(msg, nil, &c.serverName, nil, &c.availableUserModes, &c.availableChannelModes); err != nil {
148 return err
149 }
150 if len(msg.Params) > 5 {
151 c.channelModesWithParam = msg.Params[5]
152 }
153 case "NICK":
154 var newNick string
155 if err := parseMessageParams(msg, &newNick); err != nil {
156 return err
157 }
158
159 if msg.Prefix.Name == c.nick {
160 c.logger.Printf("changed nick from %q to %q", c.nick, newNick)
161 c.nick = newNick
162 }
163
164 for _, ch := range c.channels {
165 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
166 delete(ch.Members, msg.Prefix.Name)
167 ch.Members[newNick] = membership
168 }
169 }
170
171 c.user.forEachDownstream(func(dc *downstreamConn) {
172 dc.messages <- msg
173 })
174 case "JOIN":
175 var channels string
176 if err := parseMessageParams(msg, &channels); err != nil {
177 return err
178 }
179
180 for _, ch := range strings.Split(channels, ",") {
181 if msg.Prefix.Name == c.nick {
182 c.logger.Printf("joined channel %q", ch)
183 c.channels[ch] = &upstreamChannel{
184 Name: ch,
185 Members: make(map[string]membership),
186 }
187 } else {
188 ch, err := c.getChannel(ch)
189 if err != nil {
190 return err
191 }
192 ch.Members[msg.Prefix.Name] = 0
193 }
194 }
195
196 c.user.forEachDownstream(func(dc *downstreamConn) {
197 dc.messages <- msg
198 })
199 case "PART":
200 var channels string
201 if err := parseMessageParams(msg, &channels); err != nil {
202 return err
203 }
204
205 for _, ch := range strings.Split(channels, ",") {
206 if msg.Prefix.Name == c.nick {
207 c.logger.Printf("parted channel %q", ch)
208 delete(c.channels, ch)
209 } else {
210 ch, err := c.getChannel(ch)
211 if err != nil {
212 return err
213 }
214 delete(ch.Members, msg.Prefix.Name)
215 }
216 }
217
218 c.user.forEachDownstream(func(dc *downstreamConn) {
219 dc.messages <- msg
220 })
221 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
222 var name, topic string
223 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
224 return err
225 }
226 ch, err := c.getChannel(name)
227 if err != nil {
228 return err
229 }
230 if msg.Command == irc.RPL_TOPIC {
231 ch.Topic = topic
232 } else {
233 ch.Topic = ""
234 }
235 case "TOPIC":
236 var name string
237 if err := parseMessageParams(msg, nil, &name); err != nil {
238 return err
239 }
240 ch, err := c.getChannel(name)
241 if err != nil {
242 return err
243 }
244 if len(msg.Params) > 1 {
245 ch.Topic = msg.Params[1]
246 } else {
247 ch.Topic = ""
248 }
249 case rpl_topicwhotime:
250 var name, who, timeStr string
251 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
252 return err
253 }
254 ch, err := c.getChannel(name)
255 if err != nil {
256 return err
257 }
258 ch.TopicWho = who
259 sec, err := strconv.ParseInt(timeStr, 10, 64)
260 if err != nil {
261 return fmt.Errorf("failed to parse topic time: %v", err)
262 }
263 ch.TopicTime = time.Unix(sec, 0)
264 case irc.RPL_NAMREPLY:
265 var name, statusStr, members string
266 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
267 return err
268 }
269 ch, err := c.getChannel(name)
270 if err != nil {
271 return err
272 }
273
274 status, err := parseChannelStatus(statusStr)
275 if err != nil {
276 return err
277 }
278 ch.Status = status
279
280 for _, s := range strings.Split(members, " ") {
281 membership, nick := parseMembershipPrefix(s)
282 ch.Members[nick] = membership
283 }
284 case irc.RPL_ENDOFNAMES:
285 var name string
286 if err := parseMessageParams(msg, nil, &name); err != nil {
287 return err
288 }
289 ch, err := c.getChannel(name)
290 if err != nil {
291 return err
292 }
293
294 if ch.complete {
295 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
296 }
297 ch.complete = true
298
299 c.user.forEachDownstream(func(dc *downstreamConn) {
300 forwardChannel(dc, ch)
301 })
302 case "PRIVMSG":
303 c.user.forEachDownstream(func(dc *downstreamConn) {
304 dc.messages <- msg
305 })
306 case irc.RPL_YOURHOST, irc.RPL_CREATED:
307 // Ignore
308 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
309 // Ignore
310 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
311 // Ignore
312 case rpl_localusers, rpl_globalusers:
313 // Ignore
314 case irc.RPL_STATSVLINE, irc.RPL_STATSPING, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
315 // Ignore
316 default:
317 c.logger.Printf("unhandled upstream message: %v", msg)
318 }
319 return nil
320}
321
322func (c *upstreamConn) register() {
323 c.nick = c.upstream.Nick
324 c.messages <- &irc.Message{
325 Command: "NICK",
326 Params: []string{c.upstream.Nick},
327 }
328 c.messages <- &irc.Message{
329 Command: "USER",
330 Params: []string{c.upstream.Username, "0", "*", c.upstream.Realname},
331 }
332}
333
334func (c *upstreamConn) readMessages() error {
335 for {
336 msg, err := c.irc.ReadMessage()
337 if err == io.EOF {
338 break
339 } else if err != nil {
340 return fmt.Errorf("failed to read IRC command: %v", err)
341 }
342
343 if err := c.handleMessage(msg); err != nil {
344 c.logger.Printf("failed to handle message %q: %v", msg, err)
345 }
346 }
347
348 return nil
349}
Note: See TracBrowser for help on using the repository browser.